This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch phase-2-cp5-march in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit e4b538efded29a06064de0f76462fc9390861d06 Author: Hongtao Gao <[email protected]> AuthorDate: Wed May 6 15:33:07 2026 +0000 feat(barrier): expose NodeRepoRegistry on metadata.Service and register per-service schemaRepos (Phase 2 Step 2.5 §1) metadata.Service grows a NodeRepoRegistry() accessor; clientService constructs the registry once in NewClient and returns the same instance for the process lifetime. Each per-service schemaRepo (measure / stream / trace) gains a registerWithNodeRepo helper called from every constructor (standalone / data / liaison) that joins the repo to the per-node aggregator under a kind bitmask covering Group + the catalog's primary kind + IndexRule + IndexRuleBinding. TopNAggregation and Property remain on the property schemaCache path because schemaRepo does not track those kinds. This is the load-bearing prerequisite for the Step 2.5 single-cache invariant: from this point on, the cluster barrier and node-status RPC can route lookups for executor-tracked kinds to the same caches the data-node executor consults during query plan execution. 
via [HAPI](https://hapi.run) Co-Authored-By: HAPI <[email protected]> Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]> --- banyand/measure/metadata.go | 23 +++++++++++++++++++++++ banyand/measure/svc_data.go | 1 + banyand/metadata/client.go | 9 ++++++++- banyand/metadata/metadata.go | 8 ++++++++ banyand/stream/metadata.go | 20 ++++++++++++++++++++ banyand/trace/metadata.go | 19 +++++++++++++++++++ 6 files changed, 79 insertions(+), 1 deletion(-) diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go index 8482eed9d..91fa33db8 100644 --- a/banyand/measure/metadata.go +++ b/banyand/measure/metadata.go @@ -47,9 +47,17 @@ import ( "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/meter" resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema" + "github.com/apache/skywalking-banyandb/pkg/schema/registry" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) +// measureRegistryKinds is the kind set the measure schemaRepo registers with +// the per-node NodeRepoRegistry. TopNAggregation is intentionally excluded — +// schemaRepo does not store TopN entries, so the cluster barrier reads TopN +// keys via the property schemaCache instead. Property kinds are similarly +// out-of-scope for the per-service repos. +const measureRegistryKinds = schema.KindGroup | schema.KindMeasure | schema.KindIndexRule | schema.KindIndexRuleBinding + const ( // TopNSchemaName is the name of the top n result schema. 
TopNSchemaName = "_top_n_result" @@ -112,6 +120,7 @@ func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string, n resourceSchema.NewMetrics(svc.omr.With(metadataScope)), ) sr.start() + sr.registerWithNodeRepo() return sr } @@ -134,6 +143,7 @@ func newLiaisonSchemaRepo(path string, svc *liaison, measureDataNodeRegistry grp resourceSchema.NewMetrics(svc.omr.With(metadataScope)), ) sr.start() + sr.registerWithNodeRepo() return sr } @@ -169,6 +179,19 @@ func (sr *schemaRepo) start() { sr) } +// registerWithNodeRepo joins this schemaRepo to the per-node aggregator so the +// cluster barrier and NodeSchemaStatusService route Group/Measure/IndexRule/ +// IndexRuleBinding lookups through the same cache the executor consults via +// LoadGroup / LoadResource. Idempotent and safe to call from every measure +// constructor (standalone / data / liaison). +func (sr *schemaRepo) registerWithNodeRepo() { + metaSvc, ok := sr.metadata.(metadata.Service) + if !ok { + return + } + registry.MaybeRegister(metaSvc.NodeRepoRegistry(), measureRegistryKinds, sr.Repository) +} + func (sr *schemaRepo) Measure(metadata *commonv1.Metadata) (Measure, error) { sm, ok := sr.loadMeasure(metadata) if !ok { diff --git a/banyand/measure/svc_data.go b/banyand/measure/svc_data.go index e4f776f9b..e0e90953b 100644 --- a/banyand/measure/svc_data.go +++ b/banyand/measure/svc_data.go @@ -447,6 +447,7 @@ func newDataSchemaRepo(path string, svc *dataSVC, nodeLabels map[string]string, resourceSchema.NewMetrics(svc.omr.With(metadataScope)), ) sr.start() + sr.registerWithNodeRepo() return sr } diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go index fcd72a79a..b9ac35c1a 100644 --- a/banyand/metadata/client.go +++ b/banyand/metadata/client.go @@ -41,6 +41,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/logger" banyandbpath "github.com/apache/skywalking-banyandb/pkg/path" "github.com/apache/skywalking-banyandb/pkg/run" + 
"github.com/apache/skywalking-banyandb/pkg/schema/registry" ) const ( @@ -69,7 +70,8 @@ const ( // NewClient returns a new metadata client. func NewClient() (Service, error) { return &clientService{ - closer: run.NewCloser(1), + closer: run.NewCloser(1), + nodeRepoRegistry: registry.NewNodeRepoRegistry(), }, nil } @@ -80,6 +82,7 @@ type clientService struct { dataBroadcaster bus.Broadcaster liaisonBroadcaster bus.Broadcaster infoCollectorRegistry *schema.InfoCollectorRegistry + nodeRepoRegistry *registry.NodeRepoRegistry closer *run.Closer nodeDiscoveryMode string schemaRegistryMode string @@ -107,6 +110,10 @@ func (s *clientService) SchemaRegistry() schema.Registry { return s.schemaRegistry } +func (s *clientService) NodeRepoRegistry() *registry.NodeRepoRegistry { + return s.nodeRepoRegistry +} + func (s *clientService) FlagSet() *run.FlagSet { fs := run.NewFlagSet("metadata") fs.DurationVar(&s.registryTimeout, "node-registry-timeout", 2*time.Minute, "The timeout for the node registry") diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go index 33865e820..adaf71ee5 100644 --- a/banyand/metadata/metadata.go +++ b/banyand/metadata/metadata.go @@ -27,6 +27,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/run" + "github.com/apache/skywalking-banyandb/pkg/schema/registry" ) // IndexFilter provides methods to find a specific index related objects and vice versa. @@ -64,6 +65,13 @@ type Service interface { run.Service run.Config SchemaRegistry() schema.Registry + // NodeRepoRegistry returns the per-node aggregator the cluster barrier + // and NodeSchemaStatusService consult so they read from the same caches + // the per-service query executor resolves through. Each banyand service + // (measure / stream / trace) registers its schemaRepo here during PreRun; + // Property stays on the property schemaCache path. Returns the same + // instance for the lifetime of this Service. 
+ NodeRepoRegistry() *registry.NodeRepoRegistry SetMetricsRegistry(omr observability.MetricsRegistry) SetDataBroadcaster(broadcaster bus.Broadcaster) SetLiaisonBroadcaster(broadcaster bus.Broadcaster) diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go index 0ecc8074c..1cdb02945 100644 --- a/banyand/stream/metadata.go +++ b/banyand/stream/metadata.go @@ -41,9 +41,15 @@ import ( "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/meter" resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema" + "github.com/apache/skywalking-banyandb/pkg/schema/registry" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) +// streamRegistryKinds is the kind set the stream schemaRepo registers with +// the per-node NodeRepoRegistry; mirrors the measure variant in +// banyand/measure/metadata.go. +const streamRegistryKinds = schema.KindGroup | schema.KindStream | schema.KindIndexRule | schema.KindIndexRuleBinding + var metadataScope = streamScope.SubScope("metadata") // SchemaService allows querying schema information. @@ -77,6 +83,7 @@ func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string, n ), } sr.start() + sr.registerWithNodeRepo() return sr } @@ -96,6 +103,7 @@ func newLiaisonSchemaRepo(path string, svc *liaison, streamDataNodeRegistry grpc ), } sr.start() + sr.registerWithNodeRepo() return sr } @@ -106,6 +114,18 @@ func (sr *schemaRepo) start() { sr) } +// registerWithNodeRepo joins this schemaRepo to the per-node aggregator so the +// cluster barrier and NodeSchemaStatusService route Group/Stream/IndexRule/ +// IndexRuleBinding lookups through the same cache the executor consults via +// LoadGroup / LoadResource. 
+func (sr *schemaRepo) registerWithNodeRepo() { + metaSvc, ok := sr.metadata.(metadata.Service) + if !ok { + return + } + registry.MaybeRegister(metaSvc.NodeRepoRegistry(), streamRegistryKinds, sr.Repository) +} + func (sr *schemaRepo) Stream(metadata *commonv1.Metadata) (Stream, error) { sm, ok := sr.loadStream(metadata) if !ok { diff --git a/banyand/trace/metadata.go b/banyand/trace/metadata.go index 6adee2a62..da9789a37 100644 --- a/banyand/trace/metadata.go +++ b/banyand/trace/metadata.go @@ -41,9 +41,14 @@ import ( "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/meter" resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema" + "github.com/apache/skywalking-banyandb/pkg/schema/registry" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) +// traceRegistryKinds is the kind set the trace schemaRepo registers with the +// per-node NodeRepoRegistry. +const traceRegistryKinds = schema.KindGroup | schema.KindTrace | schema.KindIndexRule | schema.KindIndexRuleBinding + var ( metadataScope = traceScope.SubScope("metadata") // ErrTraceNotFound is returned when a trace is not found. @@ -81,6 +86,7 @@ func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string, n ), } sr.start() + sr.registerWithNodeRepo() return sr } @@ -101,6 +107,7 @@ func newLiaisonSchemaRepo(path string, svc *liaison, traceDataNodeRegistry grpc. sr.onGroupDelete = svc.handoffCtrl.deletePartsByGroup } sr.start() + sr.registerWithNodeRepo() return sr } @@ -111,6 +118,18 @@ func (sr *schemaRepo) start() { sr) } +// registerWithNodeRepo joins this schemaRepo to the per-node aggregator so the +// cluster barrier and NodeSchemaStatusService route Group/Trace/IndexRule/ +// IndexRuleBinding lookups through the same cache the executor consults via +// LoadGroup / LoadResource. 
+func (sr *schemaRepo) registerWithNodeRepo() { + metaSvc, ok := sr.metadata.(metadata.Service) + if !ok { + return + } + registry.MaybeRegister(metaSvc.NodeRepoRegistry(), traceRegistryKinds, sr.Repository) +} + func (sr *schemaRepo) Trace(metadata *commonv1.Metadata) (*trace, bool) { sm, ok := sr.Repository.LoadResource(metadata) if !ok {
