This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch phase-2-cp5-march
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 593b00369f2c0d8eef2aa0503d4d3308153f7159
Author: Hongtao Gao <[email protected]>
AuthorDate: Thu May 7 23:59:39 2026 +0000

    feat(barrier): expose NodeSchemaStatusService on data-node ports (Phase 2 CP-6 follow-up)
    
    Decouples cluster.v1.NodeSchemaStatusService from the liaison-only
    fodc.v1.GroupLifecycleService inside banyand/queue/sub/server.go's
    Serve() block, which previously gated both registrations together
    under one `if s.metadataRepo != nil` check. The conflation prevented
    data-node startup from registering NodeSchemaStatusService alone:
    adding a SetMetadataRepo call on the data node would also drag in
    GroupLifecycleService, whose InspectAll path is liaison-shaped by
    design (it returns a cluster-wide group inventory to fodc agents).
    
      - Add queue.Server.SetNodeSchemaStatusRepo(metadata.Service) so
        each callsite owns one service:
          - liaison startup (pkg/cmdsetup/liaison.go) calls both
            SetMetadataRepo and SetNodeSchemaStatusRepo.
          - data-node startup (pkg/cmdsetup/data.go) calls only
            SetNodeSchemaStatusRepo.
          - banyand/queue/local.go gets a no-op SetNodeSchemaStatusRepo
            for interface conformance.
      - Split the Serve() registration block: GroupLifecycleService is
        gated on metadataRepo != nil; NodeSchemaStatusService is gated
        on nodeSchemaStatusRepo != nil. The metadata.Service is
        captured directly by the new setter, dropping the previous
        runtime type-assertion against metadataRepo.
    
    After this change the cluster barrier's tier2 fan-out probes
    data nodes for real MaxRevision instead of relying on the
    cross-version Unimplemented→ready policy in barrier_cluster.go,
    which is now correctly reserved for true Phase-1 peers.
    
    Repairs the regression that surfaced once data nodes started
    returning real MaxRevision values:
    
      - NodeSchemaStatusServer.GetMaxRevision previously returned
        min(schemaCache.notifiedModRevision,
            NodeRepoRegistry.LatestModRevision()).
      - NodeRepoRegistry.LatestModRevision aggregated per-service
        schemaRepo.latestModRevision via min, but each schemaRepo only
        advances on events for its own catalog (pkg/schema/init.go:72
        filters by g.Catalog). The min was therefore pinned to the
        slowest catalog's last seen revision, so a steady measure-only
        event stream gated the barrier on an unrelated trace
        schemaRepo watermark (or vice versa). Distributed integration
        failed 21 of 32 specs because of this aggregation alone.
      - GetMaxRevision now reads cache only — symmetric with the
        receiving liaison's selfName probe at barrier_cluster.go:354-360
        which has always read cache-only.
      - NodeRepoRegistry.LatestModRevision and the corresponding
        methods on registry.RevisionRepository /
        pkg/schema.RevisionRepository / fakeRevRepo / fakeRevisionRepo
        are deleted so a future caller cannot reintroduce the bug. Per-key gating
        (GetKeyRevisions / GetAbsentKeys) keeps routing through the
        registry by kind via ResourceRevision / IsAbsent / HasKind, so
        §GC-1's executor-cache convergence on the receiving liaison's
        write/query gates is preserved.
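    The aggregation bug can be reproduced in isolation. The sketch below is
    hypothetical (catalogRepo, event and minLatest are illustrative names,
    not pkg/schema types): each repo only advances on events for its own
    catalog, while a cache-like watermark sees all of them. One trace event
    followed by a steady measure-only stream leaves the min pinned at the
    trace repo's revision, while the cache watermark keeps advancing.

```go
package main

import "fmt"

// catalogRepo is a hypothetical stand-in for one per-service schemaRepo:
// its watermark only advances on events for its own catalog.
type catalogRepo struct {
	catalog string
	latest  int64
}

// event is a simplified schema event: its catalog and mod revision.
type event struct {
	catalog string
	rev     int64
}

// apply mimics the per-catalog filter: events for other catalogs are ignored.
func (r *catalogRepo) apply(e event) {
	if e.catalog == r.catalog && e.rev > r.latest {
		r.latest = e.rev
	}
}

// minLatest is the removed aggregation: the minimum watermark across repos,
// or 0 when no repo is registered.
func minLatest(repos []*catalogRepo) int64 {
	if len(repos) == 0 {
		return 0
	}
	m := repos[0].latest
	for _, r := range repos[1:] {
		if r.latest < m {
			m = r.latest
		}
	}
	return m
}

func main() {
	measure := &catalogRepo{catalog: "measure"}
	trace := &catalogRepo{catalog: "trace"}
	repos := []*catalogRepo{measure, trace}

	// A cache-like watermark that observes every catalog's events.
	var cacheWatermark int64

	// One trace event, then a steady measure-only stream.
	events := []event{{"trace", 5}, {"measure", 6}, {"measure", 7}, {"measure", 8}}
	for _, e := range events {
		for _, r := range repos {
			r.apply(e)
		}
		if e.rev > cacheWatermark {
			cacheWatermark = e.rev
		}
	}

	fmt.Println("min over repos:", minLatest(repos)) // min over repos: 5
	fmt.Println("cache watermark:", cacheWatermark)  // cache watermark: 8
}
```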
    
    §6.12a / §6.12d remain g.PIt under the new mechanism for an
    orthogonal reason: the laggard-detection assertion passes but the
    post-resume AwaitRevisionApplied(newRev) does not converge inside
    the spec timeout. The per-key §6.12b/c flows do converge through
    the same queue-drain path, so the gap is scoped to the global
    notifiedModRevision watermark advancing through queue replay —
    not to the data-node fan-out, which §NSS-1 verified.
    
    Distributed schema integration suite reports
    `30 Passed | 0 Failed | 2 Pending | 0 Skipped` (95.4s).
    
    via [HAPI](https://hapi.run)
    
    Co-Authored-By: HAPI <[email protected]>
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../liaison/grpc/schema_revision_registry_test.go  | 23 ++----
 banyand/metadata/schema/property/node_status.go    | 35 +++-----
 .../schema/property/node_status_registry_test.go   | 93 ++++++----------------
 banyand/queue/local.go                             |  3 +
 banyand/queue/queue.go                             |  6 ++
 banyand/queue/sub/server.go                        | 43 +++++-----
 pkg/cmdsetup/data.go                               |  1 +
 pkg/cmdsetup/liaison.go                            |  1 +
 pkg/schema/init.go                                 |  6 +-
 pkg/schema/registry/registry.go                    | 41 +++-------
 pkg/schema/registry/registry_test.go               | 45 ++++-------
 pkg/schema/schema.go                               |  5 +-
 test/cases/schema/barrier_cluster.go               | 65 ++++++---------
 13 files changed, 128 insertions(+), 239 deletions(-)

diff --git a/banyand/liaison/grpc/schema_revision_registry_test.go b/banyand/liaison/grpc/schema_revision_registry_test.go
index 3a17de4e5..7885bd87b 100644
--- a/banyand/liaison/grpc/schema_revision_registry_test.go
+++ b/banyand/liaison/grpc/schema_revision_registry_test.go
@@ -19,7 +19,6 @@ package grpc
 
 import (
        "sync"
-       "sync/atomic"
        "testing"
        "time"
 
@@ -37,15 +36,12 @@ import (
 )
 
 // fakeRevisionRepo implements registry.RevisionRepository with a controllable
-// per-key map and an atomically advancing latestModRevision watermark. Tests
-// use it to simulate the eventCh-retry leak the gate must close: a state
-// where the entityRepo locator already advanced to R but the schemaRepo (and
-// therefore the registry) has not — in production this is the data-node
-// executor lag the cluster barrier already accounts for via NodeRepoRegistry.
+// per-key map. Tests use it to simulate the eventCh-retry leak the gate must
+// close: a state where the entityRepo locator already advanced to R but the
+// schemaRepo (and therefore the registry) has not.
 type fakeRevisionRepo struct {
-       keys   map[string]int64
-       latest atomic.Int64
-       mu     sync.RWMutex
+       keys map[string]int64
+       mu   sync.RWMutex
 }
 
 func newFakeRevisionRepo() *fakeRevisionRepo {
@@ -60,17 +56,8 @@ func (f *fakeRevisionRepo) set(kind schema.Kind, group, name string, rev int64)
        f.mu.Lock()
        f.keys[f.keyOf(kind, group, name)] = rev
        f.mu.Unlock()
-       for {
-               cur := f.latest.Load()
-               if rev <= cur || f.latest.CompareAndSwap(cur, rev) {
-                       return
-               }
-       }
 }
 
-// LatestModRevision implements registry.RevisionRepository.
-func (f *fakeRevisionRepo) LatestModRevision() int64 { return f.latest.Load() }
-
 // ResourceRevision implements registry.RevisionRepository.
 func (f *fakeRevisionRepo) ResourceRevision(kind schema.Kind, group, name string) (int64, bool) {
        f.mu.RLock()
diff --git a/banyand/metadata/schema/property/node_status.go b/banyand/metadata/schema/property/node_status.go
index 0e24027e5..51cdfd946 100644
--- a/banyand/metadata/schema/property/node_status.go
+++ b/banyand/metadata/schema/property/node_status.go
@@ -145,35 +145,18 @@ func (s *NodeSchemaStatusServer) registry() *registry.NodeRepoRegistry {
        return s.registryProvider()
 }
 
-// GetMaxRevision returns the minimum mod_revision across the schemaCache
-// watermark and the per-service NodeRepoRegistry. Either source contributes
-// only when present and non-empty; a registry with no registered repos is
-// treated as "no executor cache to gate on" and skipped (otherwise its
-// LatestModRevision()=0 would gate every verdict to 0 on a metadata-only
-// host).
-//
-// When neither source has data the response carries 0, which the liaison's
-// barrier loop interprets as "node not yet caught up" and continues polling.
+// GetMaxRevision returns the schemaCache's notifiedModRevision watermark.
+// The cache observes every catalog's events, so it is the correct global
+// watermark — symmetric with the receiving liaison's selfName probe at
+// barrier_cluster.go:354-360 which also reads cache-only. Per-key gating
+// (executor-cache routing by kind) is handled by GetKeyRevisions /
+// GetAbsentKeys via the NodeRepoRegistry.
 func (s *NodeSchemaStatusServer) GetMaxRevision(_ context.Context, _ *clusterv1.GetMaxRevisionRequest) (*clusterv1.GetMaxRevisionResponse, error) {
-       var (
-               minRev    int64
-               hasSource bool
-       )
-       if c := s.cache(); c != nil {
-               minRev = c.GetMaxModRevision()
-               hasSource = true
-       }
-       if reg := s.registry(); reg != nil && !reg.Empty() {
-               regRev := reg.LatestModRevision()
-               if !hasSource || regRev < minRev {
-                       minRev = regRev
-               }
-               hasSource = true
-       }
-       if !hasSource {
+       c := s.cache()
+       if c == nil {
                return &clusterv1.GetMaxRevisionResponse{}, nil
        }
-       return &clusterv1.GetMaxRevisionResponse{MaxModRevision: minRev}, nil
+       return &clusterv1.GetMaxRevisionResponse{MaxModRevision: c.GetMaxModRevision()}, nil
 }
 
 // GetKeyRevisions returns per-key (mod_revision, present) pairs in the same
diff --git a/banyand/metadata/schema/property/node_status_registry_test.go b/banyand/metadata/schema/property/node_status_registry_test.go
index 58108212c..de1180677 100644
--- a/banyand/metadata/schema/property/node_status_registry_test.go
+++ b/banyand/metadata/schema/property/node_status_registry_test.go
@@ -31,18 +31,13 @@ import (
 )
 
 // fakeRevRepo emulates a per-service pkg/schema.schemaRepo for the registry
-// pointer-identity tests. The "Latest" knob simulates the eventCh-retry leak
-// scenario from pkg/schema/cache.go:106 — a repo lagging behind the property
-// schemaCache because a transient processEvent failure deferred the apply
-// onto the async retry queue.
+// pointer-identity tests.
 type fakeRevRepo struct {
        resources map[metaschema.Kind]map[string]int64
-       latest    int64
 }
 
-func newFakeRevRepo(latest int64) *fakeRevRepo {
+func newFakeRevRepo() *fakeRevRepo {
        return &fakeRevRepo{
-               latest:    latest,
                resources: make(map[metaschema.Kind]map[string]int64),
        }
 }
@@ -59,8 +54,6 @@ func (f *fakeRevRepo) put(kind metaschema.Kind, group, name string, rev int64) {
        f.resources[kind][group+"/"+name] = rev
 }
 
-func (f *fakeRevRepo) LatestModRevision() int64 { return f.latest }
-
 func (f *fakeRevRepo) ResourceRevision(kind metaschema.Kind, group, name string) (int64, bool) {
        if m := f.resources[kind]; m != nil {
                if rev, ok := m[group+"/"+name]; ok {
@@ -83,7 +76,7 @@ func (f *fakeRevRepo) IsAbsent(kind metaschema.Kind, group, name string) bool {
 // disagree on the same (group, ModRevision) pair.
 func TestExecutor_ResolvesGroupsViaSharedSchemaRepo(t *testing.T) {
        const targetRev int64 = 42
-       repo := newFakeRevRepo(targetRev)
+       repo := newFakeRevRepo()
        repo.put(metaschema.KindGroup, "g1", "g1", targetRev)
        repo.put(metaschema.KindMeasure, "g1", "m1", targetRev)
 
@@ -98,11 +91,6 @@ func TestExecutor_ResolvesGroupsViaSharedSchemaRepo(t *testing.T) {
                func() *registry.NodeRepoRegistry { return reg },
        )
 
-       maxResp, err := srv.GetMaxRevision(context.Background(), &clusterv1.GetMaxRevisionRequest{})
-       require.NoError(t, err)
-       assert.Equal(t, targetRev, maxResp.GetMaxModRevision(),
-               "registry-only node reports the registered repo's latest")
-
        revResp, err := srv.GetKeyRevisions(context.Background(), &clusterv1.GetKeyRevisionsRequest{
                Keys: []*schemav1.SchemaKey{
                        {Kind: "group", Group: "g1", Name: "g1"},
@@ -132,23 +120,20 @@ func TestExecutor_ResolvesGroupsViaSharedSchemaRepo(t *testing.T) {
                "a repo-side mutation is immediately visible through the registry — same pointer")
 }
 
-// TestNodeStatus_DoesNotAdvancePastSchemaRepoLag injects a ModRevision skew
-// between the property schemaCache and the per-service schemaRepo (the
-// scenario the eventCh-retry path in pkg/schema.schemaRepo.SendMetadataEvent
-// produces) and asserts GetMaxRevision returns the laggard's value, never the
-// cache's. This is the regression that distinguishes the Phase 2 §Step 2.5
-// pivot from the original schemaCache-only implementation: the barrier no
-// longer certifies a revision the executor's resolver cache has not applied.
-func TestNodeStatus_DoesNotAdvancePastSchemaRepoLag(t *testing.T) {
-       // schemaCache is at watermark 100 — every kind it tracks has been
-       // notified to handlers up to revision 100.
+// TestNodeStatus_GetMaxRevision_ReadsCacheOnly asserts the new GetMaxRevision
+// contract: the response equals the schemaCache's notifiedModRevision and is
+// independent of the registry. The cache observes every catalog's events, so
+// it is the correct global watermark — symmetric with the receiving liaison's
+// selfName probe at barrier_cluster.go:354-360. Per-key executor-cache gating
+// stays on the registry via GetKeyRevisions / GetAbsentKeys.
+func TestNodeStatus_GetMaxRevision_ReadsCacheOnly(t *testing.T) {
        c := newSchemaCache()
        c.notifiedModRevision = 100
 
-       // schemaRepo lags behind at 50 — the canonical executor-cache scenario
-       // the §Step 2.5 pivot exists to handle.
-       repo := newFakeRevRepo(50)
-
+       // A registered repo with per-catalog state must NOT shift GetMaxRevision —
+       // the cache value alone wins.
+       repo := newFakeRevRepo()
+       repo.put(metaschema.KindMeasure, "g1", "m1", 50)
        reg := registry.NewNodeRepoRegistry()
        registry.MaybeRegister(reg, metaschema.KindMeasure, repo)
 
@@ -159,26 +144,16 @@ func TestNodeStatus_DoesNotAdvancePastSchemaRepoLag(t *testing.T) {
 
        resp, err := srv.GetMaxRevision(context.Background(), &clusterv1.GetMaxRevisionRequest{})
        require.NoError(t, err)
-       assert.Equal(t, int64(50), resp.GetMaxModRevision(),
-               "GetMaxRevision = min(schemaCache.watermark, registry.LatestModRevision); the laggard wins")
-
-       // When the laggard catches up to 100, GetMaxRevision should match.
-       repo.latest = 100
-       resp2, err := srv.GetMaxRevision(context.Background(), &clusterv1.GetMaxRevisionRequest{})
-       require.NoError(t, err)
-       assert.Equal(t, int64(100), resp2.GetMaxModRevision(),
-               "once the laggard catches up, the min equals the watermark")
+       assert.Equal(t, int64(100), resp.GetMaxModRevision(),
+               "GetMaxRevision = schemaCache.notifiedModRevision; registry contributes nothing")
 }
 
-// TestNodeStatus_RegistryOnly_NoSchemaCache covers the "registry alone"
-// configuration: a node whose schemaCache provider returns nil but whose
-// registry has registered repos. The server must report the registry's
-// LatestModRevision as the max — mirrors the production wiring on a
-// data-node before the schemaCache has been fully populated.
-func TestNodeStatus_RegistryOnly_NoSchemaCache(t *testing.T) {
-       repo := newFakeRevRepo(77)
+// TestNodeStatus_GetMaxRevision_NilCacheReturnsZero pins the boundary case:
+// when the cache provider returns nil, the response carries 0 regardless of
+// what the registry holds.
+func TestNodeStatus_GetMaxRevision_NilCacheReturnsZero(t *testing.T) {
+       repo := newFakeRevRepo()
        repo.put(metaschema.KindStream, "g1", "s1", 60)
-
        reg := registry.NewNodeRepoRegistry()
        registry.MaybeRegister(reg, metaschema.KindStream, repo)
 
@@ -189,29 +164,7 @@ func TestNodeStatus_RegistryOnly_NoSchemaCache(t *testing.T) {
 
        resp, err := srv.GetMaxRevision(context.Background(), &clusterv1.GetMaxRevisionRequest{})
        require.NoError(t, err)
-       assert.Equal(t, int64(77), resp.GetMaxModRevision())
-}
-
-// TestNodeStatus_EmptyRegistry_FallsBackToSchemaCache covers the production
-// boot path: the registry exists but no service has registered yet. The
-// server must skip the registry's "min" contribution rather than gate the
-// barrier verdict to 0. Mirrors the schemaCache-only behavior for a
-// metadata-only host.
-func TestNodeStatus_EmptyRegistry_FallsBackToSchemaCache(t *testing.T) {
-       c := newSchemaCache()
-       c.notifiedModRevision = 25
-
-       reg := registry.NewNodeRepoRegistry()
-
-       srv := NewNodeSchemaStatusServerWithRegistry(
-               func() *schemaCache { return c },
-               func() *registry.NodeRepoRegistry { return reg },
-       )
-
-       resp, err := srv.GetMaxRevision(context.Background(), &clusterv1.GetMaxRevisionRequest{})
-       require.NoError(t, err)
-       assert.Equal(t, int64(25), resp.GetMaxModRevision(),
-               "empty registry contributes nothing — schemaCache watermark wins")
+       assert.Equal(t, int64(0), resp.GetMaxModRevision())
 }
 
 // TestNodeStatus_KindRouting_TopNFallsBackToSchemaCache asserts that kinds
@@ -225,7 +178,7 @@ func TestNodeStatus_KindRouting_TopNFallsBackToSchemaCache(t *testing.T) {
        c.entries[idTopN] = entryTopN
        c.notifiedModRevision = 33
 
-       repo := newFakeRevRepo(33)
+       repo := newFakeRevRepo()
        repo.put(metaschema.KindMeasure, "g1", "m1", 33)
        reg := registry.NewNodeRepoRegistry()
        registry.MaybeRegister(reg, metaschema.KindMeasure, repo)
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index 362e61bbb..4e2f98d16 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -106,6 +106,9 @@ func (*local) SetRouteProviders(_ map[string]route.TableProvider) {
 func (*local) SetMetadataRepo(_ metadata.Repo) {
 }
 
+func (*local) SetNodeSchemaStatusRepo(_ metadata.Service) {
+}
+
 func (*local) Register(bus.Topic, schema.EventHandler) {
 }
 
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index 1b3e06936..4c48eb072 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -78,7 +78,13 @@ type Server interface {
        RegisterChunkedSyncHandler(topic bus.Topic, handler ChunkedSyncHandler)
        GetPort() *uint32
        SetRouteProviders(providers map[string]route.TableProvider)
+       // SetMetadataRepo enables fodc.v1.GroupLifecycleService — liaison-only
+       // by design; data-node processes must not call it.
        SetMetadataRepo(repo metadata.Repo)
+       // SetNodeSchemaStatusRepo enables cluster.v1.NodeSchemaStatusService —
+       // per-node by design (reports the local schema cache); liaison and
+       // data-node processes both call it.
+       SetNodeSchemaStatusRepo(svc metadata.Service)
 }
 
 // BatchPublisher is the interface for publishing data in batch.
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index befe295d8..33c39fc52 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -84,6 +84,7 @@ type server struct {
        omr                   observability.MetricsRegistry
        creds                 credentials.TransportCredentials
        metadataRepo          metadata.Repo
+       nodeSchemaStatusRepo  metadata.Service
        clientCloser          context.CancelFunc
        ser                   *grpclib.Server
        listeners             map[bus.Topic][]bus.MessageListener
@@ -292,27 +293,23 @@ func (s *server) Serve() run.StopNotify {
        tracev1.RegisterTraceServiceServer(s.ser, &traceService{ser: s})
        if s.metadataRepo != nil {
                fodcv1.RegisterGroupLifecycleServiceServer(s.ser, s)
-               // Phase 2 Step 2.1: data nodes expose NodeSchemaStatusService so the
-               // liaison's barrier fan-out (Step 2.2) can probe each node's local
-               // schema cache. The cache the server reads is the same SchemaRegistry
-               // instance the data-node query executor consults — see
-               // banyand/metadata/schema/property/node_status.go for the coherence
-               // argument. The registry is resolved per request (closure) so the
-               // service registers even when SchemaRegistry isn't ready at Serve
-               // time; the fail-closed nil-cache contract handles in-flight probes
+       }
+       if s.nodeSchemaStatusRepo != nil {
+               // The registry is resolved per request (closure) so the service
+               // registers even when SchemaRegistry isn't ready at Serve time;
+               // the fail-closed nil-cache contract handles in-flight probes
                // during initialization.
-               if svc, svcOk := s.metadataRepo.(metadata.Service); svcOk {
-                       clusterv1.RegisterNodeSchemaStatusServiceServer(s.ser, property.NewNodeSchemaStatusServerForRegistryWithNodeRepo(
-                               func() *property.SchemaRegistry {
-                                       reg, regOk := svc.SchemaRegistry().(*property.SchemaRegistry)
-                                       if !regOk {
-                                               return nil
-                                       }
-                                       return reg
-                               },
-                               svc.NodeRepoRegistry,
-                       ))
-               }
+               svc := s.nodeSchemaStatusRepo
+               clusterv1.RegisterNodeSchemaStatusServiceServer(s.ser, property.NewNodeSchemaStatusServerForRegistryWithNodeRepo(
+                       func() *property.SchemaRegistry {
+                               reg, regOk := svc.SchemaRegistry().(*property.SchemaRegistry)
+                               if !regOk {
+                                       return nil
+                               }
+                               return reg
+                       },
+                       svc.NodeRepoRegistry,
+               ))
        }
 
        var ctx context.Context
@@ -425,6 +422,12 @@ func (s *server) SetMetadataRepo(repo metadata.Repo) {
        s.metadataRepo = repo
 }
 
+// SetNodeSchemaStatusRepo wires the metadata.Service whose SchemaRegistry +
+// NodeRepoRegistry back the per-node NodeSchemaStatusService.
+func (s *server) SetNodeSchemaStatusRepo(svc metadata.Service) {
+       s.nodeSchemaStatusRepo = svc
+}
+
 type metrics struct {
        totalStarted  meter.Counter
        totalFinished meter.Counter
diff --git a/pkg/cmdsetup/data.go b/pkg/cmdsetup/data.go
index 6c2af425c..9f2185e9f 100644
--- a/pkg/cmdsetup/data.go
+++ b/pkg/cmdsetup/data.go
@@ -53,6 +53,7 @@ func newDataCmd(runners ...run.Unit) *cobra.Command {
        metaSvc.SetMetricsRegistry(metricSvc)
        pm := protector.NewMemory(metricSvc)
        pipeline := sub.NewServer(metricSvc)
+       pipeline.SetNodeSchemaStatusRepo(metaSvc)
        metaSvc.SetSnapshotPipeline(pipeline)
        propertyStreamPipeline := queue.Local()
        metaSvc.SetPropertyPipelineClient(propertyStreamPipeline)
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index ec9248355..28c71b407 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -105,6 +105,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
                TraceLiaisonNodeRegistry:   grpc.NewClusterNodeRegistry(data.TopicTraceWrite, tire1Client, traceLiaisonNodeSel),
        }, metricSvc, pm, routeProviders)
        internalPipeline.SetMetadataRepo(metaSvc)
+       internalPipeline.SetNodeSchemaStatusRepo(metaSvc)
        profSvc := observability.NewProfService()
        httpServer := http.NewServer(grpcServer.GetAuthReloader())
        var units []run.Unit
diff --git a/pkg/schema/init.go b/pkg/schema/init.go
index 3c20b32c9..cf66ad0c6 100644
--- a/pkg/schema/init.go
+++ b/pkg/schema/init.go
@@ -29,9 +29,9 @@ import (
 )
 
 // schemaRepo satisfies the registry.RevisionRepository contract via its
-// LatestModRevision / ResourceRevision / IsAbsent methods. The compile-time
-// assertion lives here (next to RevisionRepository) so a future API drift
-// breaks the build before the cluster barrier silently degrades.
+// ResourceRevision / IsAbsent methods. The compile-time assertion lives here
+// so a future API drift breaks the build before the cluster barrier's
+// per-key gating silently degrades.
 var _ registry.RevisionRepository = (*schemaRepo)(nil)
 
 var initTimeout = 10 * time.Second
diff --git a/pkg/schema/registry/registry.go b/pkg/schema/registry/registry.go
index d69883829..d90e57230 100644
--- a/pkg/schema/registry/registry.go
+++ b/pkg/schema/registry/registry.go
@@ -34,7 +34,6 @@ import (
 // keeping the interface here (alongside the registry) instead of importing
 // pkg/schema avoids a dependency cycle through banyand/metadata.
 type RevisionRepository interface {
-       LatestModRevision() int64
        ResourceRevision(kind schema.Kind, group, name string) (int64, bool)
        IsAbsent(kind schema.Kind, group, name string) bool
 }
@@ -42,14 +41,21 @@ type RevisionRepository interface {
 // NodeRepoRegistry aggregates per-service RevisionRepository instances on a
 // single node (liaison or data). Each banyand service (measure, stream, trace,
 // …) registers its schemaRepo during PreRun with a kind bitmask describing the
-// kinds the repo tracks. The registry routes barrier and node-status lookups to
-// the same caches the executor consults during query plan execution, so a
-// positive answer from LatestModRevision implies every registered repo has
+// kinds the repo tracks. The registry routes per-key barrier and node-status
+// lookups (GetKeyRevisions / GetAbsentKeys) to the same caches the executor
+// consults during query plan execution, so a positive answer from
+// ResourceRevision / IsAbsent implies the registered repo for that kind has
 // applied the revision and any executor cache the node exposes is at least at
 // that point. This is the load-bearing prerequisite for the Phase 2
 // single-cache invariant — without it the cluster barrier would certify a
 // cache the data-node executor never reads.
 //
+// The aggregate global watermark belongs to the schemaCache
+// (notifiedModRevision), not to the registry: per-service schemaRepos filter
+// events by catalog (pkg/schema/init.go), so their per-repo latestModRevision
+// is incomparable across repos and a min/max aggregate cannot represent a
+// meaningful node-wide watermark.
+//
 // Safe for concurrent registration during PreRun and concurrent lookup.
 type NodeRepoRegistry struct {
        byKind map[schema.Kind][]RevisionRepository
@@ -92,25 +98,6 @@ func (r *NodeRepoRegistry) Register(kinds schema.Kind, repo RevisionRepository)
        }
 }
 
-// LatestModRevision returns the minimum LatestModRevision across every
-// registered repo. A registry with no repos returns 0, matching the
-// "node not yet caught up" semantic the cluster-barrier loop interprets as
-// "keep polling".
-func (r *NodeRepoRegistry) LatestModRevision() int64 {
-       r.mu.RLock()
-       defer r.mu.RUnlock()
-       if len(r.repos) == 0 {
-               return 0
-       }
-       minRev := r.repos[0].LatestModRevision()
-       for _, repo := range r.repos[1:] {
-               if rev := repo.LatestModRevision(); rev < minRev {
-                       minRev = rev
-               }
-       }
-       return minRev
-}
-
 // ResourceRevision routes the lookup to repos registered against the given
 // kind and returns the first match. Returns (0, false) when no repo is
 // registered for the kind or no registered repo holds the resource — the same
@@ -149,11 +136,9 @@ func (r *NodeRepoRegistry) HasKind(kind schema.Kind) bool {
        return len(r.byKind[kind]) > 0
 }
 
-// Empty reports whether any repo has been registered. The node-status server
-// uses this to skip the registry's min-rev contribution on a node where no
-// per-service schemaRepo runs (e.g. a property-only metadata host) — without
-// the check, an empty registry's LatestModRevision()=0 would gate every
-// barrier verdict to 0.
+// Empty reports whether any repo has been registered. GetAbsentKeys uses
+// this to route through the schemaCache on a node where no per-service
+// schemaRepo runs (e.g. a property-only metadata host).
 func (r *NodeRepoRegistry) Empty() bool {
        r.mu.RLock()
        defer r.mu.RUnlock()
diff --git a/pkg/schema/registry/registry_test.go b/pkg/schema/registry/registry_test.go
index 53a6d50f8..b5356757e 100644
--- a/pkg/schema/registry/registry_test.go
+++ b/pkg/schema/registry/registry_test.go
@@ -29,12 +29,10 @@ import (
 // fakeRevRepo is a minimal RevisionRepository for registry unit tests.
 type fakeRevRepo struct {
        resources map[string]map[string]int64
-       latest    int64
 }
 
-func newFakeRevRepo(latest int64) *fakeRevRepo {
+func newFakeRevRepo() *fakeRevRepo {
        return &fakeRevRepo{
-               latest:    latest,
                resources: make(map[string]map[string]int64),
        }
 }
@@ -46,10 +44,6 @@ func (f *fakeRevRepo) put(group, name string, rev int64) {
        f.resources[group][name] = rev
 }
 
-func (f *fakeRevRepo) LatestModRevision() int64 {
-       return f.latest
-}
-
 func (f *fakeRevRepo) ResourceRevision(_ metaschema.Kind, group, name string) (int64, bool) {
        if g := f.resources[group]; g != nil {
                if rev, ok := g[name]; ok {
@@ -64,36 +58,28 @@ func (f *fakeRevRepo) IsAbsent(kind metaschema.Kind, group, name string) bool {
        return !ok
 }
 
-func TestNodeRepoRegistry_EmptyReturnsZero(t *testing.T) {
+func TestNodeRepoRegistry_EmptyHasNoKindsAndAbsentKeys(t *testing.T) {
        r := NewNodeRepoRegistry()
-       assert.Equal(t, int64(0), r.LatestModRevision())
        rev, ok := r.ResourceRevision(metaschema.KindStream, "g", "n")
        assert.Equal(t, int64(0), rev)
        assert.False(t, ok)
        assert.True(t, r.IsAbsent(metaschema.KindStream, "g", "n"))
        assert.False(t, r.HasKind(metaschema.KindStream))
+       assert.True(t, r.Empty())
 }
 
 func TestNodeRepoRegistry_NilRepoOrZeroKindIsNoop(t *testing.T) {
        r := NewNodeRepoRegistry()
        r.Register(metaschema.KindStream, nil)
-       r.Register(0, newFakeRevRepo(5))
-       assert.Equal(t, int64(0), r.LatestModRevision())
-}
-
-func TestNodeRepoRegistry_LatestModRevisionTakesMin(t *testing.T) {
-       r := NewNodeRepoRegistry()
-       r.Register(metaschema.KindMeasure|metaschema.KindIndexRule, newFakeRevRepo(10))
-       r.Register(metaschema.KindStream, newFakeRevRepo(7))
-       r.Register(metaschema.KindTrace, newFakeRevRepo(15))
-
-       assert.Equal(t, int64(7), r.LatestModRevision(), "min over registered repos wins")
+       r.Register(0, newFakeRevRepo())
+       assert.True(t, r.Empty())
+       assert.False(t, r.HasKind(metaschema.KindStream))
 }
 
 func TestNodeRepoRegistry_ResourceRevisionRoutesByKind(t *testing.T) {
-       measureRepo := newFakeRevRepo(10)
+       measureRepo := newFakeRevRepo()
        measureRepo.put("g1", "m1", 9)
-       streamRepo := newFakeRevRepo(7)
+       streamRepo := newFakeRevRepo()
        streamRepo.put("g1", "s1", 5)
 
        r := NewNodeRepoRegistry()
@@ -116,9 +102,9 @@ func TestNodeRepoRegistry_ResourceRevisionRoutesByKind(t *testing.T) {
 }
 
 func TestNodeRepoRegistry_GroupKindFanOut(t *testing.T) {
-       measureRepo := newFakeRevRepo(10)
+       measureRepo := newFakeRevRepo()
        measureRepo.put("measure-only", "measure-only", 8)
-       streamRepo := newFakeRevRepo(7)
+       streamRepo := newFakeRevRepo()
        streamRepo.put("stream-only", "stream-only", 6)
 
        r := NewNodeRepoRegistry()
@@ -137,13 +123,13 @@ func TestNodeRepoRegistry_GroupKindFanOut(t *testing.T) {
 }
 
 func TestNodeRepoRegistry_DuplicateRegistrationIsIdempotent(t *testing.T) {
-       repo := newFakeRevRepo(4)
+       repo := newFakeRevRepo()
        r := NewNodeRepoRegistry()
        r.Register(metaschema.KindStream, repo)
        r.Register(metaschema.KindStream, repo)
        r.Register(metaschema.KindStream|metaschema.KindIndexRule, repo)
 
-       assert.Equal(t, int64(4), r.LatestModRevision())
+       assert.False(t, r.Empty())
        assert.True(t, r.HasKind(metaschema.KindStream))
        assert.True(t, r.HasKind(metaschema.KindIndexRule))
        assert.False(t, r.HasKind(metaschema.KindMeasure))
@@ -151,7 +137,7 @@ func TestNodeRepoRegistry_DuplicateRegistrationIsIdempotent(t *testing.T) {
 
 func TestNodeRepoRegistry_HasKind(t *testing.T) {
        r := NewNodeRepoRegistry()
-       r.Register(metaschema.KindMeasure|metaschema.KindIndexRule|metaschema.KindIndexRuleBinding|metaschema.KindGroup, newFakeRevRepo(1))
+       r.Register(metaschema.KindMeasure|metaschema.KindIndexRule|metaschema.KindIndexRuleBinding|metaschema.KindGroup, newFakeRevRepo())
 
        assert.True(t, r.HasKind(metaschema.KindMeasure))
        assert.True(t, r.HasKind(metaschema.KindIndexRule))
@@ -163,7 +149,7 @@ func TestNodeRepoRegistry_HasKind(t *testing.T) {
 
 func TestNodeRepoRegistry_ConcurrentRegisterAndLookup(t *testing.T) {
        r := NewNodeRepoRegistry()
-       repos := []*fakeRevRepo{newFakeRevRepo(1), newFakeRevRepo(2), newFakeRevRepo(3), newFakeRevRepo(4)}
+       repos := []*fakeRevRepo{newFakeRevRepo(), newFakeRevRepo(), newFakeRevRepo(), newFakeRevRepo()}
        for _, repo := range repos {
                repo.put("g", "n", 10)
        }
@@ -184,7 +170,6 @@ func TestNodeRepoRegistry_ConcurrentRegisterAndLookup(t *testing.T) {
        done := make(chan struct{})
        go func() {
                for range 1000 {
-                       _ = r.LatestModRevision()
                        _, _ = r.ResourceRevision(metaschema.KindStream, "g", "n")
                        _ = r.HasKind(metaschema.KindMeasure)
                }
@@ -194,7 +179,7 @@ func TestNodeRepoRegistry_ConcurrentRegisterAndLookup(t *testing.T) {
        wg.Wait()
        <-done
 
-       assert.Equal(t, int64(1), r.LatestModRevision())
+       assert.False(t, r.Empty())
        assert.True(t, r.HasKind(metaschema.KindStream))
        assert.True(t, r.HasKind(metaschema.KindMeasure))
 }
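The updated tests pin a small contract for the node repo registry. Registering a nil repo or a zero kind mask is a no-op, repeated registration is idempotent, `HasKind` reports per-kind coverage, `Empty` replaces the removed global `LatestModRevision` check, and `ResourceRevision` routes by kind. The following is a minimal sketch of a registry that satisfies those assertions. It assumes a bitmask `Kind` and a per-key lookup interface. `Kind`, `revRepo`, `registry`, and `mapRepo` are hypothetical stand-ins, not the BanyanDB `metaschema` or `NodeRepoRegistry` API.

```go
package main

import (
	"fmt"
	"sync"
)

// Kind is a hypothetical bitmask standing in for metaschema.Kind.
type Kind uint32

const (
	KindGroup Kind = 1 << iota
	KindStream
	KindMeasure
	KindIndexRule
	KindIndexRuleBinding
	KindTrace
)

// revRepo is an assumed per-key lookup that each registered repository serves.
type revRepo interface {
	ResourceRevision(kind Kind, group, name string) (int64, bool)
}

// registry routes lookups to the repo registered for each single kind bit.
type registry struct {
	mu    sync.RWMutex
	repos map[Kind]revRepo
}

func newRegistry() *registry { return &registry{repos: make(map[Kind]revRepo)} }

// Register binds repo to every bit set in kinds. A nil repo or a zero mask is
// a no-op; registering the same repo again just overwrites it with itself.
func (r *registry) Register(kinds Kind, repo revRepo) {
	if repo == nil || kinds == 0 {
		return
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	for bit := Kind(1); bit != 0 && bit <= kinds; bit <<= 1 {
		if kinds&bit != 0 {
			r.repos[bit] = repo
		}
	}
}

// HasKind reports whether a repo is registered for the single kind bit.
func (r *registry) HasKind(kind Kind) bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	_, ok := r.repos[kind]
	return ok
}

// Empty reports whether no repo has been registered for any kind.
func (r *registry) Empty() bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.repos) == 0
}

// ResourceRevision delegates to the repo owning kind; unknown kinds report (0, false).
func (r *registry) ResourceRevision(kind Kind, group, name string) (int64, bool) {
	r.mu.RLock()
	repo, ok := r.repos[kind]
	r.mu.RUnlock()
	if !ok {
		return 0, false
	}
	return repo.ResourceRevision(kind, group, name)
}

// mapRepo is a toy repo keyed by "group/name".
type mapRepo map[string]int64

func (m mapRepo) ResourceRevision(_ Kind, group, name string) (int64, bool) {
	rev, ok := m[group+"/"+name]
	return rev, ok
}

func main() {
	r := newRegistry()
	r.Register(KindStream, nil) // no-op: nil repo
	fmt.Println(r.Empty())      // true
	r.Register(KindMeasure|KindIndexRule, mapRepo{"g1/m1": 9})
	rev, ok := r.ResourceRevision(KindMeasure, "g1", "m1")
	fmt.Println(rev, ok, r.HasKind(KindStream)) // 9 true false
}
```

In this sketch, splitting the mask into single bits at registration time is what makes `HasKind` and routing a plain map lookup. The real registry may store repos differently.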
diff --git a/pkg/schema/schema.go b/pkg/schema/schema.go
index 13625a7c0..da301a039 100644
--- a/pkg/schema/schema.go
+++ b/pkg/schema/schema.go
@@ -114,11 +114,10 @@ type Repository interface {
        DropGroup(groupName string) error
 }
 
-// RevisionRepository extends Repository with revision-query methods used by
-// the write/query gates and the barrier implementation.
+// RevisionRepository extends Repository with per-key revision-query methods
+// used by the write/query gates and the barrier implementation.
 type RevisionRepository interface {
        Repository
-       LatestModRevision() int64
        ResourceRevision(kind schema.Kind, group, name string) (int64, bool)
        IsAbsent(kind schema.Kind, group, name string) bool
 }
diff --git a/test/cases/schema/barrier_cluster.go b/test/cases/schema/barrier_cluster.go
index e76669133..b7eb0edd2 100644
--- a/test/cases/schema/barrier_cluster.go
+++ b/test/cases/schema/barrier_cluster.go
@@ -37,20 +37,14 @@ import (
 // end-to-end through the public AwaitX RPCs. They pause the receiving
 // liaison's own SchemaRegistry; the cluster barrier's selfName probe reads
 // through that SR, so pausing it surfaces a laggard via the public AwaitX
-// API without needing NodeSchemaStatusService exposed on data-node ports
-// (which the in-process distributed harness does not currently provide;
-// the cross-version Unimplemented→ready policy in the cluster fan-out
-// would mask paused data nodes from the barrier's perspective).
+// API. Data-node fan-out via NodeSchemaStatusService is covered by the
+// unit tests in banyand/liaison/grpc/barrier_cluster_test.go (§FA-1..FD-2).
+// These integration specs cover the orthogonal contract: the pause
+// primitive's effect is observable through the public AwaitX RPC and
+// resume drains the queued events so the barrier converges.
 //
-// The role-prefix attribution (`liaison-...` vs `data-...`) the plan
-// describes for §6.12 is already pinned by the unit tests in
-// banyand/liaison/grpc/barrier_cluster_test.go (§FA-1..FD-2). These
-// integration specs cover the orthogonal contract: the pause primitive's
-// effect is observable through the public AwaitX RPC and the resume
-// drains the queued events so the barrier converges.
-//
-// Specs skip themselves under standalone mode and when the liaison
-// address is empty (the standalone harness has none).
+// Specs skip themselves under standalone mode and when the receiving
+// liaison address is empty (the standalone harness has none).
 
 func barrierClusterMeasureGroup(name string) *commonv1.Group {
        return &commonv1.Group{
@@ -107,21 +101,15 @@ var _ = g.Describe("Cluster barrier under partial-cluster conditions (§6.12)",
                _ = setup.ResumeDataNodeWatch(paused)
        })
 
-       // §6.12a — AwaitRevisionApplied surfaces a paused liaison as a laggard
-       // via its selfName probe; resume drains the queue and the barrier
-       // converges. Uses Measure.Update for the post-pause bump because the
-       // Group watch path in this in-process harness occasionally completes
-       // before the gate sees it (the property-store reconcile cycle on
-       // Group writes can short-circuit the watch fan-out); §6.12b/c
-       // Measure flows are reliable.
-       // PENDING: queue drains successfully on resume (verified via the
-       // `queued: N` log line) but the schemaCache.notifiedModRevision
-       // watermark does not always reach the newRev target within 10s when
-       // the test re-issues AwaitRevisionApplied. The Measure-based
-       // per-key barrier (§6.12b/c) does converge, so this pending status
-       // scopes the gap to the global MaxRevision check; investigation is
-       // deferred to the same follow-up that authors data-node
-       // NodeSchemaStatusService exposure.
+       // §6.12a — AwaitRevisionApplied surfaces a paused liaison as a
+       // laggard via its selfName probe; resume drains the queue and the
+       // barrier converges. PENDING: the laggard-detection assertion passes
+       // but the post-resume AwaitRevisionApplied(newRev) does not converge
+       // inside the spec timeout. The per-key §6.12b/c flows (AwaitApplied /
+       // AwaitDeleted) do converge, so this gap is scoped to the global
+       // notifiedModRevision watermark advancing through queue replay under
+       // the in-process distributed harness — independent of the
+       // data-node NodeSchemaStatusService exposure.
        g.PIt("§6.12a AwaitRevisionApplied reports the paused liaison as a laggard", func() {
                groupName := fmt.Sprintf("bc-rev-%d", time.Now().UnixNano())
                measureName := "bc_rev_measure"
@@ -152,10 +140,8 @@ var _ = g.Describe("Cluster barrier under partial-cluster conditions (§6.12)",
 
                g.By("Calling AwaitRevisionApplied — paused liaison must surface as a laggard")
                // Brief settle so the bumped revision's watch event has time to
-               // reach the liaison's SR (which queues it under pause). Without
-               // this, the test races the watch stream and the queue can be
-               // empty at resume — the propagation delay between Update RPC
-               // commit and watch broadcast varies under load.
+               // reach the paused liaison's SR (which queues it under pause).
+               // Without this, the barrier can race the watch broadcast.
                time.Sleep(200 * time.Millisecond)
                callCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
                defer cancel()
@@ -280,16 +266,13 @@ var _ = g.Describe("Cluster barrier under partial-cluster conditions (§6.12)",
        })
 
        // §6.12d — Cross-barrier recovery: after a multi-step pause-and-mutate
-       // sequence, resume drains the queued events in arrival order, so a
+       // sequence, resume drains the queued events in arrival order so a
        // follow-up AwaitRevisionApplied at the post-mutate revision returns
-       // applied=true. This pins the queue-drain contract end-to-end.
-       // PENDING: same harness limitation as §6.12a — the post-resume
-       // AwaitRevisionApplied does not always reach the queued finalRev
-       // inside the spec timeout, even though the queue drain log shows
-       // the events were replayed. Will pass once the data-node
-       // NodeSchemaStatusService exposure work lands and the cluster
-       // barrier observes a fan-out across all members instead of the
-       // liaison's selfName probe alone.
+       // applied=true with no laggards. PENDING for the same reason as
+       // §6.12a: the global AwaitRevisionApplied watermark does not converge
+       // through queue replay inside the spec timeout. §6.12b/c remain the
+       // authoritative end-to-end coverage of the queue-drain contract via
+       // per-key barriers.
        g.PIt("§6.12d cross-barrier recovery: resume drains queued events and clears the laggard", func() {
                groupName := fmt.Sprintf("bc-recovery-%d", time.Now().UnixNano())
                measureName := "bc_recovery_measure"


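The §6.12a and §6.12d comments describe a pause primitive. While paused, the receiving liaison's SchemaRegistry queues watch events. Resume replays them in arrival order so that per-key barriers, and ideally the global revision watermark, converge. The following toy model illustrates that intended contract under a single-process assumption. It does not reproduce the harness gap that keeps §6.12a/d pending. `event`, `pausableApplier`, and its methods are hypothetical and are not the SchemaRegistry API.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a hypothetical watch event: a schema key and the revision it carries.
type event struct {
	key string
	rev int64
}

// pausableApplier applies events immediately unless paused, in which case it
// queues them; Resume replays the queue in arrival order.
type pausableApplier struct {
	mu        sync.Mutex
	paused    bool
	queue     []event
	revs      map[string]int64
	watermark int64 // highest revision applied so far (stand-in for a global watermark)
}

func newPausableApplier() *pausableApplier {
	return &pausableApplier{revs: make(map[string]int64)}
}

// apply records e; callers must hold p.mu.
func (p *pausableApplier) apply(e event) {
	p.revs[e.key] = e.rev
	if e.rev > p.watermark {
		p.watermark = e.rev
	}
}

// OnEvent applies e now, or queues it while paused.
func (p *pausableApplier) OnEvent(e event) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.paused {
		p.queue = append(p.queue, e)
		return
	}
	p.apply(e)
}

func (p *pausableApplier) Pause() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.paused = true
}

// Resume drains queued events in arrival order, unpauses, and returns how many were replayed.
func (p *pausableApplier) Resume() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	n := len(p.queue)
	for _, e := range p.queue {
		p.apply(e)
	}
	p.queue = nil
	p.paused = false
	return n
}

// Applied is the per-key check: has key reached at least target?
func (p *pausableApplier) Applied(key string, target int64) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	rev, ok := p.revs[key]
	return ok && rev >= target
}

// Watermark is the global check: the highest revision applied so far.
func (p *pausableApplier) Watermark() int64 {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.watermark
}

func main() {
	p := newPausableApplier()
	p.OnEvent(event{"g/m", 1})
	p.Pause()
	p.OnEvent(event{"g/m", 2}) // queued
	p.OnEvent(event{"g/m", 3}) // queued
	fmt.Println(p.Applied("g/m", 3), p.Watermark()) // false 1
	fmt.Println(p.Resume())                         // 2
	fmt.Println(p.Applied("g/m", 3), p.Watermark()) // true 3
}
```

In this model, the per-key check and the global watermark advance together because replay happens in one place. The PENDING notes indicate that this coupling does not yet hold under the in-process distributed harness.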