This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch phase-2-cp5-march in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 725873de6a1e1ae28f5a21616a6945c2532b7752
Author: Hongtao Gao <[email protected]>
AuthorDate: Wed May 6 15:33:32 2026 +0000

feat(barrier): repoint NodeSchemaStatusService at NodeRepoRegistry for executor-tracked kinds (Phase 2 Step 2.5 §2)

NodeSchemaStatusServer grows a registryProvider closure alongside the existing schemaCache provider. GetMaxRevision returns min(schemaCache.notifiedModRevision, NodeRepoRegistry.LatestModRevision) so a positive answer covers every per-service executor cache as well as the upstream watch cache.

GetKeyRevisions / GetAbsentKeys split per-key lookups: kinds tracked by at least one registered schemaRepo (Group / Stream / Measure / Trace / IndexRule / IndexRuleBinding) route through NodeRepoRegistry; TopN / Property fall through to schemaCache because schemaRepo does not track those kinds.

This closes the eventCh-retry leak the plan describes: SendMetadataEvent's async retry can advance schemaCache.notifiedModRevision ahead of schemaRepo, so the bare schemaCache view would certify a key the executor's LoadGroup / LoadResource still misses. Reading the registry view binds the barrier to the cache the executor actually consults.

The data-node (banyand/queue/sub/server.go) and the liaison (banyand/liaison/grpc/server.go) both wire the new constructor that accepts the NodeRepoRegistry provider; resolution stays deferred to request time so PreRun-populated registrations are picked up automatically.

Adds banyand/metadata/schema/property/node_status_registry_test.go exercising the routing split, registry-lag fail-closed semantics, and the kinds-empty fall-through behavior.

via [HAPI](https://hapi.run) Co-Authored-By: HAPI <[email protected]> Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]> --- banyand/liaison/grpc/server.go | 17 +- banyand/metadata/schema/property/node_status.go | 256 +++++++++++++++------ .../schema/property/node_status_registry_test.go | 252 ++++++++++++++++++++ banyand/queue/sub/server.go | 17 +- 4 files changed, 460 insertions(+), 82 deletions(-) diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go index d4b428e3f..e787368d7 100644 --- a/banyand/liaison/grpc/server.go +++ b/banyand/liaison/grpc/server.go @@ -206,13 +206,16 @@ func NewServer(_ context.Context, tir1Client, tir2Client, broadcaster queue.Clie // deferred to request time (closure) because the metadata service // populates SchemaRegistry in PreRun, after NewServer has already // run — capturing a snapshot here would skip registration permanently. - nodeStatusSVC = property.NewNodeSchemaStatusServerForRegistry(func() *property.SchemaRegistry { - reg, regOk := svc.SchemaRegistry().(*property.SchemaRegistry) - if !regOk { - return nil - } - return reg - }) + nodeStatusSVC = property.NewNodeSchemaStatusServerForRegistryWithNodeRepo( + func() *property.SchemaRegistry { + reg, regOk := svc.SchemaRegistry().(*property.SchemaRegistry) + if !regOk { + return nil + } + return reg + }, + svc.NodeRepoRegistry, + ) } s := &server{ omr: omr, diff --git a/banyand/metadata/schema/property/node_status.go b/banyand/metadata/schema/property/node_status.go index 6303e5a6c..0e24027e5 100644 --- a/banyand/metadata/schema/property/node_status.go +++ b/banyand/metadata/schema/property/node_status.go @@ -25,6 +25,7 @@ import ( clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1" schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1" + "github.com/apache/skywalking-banyandb/pkg/schema/registry" ) // nodeStatusMaxKeys caps the per-request key count to match SchemaBarrierService @@ -35,43 
+36,72 @@ import ( const nodeStatusMaxKeys = 10000 // NodeSchemaStatusServer implements clusterv1.NodeSchemaStatusServiceServer -// against a local SchemaRegistry cache. The same implementation runs on every +// against the per-node schema caches. The same implementation runs on every // cluster member that holds a schema cache: liaisons (Role_ROLE_LIAISON) and // data nodes (Role_ROLE_DATA). The "Node" prefix in the service name is a // misnomer inherited from the original proto draft — treat the service as // "cluster-member schema status" regardless of the role serving it. // -// The cache the server reads (the SchemaRegistry's schemaCache) is the same -// one downstream consumers (pkg/schema.schemaRepo, groupRepo, entityRepo) -// observe through the property watch loop. The schemaCache.notifiedModRevision -// watermark only advances after every registered handler has processed the -// relevant event, so a positive answer from this server (Present=true, -// rev=R) implies every downstream cache has also observed R. This is the -// load-bearing prerequisite for the Phase 2 cluster barrier — without it, -// the barrier would confirm a cache the data-node query executor never reads. +// Two underlying caches feed this server: +// +// 1. The per-service NodeRepoRegistry (pkg/schema/registry) — the same +// pkg/schema.schemaRepo instances the data-node executor's +// LoadGroup / LoadResource resolve through. Registered against +// KindGroup, KindStream, KindMeasure, KindTrace, KindIndexRule, +// KindIndexRuleBinding. Reading from the registry guarantees the +// barrier and the executor see a consistent answer for the same +// (group, ModRevision) pair — closing the eventCh-retry leak in +// pkg/schema.schemaRepo.SendMetadataEvent that produced the §4.6.2 +// second-run flake (commit 47006561). This is the load-bearing +// invariant of Phase 2 §Step 2.5. +// +// 2. The property schemaCache — the upstream watch cache. 
Used only for +// KindTopNAggregation and KindProperty, which schemaRepo does not +// track (no executor race there because TopN/Property are not +// consulted in the data-node query path the same way the per-service +// resources are). The schemaCache.notifiedModRevision watermark is +// still load-bearing for those kinds. +// +// GetMaxRevision returns the minimum across both sources so the barrier never +// certifies a revision until every cache the node exposes has applied it. type NodeSchemaStatusServer struct { clusterv1.UnimplementedNodeSchemaStatusServiceServer - cacheProvider func() *schemaCache + cacheProvider func() *schemaCache + registryProvider func() *registry.NodeRepoRegistry } -// NewNodeSchemaStatusServer wires the server to a cache provider. The -// provider is called per request so the server tolerates SchemaRegistry -// initialisation racing the gRPC server boot — an early call simply gets a -// nil cache and returns zero-valued results, which liaison-side fan-out -// treats as "node not yet ready, retry on the next backoff iteration" -// rather than a hard error. +// NewNodeSchemaStatusServer wires the server to a schemaCache provider only. +// Used by tests that fix a hand-built cache; production wiring uses the +// registry-aware constructors below so the cluster barrier reads the same +// schemaRepo the executor consults. func NewNodeSchemaStatusServer(cacheProvider func() *schemaCache) *NodeSchemaStatusServer { return &NodeSchemaStatusServer{cacheProvider: cacheProvider} } -// NewNodeSchemaStatusServerForRegistry wires the server through a registry -// provider resolved per request. Construction-time wiring (which happens -// before the metadata service's PreRun has populated the registry) would -// otherwise capture a nil snapshot and skip registration permanently. 
The -// provider lets the server tolerate a still-initializing registry the same -// way the barrierSVC closure in banyand/liaison/grpc/server.go does — a nil -// registry surfaces through cacheProvider as a nil schemaCache and the -// fail-closed nil-cache contract takes over. +// NewNodeSchemaStatusServerWithRegistry wires the server to both a schemaCache +// provider (for TopN/Property kinds) and a NodeRepoRegistry provider (for the +// per-service kinds the executor reads). Either provider may transiently +// return nil while metadata.PreRun is still running; the per-call closures +// tolerate that the same way the standalone fail-closed contract does. +func NewNodeSchemaStatusServerWithRegistry( + cacheProvider func() *schemaCache, + registryProvider func() *registry.NodeRepoRegistry, +) *NodeSchemaStatusServer { + return &NodeSchemaStatusServer{ + cacheProvider: cacheProvider, + registryProvider: registryProvider, + } +} + +// NewNodeSchemaStatusServerForRegistry wires the server through a property +// SchemaRegistry provider resolved per request. Construction-time wiring +// (which happens before the metadata service's PreRun has populated the +// registry) would otherwise capture a nil snapshot and skip registration +// permanently. The provider lets the server tolerate a still-initializing +// registry the same way the barrierSVC closure in +// banyand/liaison/grpc/server.go does — a nil registry surfaces through +// cacheProvider as a nil schemaCache and the fail-closed nil-cache contract +// takes over. func NewNodeSchemaStatusServerForRegistry(provider func() *SchemaRegistry) *NodeSchemaStatusServer { return &NodeSchemaStatusServer{ cacheProvider: func() *schemaCache { @@ -87,25 +117,76 @@ func NewNodeSchemaStatusServerForRegistry(provider func() *SchemaRegistry) *Node } } -// GetMaxRevision returns the highest mod_revision currently observed by the -// node's local schema cache. 
When the cache is not yet initialized the -// response carries 0, which the liaison's barrier loop interprets as -// "node not yet caught up" and continues polling. +// NewNodeSchemaStatusServerForRegistryWithNodeRepo wires both the property +// SchemaRegistry provider (for the schemaCache) and a NodeRepoRegistry +// provider (for the per-service schemaRepo aggregator) through per-call +// closures. The metadata Service exposes the NodeRepoRegistry via +// Service.NodeRepoRegistry(); pass the bound method here. +func NewNodeSchemaStatusServerForRegistryWithNodeRepo( + provider func() *SchemaRegistry, + registryProvider func() *registry.NodeRepoRegistry, +) *NodeSchemaStatusServer { + srv := NewNodeSchemaStatusServerForRegistry(provider) + srv.registryProvider = registryProvider + return srv +} + +func (s *NodeSchemaStatusServer) cache() *schemaCache { + if s.cacheProvider == nil { + return nil + } + return s.cacheProvider() +} + +func (s *NodeSchemaStatusServer) registry() *registry.NodeRepoRegistry { + if s.registryProvider == nil { + return nil + } + return s.registryProvider() +} + +// GetMaxRevision returns the minimum mod_revision across the schemaCache +// watermark and the per-service NodeRepoRegistry. Either source contributes +// only when present and non-empty; a registry with no registered repos is +// treated as "no executor cache to gate on" and skipped (otherwise its +// LatestModRevision()=0 would gate every verdict to 0 on a metadata-only +// host). +// +// When neither source has data the response carries 0, which the liaison's +// barrier loop interprets as "node not yet caught up" and continues polling. 
func (s *NodeSchemaStatusServer) GetMaxRevision(_ context.Context, _ *clusterv1.GetMaxRevisionRequest) (*clusterv1.GetMaxRevisionResponse, error) { - c := s.cacheProvider() - if c == nil { + var ( + minRev int64 + hasSource bool + ) + if c := s.cache(); c != nil { + minRev = c.GetMaxModRevision() + hasSource = true + } + if reg := s.registry(); reg != nil && !reg.Empty() { + regRev := reg.LatestModRevision() + if !hasSource || regRev < minRev { + minRev = regRev + } + hasSource = true + } + if !hasSource { return &clusterv1.GetMaxRevisionResponse{}, nil } - return &clusterv1.GetMaxRevisionResponse{MaxModRevision: c.GetMaxModRevision()}, nil + return &clusterv1.GetMaxRevisionResponse{MaxModRevision: minRev}, nil } // GetKeyRevisions returns per-key (mod_revision, present) pairs in the same -// order the caller supplied keys. A key referencing a group the node has -// not observed maps to mod_revision=0, present=false — the call does not -// error on missing groups so the liaison can treat group-not-yet-registered -// as a normal laggard rather than a server fault. +// order the caller supplied keys. Each key is routed by Kind: kinds tracked +// by some registered schemaRepo are answered from the NodeRepoRegistry (so +// the response matches what the executor's LoadResource would resolve); +// remaining kinds (TopN, Property) fall through to the schemaCache. // -// An empty request is valid and produces an empty response. +// A key referencing a group the node has not observed maps to +// mod_revision=0, present=false — the call does not error on missing groups +// so the liaison can treat group-not-yet-registered as a normal laggard +// rather than a server fault. An empty request is valid and produces an +// empty response. 
func (s *NodeSchemaStatusServer) GetKeyRevisions(_ context.Context, req *clusterv1.GetKeyRevisionsRequest) (*clusterv1.GetKeyRevisionsResponse, error) { keys := req.GetKeys() if len(keys) > nodeStatusMaxKeys { @@ -117,16 +198,11 @@ func (s *NodeSchemaStatusServer) GetKeyRevisions(_ context.Context, req *cluster for i, key := range keys { resp.Revisions[i] = &clusterv1.KeyRevision{Key: key} } - c := s.cacheProvider() - if c == nil || len(keys) == 0 { + if len(keys) == 0 { return resp, nil } - propIDs := schemaKeysToPropIDs(keys) - statuses := c.GetKeyRevisions(propIDs) - for i, keyStatus := range statuses { - resp.Revisions[i].ModRevision = keyStatus.ModRevision - resp.Revisions[i].Present = keyStatus.Present - } + cacheIndices := s.routeKeysByKind(keys, resp.Revisions) + s.fillCacheKeyRevisions(keys, cacheIndices, resp.Revisions) return resp, nil } @@ -134,6 +210,10 @@ func (s *NodeSchemaStatusServer) GetKeyRevisions(_ context.Context, req *cluster // node's live cache or pending downstream notification) and "still_present". // A SchemaKey with an unknown kind value is reported in absent_keys without // erroring so the caller can rely on the partition adding up to the input. +// +// Routing follows GetKeyRevisions: registry-routed kinds consult the +// per-service schemaRepo (so absence here means absence in the executor's +// resolver cache too); TopN/Property fall through to the schemaCache. func (s *NodeSchemaStatusServer) GetAbsentKeys(_ context.Context, req *clusterv1.GetAbsentKeysRequest) (*clusterv1.GetAbsentKeysResponse, error) { keys := req.GetKeys() if len(keys) > nodeStatusMaxKeys { @@ -143,24 +223,29 @@ func (s *NodeSchemaStatusServer) GetAbsentKeys(_ context.Context, req *clusterv1 if len(keys) == 0 { return resp, nil } - c := s.cacheProvider() - if c == nil { - // Cache not yet initialized — the node has not observed any schema - // state, so it cannot claim deletion has been applied. 
Report every - // key as still present so the liaison's AwaitSchemaDeleted barrier - // keeps polling until the cache is online. Mirrors the Phase 1 + c := s.cache() + reg := s.registry() + if c == nil && (reg == nil || reg.Empty()) { + // Neither source initialized — the node has not observed any + // schema state, so it cannot claim deletion has been applied. + // Report every key as still present so the liaison's + // AwaitSchemaDeleted barrier keeps polling. Mirrors the Phase 1 // collectPresentKeys nil-cache contract in // banyand/liaison/grpc/barrier.go. resp.StillPresentKeys = append([]*schemav1.SchemaKey(nil), keys...) return resp, nil } - propIDs := schemaKeysToPropIDs(keys) - // One read-lock pass over the cache; an empty propID (unknown kind) maps - // to Present=false in c.GetKeyRevisions because c.entries[""] never - // matches, which is the same "absent" partition the proto requires. - statuses := c.GetKeyRevisions(propIDs) - for i, keyStatus := range statuses { - if keyStatus.Present { + // Reuse the GetKeyRevisions routing to obtain a per-key Present flag, + // then partition by that flag. The two RPCs share a routing rule by + // design; keeping the implementation in one place avoids drift. + revisions := make([]*clusterv1.KeyRevision, len(keys)) + for i, key := range keys { + revisions[i] = &clusterv1.KeyRevision{Key: key} + } + cacheIndices := s.routeKeysByKind(keys, revisions) + s.fillCacheKeyRevisions(keys, cacheIndices, revisions) + for i, kr := range revisions { + if kr.Present { resp.StillPresentKeys = append(resp.StillPresentKeys, keys[i]) continue } @@ -169,17 +254,52 @@ func (s *NodeSchemaStatusServer) GetAbsentKeys(_ context.Context, req *clusterv1 return resp, nil } -// schemaKeysToPropIDs converts a slice of SchemaKey messages into the -// internal propID strings used by the schema cache. 
Keys with unknown kind -// strings produce an empty propID at the corresponding index; callers may -// pass empty propIDs straight to schemaCache.GetKeyRevisions, which reports -// them as Present=false (the cache map never holds an entry under ""). -// This matches the proto contract: an unknown kind is "absent from this -// node's perspective" rather than a parse error. -func schemaKeysToPropIDs(keys []*schemav1.SchemaKey) []string { - out := make([]string, len(keys)) +// routeKeysByKind fills in revisions[i] for every key whose Kind is tracked +// by a registered schemaRepo and returns the indices of the remaining keys +// that need to be answered from the schemaCache. The split is positional so +// the caller can preserve the request key order in its response. +func (s *NodeSchemaStatusServer) routeKeysByKind(keys []*schemav1.SchemaKey, revisions []*clusterv1.KeyRevision) []int { + reg := s.registry() + if reg == nil { + cacheIndices := make([]int, len(keys)) + for i := range keys { + cacheIndices[i] = i + } + return cacheIndices + } + cacheIndices := make([]int, 0, len(keys)) for i, key := range keys { - out[i] = BuildPropertyIDFromSchemaKey(key) + kind := kindFromProtoString(key.GetKind()) + if kind == 0 || !reg.HasKind(kind) { + cacheIndices = append(cacheIndices, i) + continue + } + rev, ok := reg.ResourceRevision(kind, key.GetGroup(), key.GetName()) + revisions[i].ModRevision = rev + revisions[i].Present = ok + } + return cacheIndices +} + +// fillCacheKeyRevisions answers the schemaCache-routed indices in a single +// read-lock pass. A nil cache leaves the revisions untouched (Present=false, +// ModRevision=0) which is the correct "node hasn't seen this key" answer. 
+func (s *NodeSchemaStatusServer) fillCacheKeyRevisions(keys []*schemav1.SchemaKey, cacheIndices []int, revisions []*clusterv1.KeyRevision) { + if len(cacheIndices) == 0 { + return + } + c := s.cache() + if c == nil { + return + } + propIDs := make([]string, len(cacheIndices)) + for j, i := range cacheIndices { + propIDs[j] = BuildPropertyIDFromSchemaKey(keys[i]) + } + statuses := c.GetKeyRevisions(propIDs) + for j, keyStatus := range statuses { + i := cacheIndices[j] + revisions[i].ModRevision = keyStatus.ModRevision + revisions[i].Present = keyStatus.Present } - return out } diff --git a/banyand/metadata/schema/property/node_status_registry_test.go b/banyand/metadata/schema/property/node_status_registry_test.go new file mode 100644 index 000000000..58108212c --- /dev/null +++ b/banyand/metadata/schema/property/node_status_registry_test.go @@ -0,0 +1,252 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package property + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1" + schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1" + metaschema "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/pkg/schema/registry" +) + +// fakeRevRepo emulates a per-service pkg/schema.schemaRepo for the registry +// pointer-identity tests. The "Latest" knob simulates the eventCh-retry leak +// scenario from pkg/schema/cache.go:106 — a repo lagging behind the property +// schemaCache because a transient processEvent failure deferred the apply +// onto the async retry queue. +type fakeRevRepo struct { + resources map[metaschema.Kind]map[string]int64 + latest int64 +} + +func newFakeRevRepo(latest int64) *fakeRevRepo { + return &fakeRevRepo{ + latest: latest, + resources: make(map[metaschema.Kind]map[string]int64), + } +} + +// put records (kind, group, name, rev) in the fake. group is parameterized +// even though every current caller passes "g1" so the helper stays usable +// when future tests exercise cross-group routing. +// +//nolint:unparam // group is generic by design; test fixtures happen to share "g1". 
+func (f *fakeRevRepo) put(kind metaschema.Kind, group, name string, rev int64) { + if f.resources[kind] == nil { + f.resources[kind] = make(map[string]int64) + } + f.resources[kind][group+"/"+name] = rev +} + +func (f *fakeRevRepo) LatestModRevision() int64 { return f.latest } + +func (f *fakeRevRepo) ResourceRevision(kind metaschema.Kind, group, name string) (int64, bool) { + if m := f.resources[kind]; m != nil { + if rev, ok := m[group+"/"+name]; ok { + return rev, true + } + } + return 0, false +} + +func (f *fakeRevRepo) IsAbsent(kind metaschema.Kind, group, name string) bool { + _, ok := f.ResourceRevision(kind, group, name) + return !ok +} + +// TestExecutor_ResolvesGroupsViaSharedSchemaRepo asserts the cluster barrier's +// per-node probe reads the same schemaRepo instance the executor consults via +// LoadGroup. Pointer identity is the load-bearing Phase 2 §Step 2.5 invariant: +// the registry returns a positive answer for (KindGroup, group, name) only +// when the same repo holds the group, so the barrier and the executor cannot +// disagree on the same (group, ModRevision) pair. 
+func TestExecutor_ResolvesGroupsViaSharedSchemaRepo(t *testing.T) { + const targetRev int64 = 42 + repo := newFakeRevRepo(targetRev) + repo.put(metaschema.KindGroup, "g1", "g1", targetRev) + repo.put(metaschema.KindMeasure, "g1", "m1", targetRev) + + reg := registry.NewNodeRepoRegistry() + registry.MaybeRegister(reg, + metaschema.KindGroup|metaschema.KindMeasure|metaschema.KindIndexRule|metaschema.KindIndexRuleBinding, + repo, + ) + + srv := NewNodeSchemaStatusServerWithRegistry( + func() *schemaCache { return nil }, + func() *registry.NodeRepoRegistry { return reg }, + ) + + maxResp, err := srv.GetMaxRevision(context.Background(), &clusterv1.GetMaxRevisionRequest{}) + require.NoError(t, err) + assert.Equal(t, targetRev, maxResp.GetMaxModRevision(), + "registry-only node reports the registered repo's latest") + + revResp, err := srv.GetKeyRevisions(context.Background(), &clusterv1.GetKeyRevisionsRequest{ + Keys: []*schemav1.SchemaKey{ + {Kind: "group", Group: "g1", Name: "g1"}, + {Kind: "measure", Group: "g1", Name: "m1"}, + }, + }) + require.NoError(t, err) + require.Len(t, revResp.GetRevisions(), 2) + for _, kr := range revResp.GetRevisions() { + assert.True(t, kr.GetPresent(), "%s present in registry-routed lookup", kr.GetKey().GetName()) + assert.Equal(t, targetRev, kr.GetModRevision()) + } + + // Pointer identity: the value returned to the barrier and the value the + // executor would resolve via LoadGroup are read from the SAME repo + // pointer. Verifying via fake means changing the fake's internal state + // is observable from both lookups (no parallel cache to drift). 
+ repo.put(metaschema.KindGroup, "g1", "g1", targetRev+1) + revResp2, err := srv.GetKeyRevisions(context.Background(), &clusterv1.GetKeyRevisionsRequest{ + Keys: []*schemav1.SchemaKey{ + {Kind: "group", Group: "g1", Name: "g1"}, + }, + }) + require.NoError(t, err) + require.Len(t, revResp2.GetRevisions(), 1) + assert.Equal(t, targetRev+1, revResp2.GetRevisions()[0].GetModRevision(), + "a repo-side mutation is immediately visible through the registry — same pointer") +} + +// TestNodeStatus_DoesNotAdvancePastSchemaRepoLag injects a ModRevision skew +// between the property schemaCache and the per-service schemaRepo (the +// scenario the eventCh-retry path in pkg/schema.schemaRepo.SendMetadataEvent +// produces) and asserts GetMaxRevision returns the laggard's value, never the +// cache's. This is the regression that distinguishes the Phase 2 §Step 2.5 +// pivot from the original schemaCache-only implementation: the barrier no +// longer certifies a revision the executor's resolver cache has not applied. +func TestNodeStatus_DoesNotAdvancePastSchemaRepoLag(t *testing.T) { + // schemaCache is at watermark 100 — every kind it tracks has been + // notified to handlers up to revision 100. + c := newSchemaCache() + c.notifiedModRevision = 100 + + // schemaRepo lags behind at 50 — the canonical executor-cache scenario + // the §Step 2.5 pivot exists to handle. + repo := newFakeRevRepo(50) + + reg := registry.NewNodeRepoRegistry() + registry.MaybeRegister(reg, metaschema.KindMeasure, repo) + + srv := NewNodeSchemaStatusServerWithRegistry( + func() *schemaCache { return c }, + func() *registry.NodeRepoRegistry { return reg }, + ) + + resp, err := srv.GetMaxRevision(context.Background(), &clusterv1.GetMaxRevisionRequest{}) + require.NoError(t, err) + assert.Equal(t, int64(50), resp.GetMaxModRevision(), + "GetMaxRevision = min(schemaCache.watermark, registry.LatestModRevision); the laggard wins") + + // When the laggard catches up to 100, GetMaxRevision should match. 
+ repo.latest = 100 + resp2, err := srv.GetMaxRevision(context.Background(), &clusterv1.GetMaxRevisionRequest{}) + require.NoError(t, err) + assert.Equal(t, int64(100), resp2.GetMaxModRevision(), + "once the laggard catches up, the min equals the watermark") +} + +// TestNodeStatus_RegistryOnly_NoSchemaCache covers the "registry alone" +// configuration: a node whose schemaCache provider returns nil but whose +// registry has registered repos. The server must report the registry's +// LatestModRevision as the max — mirrors the production wiring on a +// data-node before the schemaCache has been fully populated. +func TestNodeStatus_RegistryOnly_NoSchemaCache(t *testing.T) { + repo := newFakeRevRepo(77) + repo.put(metaschema.KindStream, "g1", "s1", 60) + + reg := registry.NewNodeRepoRegistry() + registry.MaybeRegister(reg, metaschema.KindStream, repo) + + srv := NewNodeSchemaStatusServerWithRegistry( + func() *schemaCache { return nil }, + func() *registry.NodeRepoRegistry { return reg }, + ) + + resp, err := srv.GetMaxRevision(context.Background(), &clusterv1.GetMaxRevisionRequest{}) + require.NoError(t, err) + assert.Equal(t, int64(77), resp.GetMaxModRevision()) +} + +// TestNodeStatus_EmptyRegistry_FallsBackToSchemaCache covers the production +// boot path: the registry exists but no service has registered yet. The +// server must skip the registry's "min" contribution rather than gate the +// barrier verdict to 0. Mirrors the schemaCache-only behavior for a +// metadata-only host. 
+func TestNodeStatus_EmptyRegistry_FallsBackToSchemaCache(t *testing.T) { + c := newSchemaCache() + c.notifiedModRevision = 25 + + reg := registry.NewNodeRepoRegistry() + + srv := NewNodeSchemaStatusServerWithRegistry( + func() *schemaCache { return c }, + func() *registry.NodeRepoRegistry { return reg }, + ) + + resp, err := srv.GetMaxRevision(context.Background(), &clusterv1.GetMaxRevisionRequest{}) + require.NoError(t, err) + assert.Equal(t, int64(25), resp.GetMaxModRevision(), + "empty registry contributes nothing — schemaCache watermark wins") +} + +// TestNodeStatus_KindRouting_TopNFallsBackToSchemaCache asserts that kinds +// the registry does NOT track (TopNAggregation, Property) fall through to the +// schemaCache lookup unchanged. The Phase 2 §Step 2.5 invariant covers only +// the kinds schemaRepo holds; TopN/Property barrier reads remain on the +// schemaCache.notifiedModRevision watermark. +func TestNodeStatus_KindRouting_TopNFallsBackToSchemaCache(t *testing.T) { + idTopN, entryTopN := makeEntry(metaschema.KindTopNAggregation, "topn1", 33) + c := newSchemaCache() + c.entries[idTopN] = entryTopN + c.notifiedModRevision = 33 + + repo := newFakeRevRepo(33) + repo.put(metaschema.KindMeasure, "g1", "m1", 33) + reg := registry.NewNodeRepoRegistry() + registry.MaybeRegister(reg, metaschema.KindMeasure, repo) + + srv := NewNodeSchemaStatusServerWithRegistry( + func() *schemaCache { return c }, + func() *registry.NodeRepoRegistry { return reg }, + ) + + resp, err := srv.GetKeyRevisions(context.Background(), &clusterv1.GetKeyRevisionsRequest{ + Keys: []*schemav1.SchemaKey{ + {Kind: "top_n_aggregation", Group: testGroup, Name: "topn1"}, + {Kind: "measure", Group: "g1", Name: "m1"}, + }, + }) + require.NoError(t, err) + require.Len(t, resp.GetRevisions(), 2) + + assert.True(t, resp.Revisions[0].GetPresent(), "TopN routes through schemaCache and is present at watermark") + assert.Equal(t, int64(33), resp.Revisions[0].GetModRevision()) + + assert.True(t, 
resp.Revisions[1].GetPresent(), "measure routes through registry and is present in repo") + assert.Equal(t, int64(33), resp.Revisions[1].GetModRevision()) +} diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go index 454e5e508..befe295d8 100644 --- a/banyand/queue/sub/server.go +++ b/banyand/queue/sub/server.go @@ -302,13 +302,16 @@ func (s *server) Serve() run.StopNotify { // time; the fail-closed nil-cache contract handles in-flight probes // during initialization. if svc, svcOk := s.metadataRepo.(metadata.Service); svcOk { - clusterv1.RegisterNodeSchemaStatusServiceServer(s.ser, property.NewNodeSchemaStatusServerForRegistry(func() *property.SchemaRegistry { - reg, regOk := svc.SchemaRegistry().(*property.SchemaRegistry) - if !regOk { - return nil - } - return reg - })) + clusterv1.RegisterNodeSchemaStatusServiceServer(s.ser, property.NewNodeSchemaStatusServerForRegistryWithNodeRepo( + func() *property.SchemaRegistry { + reg, regOk := svc.SchemaRegistry().(*property.SchemaRegistry) + if !regOk { + return nil + } + return reg + }, + svc.NodeRepoRegistry, + )) } }
