This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch phase-2-cp5-march
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 725873de6a1e1ae28f5a21616a6945c2532b7752
Author: Hongtao Gao <[email protected]>
AuthorDate: Wed May 6 15:33:32 2026 +0000

    feat(barrier): repoint NodeSchemaStatusService at NodeRepoRegistry for 
executor-tracked kinds (Phase 2 Step 2.5 §2)
    
    NodeSchemaStatusServer grows a registryProvider closure alongside the
    existing schemaCache provider. GetMaxRevision returns
    min(schemaCache.notifiedModRevision, NodeRepoRegistry.LatestModRevision)
    so a positive answer covers every per-service executor cache as well as
    the upstream watch cache. GetKeyRevisions / GetAbsentKeys split per-key
    lookups: kinds tracked by at least one registered schemaRepo
    (Group / Stream / Measure / Trace / IndexRule / IndexRuleBinding) route
    through NodeRepoRegistry; TopN / Property fall through to schemaCache
    because schemaRepo does not track those kinds.
    
    This closes the eventCh-retry leak the plan describes:
    SendMetadataEvent's async retry can advance schemaCache.notifiedModRevision
    ahead of schemaRepo, so the bare schemaCache view would certify a key the
    executor's LoadGroup / LoadResource still misses. Reading the registry
    view binds the barrier to the cache the executor actually consults.
    
    The data-node (banyand/queue/sub/server.go) and the liaison
    (banyand/liaison/grpc/server.go) both wire the new constructor that
    accepts the NodeRepoRegistry provider; resolution stays deferred to
    request time so PreRun-populated registrations are picked up
    automatically.
    
    Adds banyand/metadata/schema/property/node_status_registry_test.go
    exercising the routing split, registry-lag fail-closed semantics, and
    the kinds-empty fall-through behavior.
    
    via [HAPI](https://hapi.run)
    
    Co-Authored-By: HAPI <[email protected]>
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
---
 banyand/liaison/grpc/server.go                     |  17 +-
 banyand/metadata/schema/property/node_status.go    | 256 +++++++++++++++------
 .../schema/property/node_status_registry_test.go   | 252 ++++++++++++++++++++
 banyand/queue/sub/server.go                        |  17 +-
 4 files changed, 460 insertions(+), 82 deletions(-)

diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index d4b428e3f..e787368d7 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -206,13 +206,16 @@ func NewServer(_ context.Context, tir1Client, tir2Client, 
broadcaster queue.Clie
                // deferred to request time (closure) because the metadata 
service
                // populates SchemaRegistry in PreRun, after NewServer has 
already
                // run — capturing a snapshot here would skip registration 
permanently.
-               nodeStatusSVC = 
property.NewNodeSchemaStatusServerForRegistry(func() *property.SchemaRegistry {
-                       reg, regOk := 
svc.SchemaRegistry().(*property.SchemaRegistry)
-                       if !regOk {
-                               return nil
-                       }
-                       return reg
-               })
+               nodeStatusSVC = 
property.NewNodeSchemaStatusServerForRegistryWithNodeRepo(
+                       func() *property.SchemaRegistry {
+                               reg, regOk := 
svc.SchemaRegistry().(*property.SchemaRegistry)
+                               if !regOk {
+                                       return nil
+                               }
+                               return reg
+                       },
+                       svc.NodeRepoRegistry,
+               )
        }
        s := &server{
                omr:           omr,
diff --git a/banyand/metadata/schema/property/node_status.go 
b/banyand/metadata/schema/property/node_status.go
index 6303e5a6c..0e24027e5 100644
--- a/banyand/metadata/schema/property/node_status.go
+++ b/banyand/metadata/schema/property/node_status.go
@@ -25,6 +25,7 @@ import (
 
        clusterv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
        schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/pkg/schema/registry"
 )
 
 // nodeStatusMaxKeys caps the per-request key count to match 
SchemaBarrierService
@@ -35,43 +36,72 @@ import (
 const nodeStatusMaxKeys = 10000
 
 // NodeSchemaStatusServer implements clusterv1.NodeSchemaStatusServiceServer
-// against a local SchemaRegistry cache. The same implementation runs on every
+// against the per-node schema caches. The same implementation runs on every
 // cluster member that holds a schema cache: liaisons (Role_ROLE_LIAISON) and
 // data nodes (Role_ROLE_DATA). The "Node" prefix in the service name is a
 // misnomer inherited from the original proto draft — treat the service as
 // "cluster-member schema status" regardless of the role serving it.
 //
-// The cache the server reads (the SchemaRegistry's schemaCache) is the same
-// one downstream consumers (pkg/schema.schemaRepo, groupRepo, entityRepo)
-// observe through the property watch loop. The schemaCache.notifiedModRevision
-// watermark only advances after every registered handler has processed the
-// relevant event, so a positive answer from this server (Present=true,
-// rev=R) implies every downstream cache has also observed R. This is the
-// load-bearing prerequisite for the Phase 2 cluster barrier — without it,
-// the barrier would confirm a cache the data-node query executor never reads.
+// Two underlying caches feed this server:
+//
+//  1. The per-service NodeRepoRegistry (pkg/schema/registry) — the same
+//     pkg/schema.schemaRepo instances the data-node executor's
+//     LoadGroup / LoadResource resolve through. Registered against
+//     KindGroup, KindStream, KindMeasure, KindTrace, KindIndexRule,
+//     KindIndexRuleBinding. Reading from the registry guarantees the
+//     barrier and the executor see a consistent answer for the same
+//     (group, ModRevision) pair — closing the eventCh-retry leak in
+//     pkg/schema.schemaRepo.SendMetadataEvent that produced the §4.6.2
+//     second-run flake (commit 47006561). This is the load-bearing
+//     invariant of Phase 2 §Step 2.5.
+//
+//  2. The property schemaCache — the upstream watch cache. Used only for
+//     KindTopNAggregation and KindProperty, which schemaRepo does not
+//     track (no executor race there because TopN/Property are not
+//     consulted in the data-node query path the same way the per-service
+//     resources are). The schemaCache.notifiedModRevision watermark is
+//     still load-bearing for those kinds.
+//
+// GetMaxRevision returns the minimum across both sources so the barrier never
+// certifies a revision until every cache the node exposes has applied it.
 type NodeSchemaStatusServer struct {
        clusterv1.UnimplementedNodeSchemaStatusServiceServer
-       cacheProvider func() *schemaCache
+       cacheProvider    func() *schemaCache
+       registryProvider func() *registry.NodeRepoRegistry
 }
 
-// NewNodeSchemaStatusServer wires the server to a cache provider. The
-// provider is called per request so the server tolerates SchemaRegistry
-// initialisation racing the gRPC server boot — an early call simply gets a
-// nil cache and returns zero-valued results, which liaison-side fan-out
-// treats as "node not yet ready, retry on the next backoff iteration"
-// rather than a hard error.
+// NewNodeSchemaStatusServer wires the server to a schemaCache provider only.
+// Used by tests that fix a hand-built cache; production wiring uses the
+// registry-aware constructors below so the cluster barrier reads the same
+// schemaRepo the executor consults.
 func NewNodeSchemaStatusServer(cacheProvider func() *schemaCache) 
*NodeSchemaStatusServer {
        return &NodeSchemaStatusServer{cacheProvider: cacheProvider}
 }
 
-// NewNodeSchemaStatusServerForRegistry wires the server through a registry
-// provider resolved per request. Construction-time wiring (which happens
-// before the metadata service's PreRun has populated the registry) would
-// otherwise capture a nil snapshot and skip registration permanently. The
-// provider lets the server tolerate a still-initializing registry the same
-// way the barrierSVC closure in banyand/liaison/grpc/server.go does — a nil
-// registry surfaces through cacheProvider as a nil schemaCache and the
-// fail-closed nil-cache contract takes over.
+// NewNodeSchemaStatusServerWithRegistry wires the server to both a schemaCache
+// provider (for TopN/Property kinds) and a NodeRepoRegistry provider (for the
+// per-service kinds the executor reads). Either provider may transiently
+// return nil while metadata.PreRun is still running; the per-call closures
+// tolerate that the same way the standalone fail-closed contract does.
+func NewNodeSchemaStatusServerWithRegistry(
+       cacheProvider func() *schemaCache,
+       registryProvider func() *registry.NodeRepoRegistry,
+) *NodeSchemaStatusServer {
+       return &NodeSchemaStatusServer{
+               cacheProvider:    cacheProvider,
+               registryProvider: registryProvider,
+       }
+}
+
+// NewNodeSchemaStatusServerForRegistry wires the server through a property
+// SchemaRegistry provider resolved per request. Construction-time wiring
+// (which happens before the metadata service's PreRun has populated the
+// registry) would otherwise capture a nil snapshot and skip registration
+// permanently. The provider lets the server tolerate a still-initializing
+// registry the same way the barrierSVC closure in
+// banyand/liaison/grpc/server.go does — a nil registry surfaces through
+// cacheProvider as a nil schemaCache and the fail-closed nil-cache contract
+// takes over.
 func NewNodeSchemaStatusServerForRegistry(provider func() *SchemaRegistry) 
*NodeSchemaStatusServer {
        return &NodeSchemaStatusServer{
                cacheProvider: func() *schemaCache {
@@ -87,25 +117,76 @@ func NewNodeSchemaStatusServerForRegistry(provider func() 
*SchemaRegistry) *Node
        }
 }
 
-// GetMaxRevision returns the highest mod_revision currently observed by the
-// node's local schema cache. When the cache is not yet initialized the
-// response carries 0, which the liaison's barrier loop interprets as
-// "node not yet caught up" and continues polling.
+// NewNodeSchemaStatusServerForRegistryWithNodeRepo wires both the property
+// SchemaRegistry provider (for the schemaCache) and a NodeRepoRegistry
+// provider (for the per-service schemaRepo aggregator) through per-call
+// closures. The metadata Service exposes the NodeRepoRegistry via
+// Service.NodeRepoRegistry(); pass the bound method here.
+func NewNodeSchemaStatusServerForRegistryWithNodeRepo(
+       provider func() *SchemaRegistry,
+       registryProvider func() *registry.NodeRepoRegistry,
+) *NodeSchemaStatusServer {
+       srv := NewNodeSchemaStatusServerForRegistry(provider)
+       srv.registryProvider = registryProvider
+       return srv
+}
+
+func (s *NodeSchemaStatusServer) cache() *schemaCache {
+       if s.cacheProvider == nil {
+               return nil
+       }
+       return s.cacheProvider()
+}
+
+func (s *NodeSchemaStatusServer) registry() *registry.NodeRepoRegistry {
+       if s.registryProvider == nil {
+               return nil
+       }
+       return s.registryProvider()
+}
+
+// GetMaxRevision returns the minimum mod_revision across the schemaCache
+// watermark and the per-service NodeRepoRegistry. Either source contributes
+// only when present and non-empty; a registry with no registered repos is
+// treated as "no executor cache to gate on" and skipped (otherwise its
+// LatestModRevision()=0 would gate every verdict to 0 on a metadata-only
+// host).
+//
+// When neither source has data the response carries 0, which the liaison's
+// barrier loop interprets as "node not yet caught up" and continues polling.
 func (s *NodeSchemaStatusServer) GetMaxRevision(_ context.Context, _ 
*clusterv1.GetMaxRevisionRequest) (*clusterv1.GetMaxRevisionResponse, error) {
-       c := s.cacheProvider()
-       if c == nil {
+       var (
+               minRev    int64
+               hasSource bool
+       )
+       if c := s.cache(); c != nil {
+               minRev = c.GetMaxModRevision()
+               hasSource = true
+       }
+       if reg := s.registry(); reg != nil && !reg.Empty() {
+               regRev := reg.LatestModRevision()
+               if !hasSource || regRev < minRev {
+                       minRev = regRev
+               }
+               hasSource = true
+       }
+       if !hasSource {
                return &clusterv1.GetMaxRevisionResponse{}, nil
        }
-       return &clusterv1.GetMaxRevisionResponse{MaxModRevision: 
c.GetMaxModRevision()}, nil
+       return &clusterv1.GetMaxRevisionResponse{MaxModRevision: minRev}, nil
 }
 
 // GetKeyRevisions returns per-key (mod_revision, present) pairs in the same
-// order the caller supplied keys. A key referencing a group the node has
-// not observed maps to mod_revision=0, present=false — the call does not
-// error on missing groups so the liaison can treat group-not-yet-registered
-// as a normal laggard rather than a server fault.
+// order the caller supplied keys. Each key is routed by Kind: kinds tracked
+// by some registered schemaRepo are answered from the NodeRepoRegistry (so
+// the response matches what the executor's LoadResource would resolve);
+// remaining kinds (TopN, Property) fall through to the schemaCache.
 //
-// An empty request is valid and produces an empty response.
+// A key referencing a group the node has not observed maps to
+// mod_revision=0, present=false — the call does not error on missing groups
+// so the liaison can treat group-not-yet-registered as a normal laggard
+// rather than a server fault. An empty request is valid and produces an
+// empty response.
 func (s *NodeSchemaStatusServer) GetKeyRevisions(_ context.Context, req 
*clusterv1.GetKeyRevisionsRequest) (*clusterv1.GetKeyRevisionsResponse, error) {
        keys := req.GetKeys()
        if len(keys) > nodeStatusMaxKeys {
@@ -117,16 +198,11 @@ func (s *NodeSchemaStatusServer) GetKeyRevisions(_ 
context.Context, req *cluster
        for i, key := range keys {
                resp.Revisions[i] = &clusterv1.KeyRevision{Key: key}
        }
-       c := s.cacheProvider()
-       if c == nil || len(keys) == 0 {
+       if len(keys) == 0 {
                return resp, nil
        }
-       propIDs := schemaKeysToPropIDs(keys)
-       statuses := c.GetKeyRevisions(propIDs)
-       for i, keyStatus := range statuses {
-               resp.Revisions[i].ModRevision = keyStatus.ModRevision
-               resp.Revisions[i].Present = keyStatus.Present
-       }
+       cacheIndices := s.routeKeysByKind(keys, resp.Revisions)
+       s.fillCacheKeyRevisions(keys, cacheIndices, resp.Revisions)
        return resp, nil
 }
 
@@ -134,6 +210,10 @@ func (s *NodeSchemaStatusServer) GetKeyRevisions(_ 
context.Context, req *cluster
 // node's live cache or pending downstream notification) and "still_present".
 // A SchemaKey with an unknown kind value is reported in absent_keys without
 // erroring so the caller can rely on the partition adding up to the input.
+//
+// Routing follows GetKeyRevisions: registry-routed kinds consult the
+// per-service schemaRepo (so absence here means absence in the executor's
+// resolver cache too); TopN/Property fall through to the schemaCache.
 func (s *NodeSchemaStatusServer) GetAbsentKeys(_ context.Context, req 
*clusterv1.GetAbsentKeysRequest) (*clusterv1.GetAbsentKeysResponse, error) {
        keys := req.GetKeys()
        if len(keys) > nodeStatusMaxKeys {
@@ -143,24 +223,29 @@ func (s *NodeSchemaStatusServer) GetAbsentKeys(_ 
context.Context, req *clusterv1
        if len(keys) == 0 {
                return resp, nil
        }
-       c := s.cacheProvider()
-       if c == nil {
-               // Cache not yet initialized — the node has not observed any 
schema
-               // state, so it cannot claim deletion has been applied. Report 
every
-               // key as still present so the liaison's AwaitSchemaDeleted 
barrier
-               // keeps polling until the cache is online. Mirrors the Phase 1
+       c := s.cache()
+       reg := s.registry()
+       if c == nil && (reg == nil || reg.Empty()) {
+               // Neither source initialized — the node has not observed any
+               // schema state, so it cannot claim deletion has been applied.
+               // Report every key as still present so the liaison's
+               // AwaitSchemaDeleted barrier keeps polling. Mirrors the Phase 1
                // collectPresentKeys nil-cache contract in
                // banyand/liaison/grpc/barrier.go.
                resp.StillPresentKeys = append([]*schemav1.SchemaKey(nil), 
keys...)
                return resp, nil
        }
-       propIDs := schemaKeysToPropIDs(keys)
-       // One read-lock pass over the cache; an empty propID (unknown kind) 
maps
-       // to Present=false in c.GetKeyRevisions because c.entries[""] never
-       // matches, which is the same "absent" partition the proto requires.
-       statuses := c.GetKeyRevisions(propIDs)
-       for i, keyStatus := range statuses {
-               if keyStatus.Present {
+       // Reuse the GetKeyRevisions routing to obtain a per-key Present flag,
+       // then partition by that flag. The two RPCs share a routing rule by
+       // design; keeping the implementation in one place avoids drift.
+       revisions := make([]*clusterv1.KeyRevision, len(keys))
+       for i, key := range keys {
+               revisions[i] = &clusterv1.KeyRevision{Key: key}
+       }
+       cacheIndices := s.routeKeysByKind(keys, revisions)
+       s.fillCacheKeyRevisions(keys, cacheIndices, revisions)
+       for i, kr := range revisions {
+               if kr.Present {
                        resp.StillPresentKeys = append(resp.StillPresentKeys, 
keys[i])
                        continue
                }
@@ -169,17 +254,52 @@ func (s *NodeSchemaStatusServer) GetAbsentKeys(_ 
context.Context, req *clusterv1
        return resp, nil
 }
 
-// schemaKeysToPropIDs converts a slice of SchemaKey messages into the
-// internal propID strings used by the schema cache. Keys with unknown kind
-// strings produce an empty propID at the corresponding index; callers may
-// pass empty propIDs straight to schemaCache.GetKeyRevisions, which reports
-// them as Present=false (the cache map never holds an entry under "").
-// This matches the proto contract: an unknown kind is "absent from this
-// node's perspective" rather than a parse error.
-func schemaKeysToPropIDs(keys []*schemav1.SchemaKey) []string {
-       out := make([]string, len(keys))
+// routeKeysByKind fills in revisions[i] for every key whose Kind is tracked
+// by a registered schemaRepo and returns the indices of the remaining keys
+// that need to be answered from the schemaCache. The split is positional so
+// the caller can preserve the request key order in its response.
+func (s *NodeSchemaStatusServer) routeKeysByKind(keys []*schemav1.SchemaKey, 
revisions []*clusterv1.KeyRevision) []int {
+       reg := s.registry()
+       if reg == nil {
+               cacheIndices := make([]int, len(keys))
+               for i := range keys {
+                       cacheIndices[i] = i
+               }
+               return cacheIndices
+       }
+       cacheIndices := make([]int, 0, len(keys))
        for i, key := range keys {
-               out[i] = BuildPropertyIDFromSchemaKey(key)
+               kind := kindFromProtoString(key.GetKind())
+               if kind == 0 || !reg.HasKind(kind) {
+                       cacheIndices = append(cacheIndices, i)
+                       continue
+               }
+               rev, ok := reg.ResourceRevision(kind, key.GetGroup(), 
key.GetName())
+               revisions[i].ModRevision = rev
+               revisions[i].Present = ok
+       }
+       return cacheIndices
+}
+
+// fillCacheKeyRevisions answers the schemaCache-routed indices in a single
+// read-lock pass. A nil cache leaves the revisions untouched (Present=false,
+// ModRevision=0) which is the correct "node hasn't seen this key" answer.
+func (s *NodeSchemaStatusServer) fillCacheKeyRevisions(keys 
[]*schemav1.SchemaKey, cacheIndices []int, revisions []*clusterv1.KeyRevision) {
+       if len(cacheIndices) == 0 {
+               return
+       }
+       c := s.cache()
+       if c == nil {
+               return
+       }
+       propIDs := make([]string, len(cacheIndices))
+       for j, i := range cacheIndices {
+               propIDs[j] = BuildPropertyIDFromSchemaKey(keys[i])
+       }
+       statuses := c.GetKeyRevisions(propIDs)
+       for j, keyStatus := range statuses {
+               i := cacheIndices[j]
+               revisions[i].ModRevision = keyStatus.ModRevision
+               revisions[i].Present = keyStatus.Present
        }
-       return out
 }
diff --git a/banyand/metadata/schema/property/node_status_registry_test.go 
b/banyand/metadata/schema/property/node_status_registry_test.go
new file mode 100644
index 000000000..58108212c
--- /dev/null
+++ b/banyand/metadata/schema/property/node_status_registry_test.go
@@ -0,0 +1,252 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package property
+
+import (
+       "context"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       clusterv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       metaschema 
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/schema/registry"
+)
+
+// fakeRevRepo emulates a per-service pkg/schema.schemaRepo for the registry
+// pointer-identity tests. The "Latest" knob simulates the eventCh-retry leak
+// scenario from pkg/schema/cache.go:106 — a repo lagging behind the property
+// schemaCache because a transient processEvent failure deferred the apply
+// onto the async retry queue.
+type fakeRevRepo struct {
+       resources map[metaschema.Kind]map[string]int64
+       latest    int64
+}
+
+func newFakeRevRepo(latest int64) *fakeRevRepo {
+       return &fakeRevRepo{
+               latest:    latest,
+               resources: make(map[metaschema.Kind]map[string]int64),
+       }
+}
+
+// put records (kind, group, name, rev) in the fake. group is parameterized
+// even though every current caller passes "g1" so the helper stays usable
+// when future tests exercise cross-group routing.
+//
+//nolint:unparam // group is generic by design; test fixtures happen to share 
"g1".
+func (f *fakeRevRepo) put(kind metaschema.Kind, group, name string, rev int64) 
{
+       if f.resources[kind] == nil {
+               f.resources[kind] = make(map[string]int64)
+       }
+       f.resources[kind][group+"/"+name] = rev
+}
+
+func (f *fakeRevRepo) LatestModRevision() int64 { return f.latest }
+
+func (f *fakeRevRepo) ResourceRevision(kind metaschema.Kind, group, name 
string) (int64, bool) {
+       if m := f.resources[kind]; m != nil {
+               if rev, ok := m[group+"/"+name]; ok {
+                       return rev, true
+               }
+       }
+       return 0, false
+}
+
+func (f *fakeRevRepo) IsAbsent(kind metaschema.Kind, group, name string) bool {
+       _, ok := f.ResourceRevision(kind, group, name)
+       return !ok
+}
+
+// TestExecutor_ResolvesGroupsViaSharedSchemaRepo asserts the cluster barrier's
+// per-node probe reads the same schemaRepo instance the executor consults via
+// LoadGroup. Pointer identity is the load-bearing Phase 2 §Step 2.5 invariant:
+// the registry returns a positive answer for (KindGroup, group, name) only
+// when the same repo holds the group, so the barrier and the executor cannot
+// disagree on the same (group, ModRevision) pair.
+func TestExecutor_ResolvesGroupsViaSharedSchemaRepo(t *testing.T) {
+       const targetRev int64 = 42
+       repo := newFakeRevRepo(targetRev)
+       repo.put(metaschema.KindGroup, "g1", "g1", targetRev)
+       repo.put(metaschema.KindMeasure, "g1", "m1", targetRev)
+
+       reg := registry.NewNodeRepoRegistry()
+       registry.MaybeRegister(reg,
+               
metaschema.KindGroup|metaschema.KindMeasure|metaschema.KindIndexRule|metaschema.KindIndexRuleBinding,
+               repo,
+       )
+
+       srv := NewNodeSchemaStatusServerWithRegistry(
+               func() *schemaCache { return nil },
+               func() *registry.NodeRepoRegistry { return reg },
+       )
+
+       maxResp, err := srv.GetMaxRevision(context.Background(), 
&clusterv1.GetMaxRevisionRequest{})
+       require.NoError(t, err)
+       assert.Equal(t, targetRev, maxResp.GetMaxModRevision(),
+               "registry-only node reports the registered repo's latest")
+
+       revResp, err := srv.GetKeyRevisions(context.Background(), 
&clusterv1.GetKeyRevisionsRequest{
+               Keys: []*schemav1.SchemaKey{
+                       {Kind: "group", Group: "g1", Name: "g1"},
+                       {Kind: "measure", Group: "g1", Name: "m1"},
+               },
+       })
+       require.NoError(t, err)
+       require.Len(t, revResp.GetRevisions(), 2)
+       for _, kr := range revResp.GetRevisions() {
+               assert.True(t, kr.GetPresent(), "%s present in registry-routed 
lookup", kr.GetKey().GetName())
+               assert.Equal(t, targetRev, kr.GetModRevision())
+       }
+
+       // Pointer identity: the value returned to the barrier and the value the
+       // executor would resolve via LoadGroup are read from the SAME repo
+       // pointer. Verifying via fake means changing the fake's internal state
+       // is observable from both lookups (no parallel cache to drift).
+       repo.put(metaschema.KindGroup, "g1", "g1", targetRev+1)
+       revResp2, err := srv.GetKeyRevisions(context.Background(), 
&clusterv1.GetKeyRevisionsRequest{
+               Keys: []*schemav1.SchemaKey{
+                       {Kind: "group", Group: "g1", Name: "g1"},
+               },
+       })
+       require.NoError(t, err)
+       require.Len(t, revResp2.GetRevisions(), 1)
+       assert.Equal(t, targetRev+1, 
revResp2.GetRevisions()[0].GetModRevision(),
+               "a repo-side mutation is immediately visible through the 
registry — same pointer")
+}
+
+// TestNodeStatus_DoesNotAdvancePastSchemaRepoLag injects a ModRevision skew
+// between the property schemaCache and the per-service schemaRepo (the
+// scenario the eventCh-retry path in pkg/schema.schemaRepo.SendMetadataEvent
+// produces) and asserts GetMaxRevision returns the laggard's value, never the
+// cache's. This is the regression that distinguishes the Phase 2 §Step 2.5
+// pivot from the original schemaCache-only implementation: the barrier no
+// longer certifies a revision the executor's resolver cache has not applied.
+func TestNodeStatus_DoesNotAdvancePastSchemaRepoLag(t *testing.T) {
+       // schemaCache is at watermark 100 — every kind it tracks has been
+       // notified to handlers up to revision 100.
+       c := newSchemaCache()
+       c.notifiedModRevision = 100
+
+       // schemaRepo lags behind at 50 — the canonical executor-cache scenario
+       // the §Step 2.5 pivot exists to handle.
+       repo := newFakeRevRepo(50)
+
+       reg := registry.NewNodeRepoRegistry()
+       registry.MaybeRegister(reg, metaschema.KindMeasure, repo)
+
+       srv := NewNodeSchemaStatusServerWithRegistry(
+               func() *schemaCache { return c },
+               func() *registry.NodeRepoRegistry { return reg },
+       )
+
+       resp, err := srv.GetMaxRevision(context.Background(), 
&clusterv1.GetMaxRevisionRequest{})
+       require.NoError(t, err)
+       assert.Equal(t, int64(50), resp.GetMaxModRevision(),
+               "GetMaxRevision = min(schemaCache.watermark, 
registry.LatestModRevision); the laggard wins")
+
+       // When the laggard catches up to 100, GetMaxRevision should match.
+       repo.latest = 100
+       resp2, err := srv.GetMaxRevision(context.Background(), 
&clusterv1.GetMaxRevisionRequest{})
+       require.NoError(t, err)
+       assert.Equal(t, int64(100), resp2.GetMaxModRevision(),
+               "once the laggard catches up, the min equals the watermark")
+}
+
+// TestNodeStatus_RegistryOnly_NoSchemaCache covers the "registry alone"
+// configuration: a node whose schemaCache provider returns nil but whose
+// registry has registered repos. The server must report the registry's
+// LatestModRevision as the max — mirrors the production wiring on a
+// data-node before the schemaCache has been fully populated.
+func TestNodeStatus_RegistryOnly_NoSchemaCache(t *testing.T) {
+       repo := newFakeRevRepo(77)
+       repo.put(metaschema.KindStream, "g1", "s1", 60)
+
+       reg := registry.NewNodeRepoRegistry()
+       registry.MaybeRegister(reg, metaschema.KindStream, repo)
+
+       srv := NewNodeSchemaStatusServerWithRegistry(
+               func() *schemaCache { return nil },
+               func() *registry.NodeRepoRegistry { return reg },
+       )
+
+       resp, err := srv.GetMaxRevision(context.Background(), 
&clusterv1.GetMaxRevisionRequest{})
+       require.NoError(t, err)
+       assert.Equal(t, int64(77), resp.GetMaxModRevision())
+}
+
+// TestNodeStatus_EmptyRegistry_FallsBackToSchemaCache covers the production
+// boot path: the registry exists but no service has registered yet. The
+// server must skip the registry's "min" contribution rather than gate the
+// barrier verdict to 0. Mirrors the schemaCache-only behavior for a
+// metadata-only host.
+func TestNodeStatus_EmptyRegistry_FallsBackToSchemaCache(t *testing.T) {
+       c := newSchemaCache()
+       c.notifiedModRevision = 25
+
+       reg := registry.NewNodeRepoRegistry()
+
+       srv := NewNodeSchemaStatusServerWithRegistry(
+               func() *schemaCache { return c },
+               func() *registry.NodeRepoRegistry { return reg },
+       )
+
+       resp, err := srv.GetMaxRevision(context.Background(), 
&clusterv1.GetMaxRevisionRequest{})
+       require.NoError(t, err)
+       assert.Equal(t, int64(25), resp.GetMaxModRevision(),
+               "empty registry contributes nothing — schemaCache watermark 
wins")
+}
+
+// TestNodeStatus_KindRouting_TopNFallsBackToSchemaCache asserts that kinds
+// the registry does NOT track (TopNAggregation, Property) fall through to the
+// schemaCache lookup unchanged. The Phase 2 §Step 2.5 invariant covers only
+// the kinds schemaRepo holds; TopN/Property barrier reads remain on the
+// schemaCache.notifiedModRevision watermark.
+func TestNodeStatus_KindRouting_TopNFallsBackToSchemaCache(t *testing.T) {
+       idTopN, entryTopN := makeEntry(metaschema.KindTopNAggregation, "topn1", 
33)
+       c := newSchemaCache()
+       c.entries[idTopN] = entryTopN
+       c.notifiedModRevision = 33
+
+       repo := newFakeRevRepo(33)
+       repo.put(metaschema.KindMeasure, "g1", "m1", 33)
+       reg := registry.NewNodeRepoRegistry()
+       registry.MaybeRegister(reg, metaschema.KindMeasure, repo)
+
+       srv := NewNodeSchemaStatusServerWithRegistry(
+               func() *schemaCache { return c },
+               func() *registry.NodeRepoRegistry { return reg },
+       )
+
+       resp, err := srv.GetKeyRevisions(context.Background(), 
&clusterv1.GetKeyRevisionsRequest{
+               Keys: []*schemav1.SchemaKey{
+                       {Kind: "top_n_aggregation", Group: testGroup, Name: 
"topn1"},
+                       {Kind: "measure", Group: "g1", Name: "m1"},
+               },
+       })
+       require.NoError(t, err)
+       require.Len(t, resp.GetRevisions(), 2)
+
+       assert.True(t, resp.Revisions[0].GetPresent(), "TopN routes through 
schemaCache and is present at watermark")
+       assert.Equal(t, int64(33), resp.Revisions[0].GetModRevision())
+
+       assert.True(t, resp.Revisions[1].GetPresent(), "measure routes through 
registry and is present in repo")
+       assert.Equal(t, int64(33), resp.Revisions[1].GetModRevision())
+}
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index 454e5e508..befe295d8 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -302,13 +302,16 @@ func (s *server) Serve() run.StopNotify {
                // time; the fail-closed nil-cache contract handles in-flight 
probes
                // during initialization.
                if svc, svcOk := s.metadataRepo.(metadata.Service); svcOk {
-                       clusterv1.RegisterNodeSchemaStatusServiceServer(s.ser, 
property.NewNodeSchemaStatusServerForRegistry(func() *property.SchemaRegistry {
-                               reg, regOk := 
svc.SchemaRegistry().(*property.SchemaRegistry)
-                               if !regOk {
-                                       return nil
-                               }
-                               return reg
-                       }))
+                       clusterv1.RegisterNodeSchemaStatusServiceServer(s.ser, 
property.NewNodeSchemaStatusServerForRegistryWithNodeRepo(
+                               func() *property.SchemaRegistry {
+                                       reg, regOk := 
svc.SchemaRegistry().(*property.SchemaRegistry)
+                                       if !regOk {
+                                               return nil
+                                       }
+                                       return reg
+                               },
+                               svc.NodeRepoRegistry,
+                       ))
                }
        }
 

Reply via email to