This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch phase-2-cp5-march
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit e4b538efded29a06064de0f76462fc9390861d06
Author: Hongtao Gao <[email protected]>
AuthorDate: Wed May 6 15:33:07 2026 +0000

    feat(barrier): expose NodeRepoRegistry on metadata.Service and register per-service schemaRepos (Phase 2 Step 2.5 §1)
    
    metadata.Service grows a NodeRepoRegistry() accessor; clientService
    constructs the registry once in NewClient and returns the same instance
    for the process lifetime.
    
    Each per-service schemaRepo (measure / stream / trace) gains a
    registerWithNodeRepo helper called from every constructor (standalone /
    data / liaison) that joins the repo to the per-node aggregator under a
    kind bitmask covering Group + the catalog's primary kind +
    IndexRule + IndexRuleBinding. TopNAggregation and Property remain on the
    property schemaCache path because schemaRepo does not track those kinds.
    
    This is the load-bearing prerequisite for the Step 2.5 single-cache
    invariant: from this point on, the cluster barrier and node-status RPC
    can route lookups for executor-tracked kinds to the same caches the
    data-node executor consults during query plan execution.
    
    via [HAPI](https://hapi.run)
    
    Co-Authored-By: HAPI <[email protected]>
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
---
 banyand/measure/metadata.go  | 23 +++++++++++++++++++++++
 banyand/measure/svc_data.go  |  1 +
 banyand/metadata/client.go   |  9 ++++++++-
 banyand/metadata/metadata.go |  8 ++++++++
 banyand/stream/metadata.go   | 20 ++++++++++++++++++++
 banyand/trace/metadata.go    | 19 +++++++++++++++++++
 6 files changed, 79 insertions(+), 1 deletion(-)

diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 8482eed9d..91fa33db8 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -47,9 +47,17 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/meter"
        resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
+       "github.com/apache/skywalking-banyandb/pkg/schema/registry"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
+// measureRegistryKinds is the kind set the measure schemaRepo registers with
+// the per-node NodeRepoRegistry. TopNAggregation is intentionally excluded —
+// schemaRepo does not store TopN entries, so the cluster barrier reads TopN
+// keys via the property schemaCache instead. Property kinds are similarly
+// out-of-scope for the per-service repos.
+const measureRegistryKinds = schema.KindGroup | schema.KindMeasure | schema.KindIndexRule | schema.KindIndexRuleBinding
+
 const (
        // TopNSchemaName is the name of the top n result schema.
        TopNSchemaName = "_top_n_result"
@@ -112,6 +120,7 @@ func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string, n
                resourceSchema.NewMetrics(svc.omr.With(metadataScope)),
        )
        sr.start()
+       sr.registerWithNodeRepo()
        return sr
 }
 
@@ -134,6 +143,7 @@ func newLiaisonSchemaRepo(path string, svc *liaison, measureDataNodeRegistry grp
                resourceSchema.NewMetrics(svc.omr.With(metadataScope)),
        )
        sr.start()
+       sr.registerWithNodeRepo()
        return sr
 }
 
@@ -169,6 +179,19 @@ func (sr *schemaRepo) start() {
                        sr)
 }
 
+// registerWithNodeRepo joins this schemaRepo to the per-node aggregator so the
+// cluster barrier and NodeSchemaStatusService route Group/Measure/IndexRule/
+// IndexRuleBinding lookups through the same cache the executor consults via
+// LoadGroup / LoadResource. Idempotent and safe to call from every measure
+// constructor (standalone / data / liaison).
+func (sr *schemaRepo) registerWithNodeRepo() {
+       metaSvc, ok := sr.metadata.(metadata.Service)
+       if !ok {
+               return
+       }
+       registry.MaybeRegister(metaSvc.NodeRepoRegistry(), measureRegistryKinds, sr.Repository)
+}
+
 func (sr *schemaRepo) Measure(metadata *commonv1.Metadata) (Measure, error) {
        sm, ok := sr.loadMeasure(metadata)
        if !ok {
diff --git a/banyand/measure/svc_data.go b/banyand/measure/svc_data.go
index e4f776f9b..e0e90953b 100644
--- a/banyand/measure/svc_data.go
+++ b/banyand/measure/svc_data.go
@@ -447,6 +447,7 @@ func newDataSchemaRepo(path string, svc *dataSVC, nodeLabels map[string]string,
                resourceSchema.NewMetrics(svc.omr.With(metadataScope)),
        )
        sr.start()
+       sr.registerWithNodeRepo()
        return sr
 }
 
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index fcd72a79a..b9ac35c1a 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -41,6 +41,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/logger"
        banyandbpath "github.com/apache/skywalking-banyandb/pkg/path"
        "github.com/apache/skywalking-banyandb/pkg/run"
+       "github.com/apache/skywalking-banyandb/pkg/schema/registry"
 )
 
 const (
@@ -69,7 +70,8 @@ const (
 // NewClient returns a new metadata client.
 func NewClient() (Service, error) {
        return &clientService{
-               closer: run.NewCloser(1),
+               closer:           run.NewCloser(1),
+               nodeRepoRegistry: registry.NewNodeRepoRegistry(),
        }, nil
 }
 
@@ -80,6 +82,7 @@ type clientService struct {
        dataBroadcaster                   bus.Broadcaster
        liaisonBroadcaster                bus.Broadcaster
        infoCollectorRegistry             *schema.InfoCollectorRegistry
+       nodeRepoRegistry                  *registry.NodeRepoRegistry
        closer                            *run.Closer
        nodeDiscoveryMode                 string
        schemaRegistryMode                string
@@ -107,6 +110,10 @@ func (s *clientService) SchemaRegistry() schema.Registry {
        return s.schemaRegistry
 }
 
+func (s *clientService) NodeRepoRegistry() *registry.NodeRepoRegistry {
+       return s.nodeRepoRegistry
+}
+
 func (s *clientService) FlagSet() *run.FlagSet {
        fs := run.NewFlagSet("metadata")
        fs.DurationVar(&s.registryTimeout, "node-registry-timeout", 2*time.Minute, "The timeout for the node registry")
diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go
index 33865e820..adaf71ee5 100644
--- a/banyand/metadata/metadata.go
+++ b/banyand/metadata/metadata.go
@@ -27,6 +27,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/run"
+       "github.com/apache/skywalking-banyandb/pkg/schema/registry"
 )
 
 // IndexFilter provides methods to find a specific index related objects and vice versa.
@@ -64,6 +65,13 @@ type Service interface {
        run.Service
        run.Config
        SchemaRegistry() schema.Registry
+       // NodeRepoRegistry returns the per-node aggregator the cluster barrier
+       // and NodeSchemaStatusService consult so they read from the same caches
+       // the per-service query executor resolves through. Each per-service
+       // schemaRepo (measure / stream / trace) registers itself here when it
+       // is constructed. Returns the same instance for the lifetime of this
+       // Service.
+       NodeRepoRegistry() *registry.NodeRepoRegistry
        SetMetricsRegistry(omr observability.MetricsRegistry)
        SetDataBroadcaster(broadcaster bus.Broadcaster)
        SetLiaisonBroadcaster(broadcaster bus.Broadcaster)
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 0ecc8074c..1cdb02945 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -41,9 +41,15 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/meter"
        resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
+       "github.com/apache/skywalking-banyandb/pkg/schema/registry"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
+// streamRegistryKinds is the kind set the stream schemaRepo registers with
+// the per-node NodeRepoRegistry; mirrors measureRegistryKinds in
+// banyand/measure/metadata.go.
+const streamRegistryKinds = schema.KindGroup | schema.KindStream | schema.KindIndexRule | schema.KindIndexRuleBinding
+
 var metadataScope = streamScope.SubScope("metadata")
 
 // SchemaService allows querying schema information.
@@ -77,6 +83,7 @@ func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string, n
                ),
        }
        sr.start()
+       sr.registerWithNodeRepo()
        return sr
 }
 
@@ -96,6 +103,7 @@ func newLiaisonSchemaRepo(path string, svc *liaison, streamDataNodeRegistry grpc
                ),
        }
        sr.start()
+       sr.registerWithNodeRepo()
        return sr
 }
 
@@ -106,6 +114,18 @@ func (sr *schemaRepo) start() {
                        sr)
 }
 
+// registerWithNodeRepo joins this schemaRepo to the per-node aggregator so the
+// cluster barrier and NodeSchemaStatusService route Group/Stream/IndexRule/
+// IndexRuleBinding lookups through the same cache the executor consults via
+// LoadGroup / LoadResource.
+func (sr *schemaRepo) registerWithNodeRepo() {
+       metaSvc, ok := sr.metadata.(metadata.Service)
+       if !ok {
+               return
+       }
+       registry.MaybeRegister(metaSvc.NodeRepoRegistry(), streamRegistryKinds, sr.Repository)
+}
+
 func (sr *schemaRepo) Stream(metadata *commonv1.Metadata) (Stream, error) {
        sm, ok := sr.loadStream(metadata)
        if !ok {
diff --git a/banyand/trace/metadata.go b/banyand/trace/metadata.go
index 6adee2a62..da9789a37 100644
--- a/banyand/trace/metadata.go
+++ b/banyand/trace/metadata.go
@@ -41,9 +41,14 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/meter"
        resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
+       "github.com/apache/skywalking-banyandb/pkg/schema/registry"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
+// traceRegistryKinds is the kind set the trace schemaRepo registers with the
+// per-node NodeRepoRegistry.
+const traceRegistryKinds = schema.KindGroup | schema.KindTrace | schema.KindIndexRule | schema.KindIndexRuleBinding
+
 var (
        metadataScope = traceScope.SubScope("metadata")
        // ErrTraceNotFound is returned when a trace is not found.
@@ -81,6 +86,7 @@ func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string, n
                ),
        }
        sr.start()
+       sr.registerWithNodeRepo()
        return sr
 }
 
@@ -101,6 +107,7 @@ func newLiaisonSchemaRepo(path string, svc *liaison, traceDataNodeRegistry grpc.
                sr.onGroupDelete = svc.handoffCtrl.deletePartsByGroup
        }
        sr.start()
+       sr.registerWithNodeRepo()
        return sr
 }
 
@@ -111,6 +118,18 @@ func (sr *schemaRepo) start() {
                        sr)
 }
 
+// registerWithNodeRepo joins this schemaRepo to the per-node aggregator so the
+// cluster barrier and NodeSchemaStatusService route Group/Trace/IndexRule/
+// IndexRuleBinding lookups through the same cache the executor consults via
+// LoadGroup / LoadResource.
+func (sr *schemaRepo) registerWithNodeRepo() {
+       metaSvc, ok := sr.metadata.(metadata.Service)
+       if !ok {
+               return
+       }
+       registry.MaybeRegister(metaSvc.NodeRepoRegistry(), traceRegistryKinds, sr.Repository)
+}
+
 func (sr *schemaRepo) Trace(metadata *commonv1.Metadata) (*trace, bool) {
        sm, ok := sr.Repository.LoadResource(metadata)
        if !ok {

Reply via email to