This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch phase-2-cp5-march
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 57eb252f92c7c03521ad079c174a69e72f557542
Author: Hongtao Gao <[email protected]>
AuthorDate: Wed May 6 15:32:22 2026 +0000

    feat(schema/registry): add NodeRepoRegistry per-node aggregator (Phase 2 Step 2.5 §1)
    
    Introduces pkg/schema/registry, the per-node aggregator that the cluster
    barrier (NodeSchemaStatusService) and write/query gates consult. Each
    banyand service (measure / stream / trace / property) registers its
    pkg/schema.schemaRepo here during PreRun under a kind bitmask; lookups
    route to the same caches the data-node executor consults via
    LoadGroup / LoadResource.
    
    The package lives outside pkg/schema to avoid an import cycle through
    banyand/metadata when metadata.Service exposes the registry. A compile-
    time assertion in pkg/schema/init.go pins the schemaRepo →
    RevisionRepository contract so future API drift breaks the build before
    the cluster barrier silently degrades.
    
    via [HAPI](https://hapi.run)
    
    Co-Authored-By: HAPI <[email protected]>
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
---
 pkg/schema/init.go                   |   7 ++
 pkg/schema/registry/registry.go      | 178 +++++++++++++++++++++++++++++++
 pkg/schema/registry/registry_test.go | 200 +++++++++++++++++++++++++++++++++++
 3 files changed, 385 insertions(+)

diff --git a/pkg/schema/init.go b/pkg/schema/init.go
index 3f96d750b..3c20b32c9 100644
--- a/pkg/schema/init.go
+++ b/pkg/schema/init.go
@@ -25,8 +25,15 @@ import (
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/schema/registry"
 )
 
+// schemaRepo satisfies the registry.RevisionRepository contract via its
+// LatestModRevision / ResourceRevision / IsAbsent methods. The compile-time
+// assertion lives here (in schemaRepo's own package) so a future API drift
+// breaks the build before the cluster barrier silently degrades.
+var _ registry.RevisionRepository = (*schemaRepo)(nil)
+
 var initTimeout = 10 * time.Second
 
 type revisionContext struct {
diff --git a/pkg/schema/registry/registry.go b/pkg/schema/registry/registry.go
new file mode 100644
index 000000000..d69883829
--- /dev/null
+++ b/pkg/schema/registry/registry.go
@@ -0,0 +1,178 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package registry holds the per-node aggregator that routes barrier and
+// node-status RPC lookups to the same per-service schema-repo caches the
+// query executor consults. Living in its own package lets banyand/metadata
+// expose the registry via metadata.Service without importing pkg/schema,
+// which would create a cycle.
+package registry
+
+import (
+       "slices"
+       "sync"
+
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+)
+
+// RevisionRepository is the read-only view of a per-service schema repo that the
+// registry aggregates. pkg/schema's schemaRepo implements it; the interface is
+// declared here rather than in pkg/schema to avoid an import cycle through
+// banyand/metadata. ResourceRevision reports (0, false) for an unknown key.
+type RevisionRepository interface {
+       LatestModRevision() int64
+       ResourceRevision(kind schema.Kind, group, name string) (int64, bool)
+       IsAbsent(kind schema.Kind, group, name string) bool
+}
+
+// NodeRepoRegistry aggregates per-service RevisionRepository instances on a
+// single node (liaison or data). Each banyand service (measure, stream, trace,
+// …) registers its schemaRepo during PreRun with a kind bitmask describing the
+// kinds the repo tracks. The registry routes barrier and node-status lookups to
+// the same caches the executor consults during query plan execution, so a
+// positive answer from LatestModRevision implies every registered repo has
+// applied the revision and any executor cache the node exposes is at least at
+// that point. This is the load-bearing prerequisite for the Phase 2
+// single-cache invariant — without it the cluster barrier would certify a
+// cache the data-node executor never reads.
+//
+// Field roles: byKind indexes repos under each single kind they were
+// registered for; repos lists every distinct repo once, in registration
+// order; seen is the set that keeps repos free of duplicates; mu guards all
+// three.
+//
+// Safe for concurrent registration during PreRun and concurrent lookup.
+type NodeRepoRegistry struct {
+       byKind map[schema.Kind][]RevisionRepository
+       seen   map[RevisionRepository]struct{}
+       repos  []RevisionRepository
+       mu     sync.RWMutex
+}
+
+// NewNodeRepoRegistry returns an empty registry ready for service registration.
+// Both internal maps are allocated here so Register can write to them
+// directly.
+func NewNodeRepoRegistry() *NodeRepoRegistry {
+       return &NodeRepoRegistry{
+               byKind: make(map[schema.Kind][]RevisionRepository),
+               seen:   make(map[RevisionRepository]struct{}),
+       }
+}
+
+// Register associates a RevisionRepository with one or more kinds. The kinds
+// argument is a bitmask of schema.Kind values; the registry walks every kind
+// returned by schema.AllKinds and indexes the repo under each one whose bit is
+// set in the mask. Re-registering the same (kind, repo) pair is a no-op so
+// PreRun is idempotent. A nil repo or empty kind mask is silently dropped.
+//
+// Only a nil interface value counts as a nil repo; a typed nil pointer wrapped
+// in the interface is non-nil to Go and is registered like any other repo. The
+// repo's dynamic type must be comparable (a pointer is), because the value is
+// used as a map key and compared with ==.
+func (r *NodeRepoRegistry) Register(kinds schema.Kind, repo RevisionRepository) {
+       if repo == nil || kinds == 0 {
+               return
+       }
+       r.mu.Lock()
+       defer r.mu.Unlock()
+       // repos holds each distinct repo once, however many kinds it serves, so
+       // LatestModRevision consults it exactly once.
+       if _, ok := r.seen[repo]; !ok {
+               r.seen[repo] = struct{}{}
+               r.repos = append(r.repos, repo)
+       }
+       for _, k := range schema.AllKinds() {
+               if kinds&k == 0 {
+                       continue
+               }
+               existing := r.byKind[k]
+               if !slices.Contains(existing, repo) {
+                       r.byKind[k] = append(existing, repo)
+               }
+       }
+}
+
+// LatestModRevision returns the minimum LatestModRevision across every
+// registered repo. A registry with no repos returns 0, matching the
+// "node not yet caught up" semantic the cluster-barrier loop interprets as
+// "keep polling".
+//
+// The repo list is snapshotted under the read lock and the lock is released
+// before any repo is called, mirroring ResourceRevision: a repo that is slow
+// to answer must not stall a concurrent Register. The snapshot is safe to read
+// unlocked because Register only ever appends to repos and never rewrites an
+// existing element.
+func (r *NodeRepoRegistry) LatestModRevision() int64 {
+       r.mu.RLock()
+       repos := r.repos
+       r.mu.RUnlock()
+       if len(repos) == 0 {
+               return 0
+       }
+       minRev := repos[0].LatestModRevision()
+       for _, repo := range repos[1:] {
+               minRev = min(minRev, repo.LatestModRevision())
+       }
+       return minRev
+}
+
+// ResourceRevision routes the lookup to repos registered against the given
+// kind and returns the first match, trying repos in registration order.
+// Returns (0, false) when no repo is registered for the kind or no registered
+// repo holds the resource — the same shape the per-service
+// schemaRepo.ResourceRevision returns for an unknown key, so the barrier and
+// node-status RPC can fall through unchanged.
+func (r *NodeRepoRegistry) ResourceRevision(kind schema.Kind, group, name string) (int64, bool) {
+       // Copy the slice header under the lock, then query the repos without it;
+       // Register only appends, so the elements of this snapshot never change.
+       r.mu.RLock()
+       repos := r.byKind[kind]
+       r.mu.RUnlock()
+       for _, repo := range repos {
+               if rev, ok := repo.ResourceRevision(kind, group, name); ok {
+                       return rev, true
+               }
+       }
+       return 0, false
+}
+
+// IsAbsent reports whether no repo registered for the kind holds the given
+// (group, name). It is exactly the negation of ResourceRevision's found flag
+// and does not call the repos' own IsAbsent methods. A kind with no registered
+// repo is therefore always reported absent. For the AwaitSchemaDeleted barrier
+// that answer means "the deletion has already been observed" only if the kind
+// genuinely has no owner on this node, so TopN/Property kinds must be wired
+// through the schemaCache path rather than inferring absence from this
+// registry; HasKind tells the two cases apart.
+func (r *NodeRepoRegistry) IsAbsent(kind schema.Kind, group, name string) bool {
+       _, ok := r.ResourceRevision(kind, group, name)
+       return !ok
+}
+
+// HasKind tells the caller whether the given kind has one or more repos
+// registered against it. The node-status server relies on the answer to pick
+// a lookup path per key: a kind with a registered repo is served by the
+// registry, while a kind without one (TopN, Property) falls through to the
+// property schemaCache.
+func (r *NodeRepoRegistry) HasKind(kind schema.Kind) bool {
+       r.mu.RLock()
+       registered := len(r.byKind[kind])
+       r.mu.RUnlock()
+       return registered > 0
+}
+
+// Empty reports whether the registry holds no repos at all: it returns true
+// until the first Register call that is not dropped. The node-status server
+// uses this to skip the registry's min-rev contribution on a node where no
+// per-service schemaRepo runs (e.g. a property-only metadata host) — without
+// the check, an empty registry's LatestModRevision()=0 would gate every
+// barrier verdict to 0.
+func (r *NodeRepoRegistry) Empty() bool {
+       r.mu.RLock()
+       count := len(r.repos)
+       r.mu.RUnlock()
+       return count == 0
+}
+
+// MaybeRegister is the nil-tolerant, type-checked front door to Register: the
+// repo is registered under kinds only when reg is non-nil and repo implements
+// RevisionRepository; in every other case the call does nothing. Service
+// constructors hold the schemaRepo as a Repository interface
+// (pkg/schema.Repository); the concrete value is *pkg/schema.schemaRepo, which
+// implements RevisionRepository — so the type assertion succeeds in production
+// and fails closed for in-memory test fakes that only implement the base
+// Repository surface.
+func MaybeRegister(reg *NodeRepoRegistry, kinds schema.Kind, repo any) {
+       if reg == nil {
+               return
+       }
+       if revRepo, ok := repo.(RevisionRepository); ok {
+               reg.Register(kinds, revRepo)
+       }
+}
diff --git a/pkg/schema/registry/registry_test.go b/pkg/schema/registry/registry_test.go
new file mode 100644
index 000000000..53a6d50f8
--- /dev/null
+++ b/pkg/schema/registry/registry_test.go
@@ -0,0 +1,200 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package registry
+
+import (
+       "sync"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+
+       metaschema 
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+)
+
+// fakeRevRepo is a minimal RevisionRepository for registry unit tests. It
+// ignores the kind argument entirely, so any per-kind separation a test
+// observes comes from the registry's routing, not from the fake.
+type fakeRevRepo struct {
+       resources map[string]map[string]int64
+       latest    int64
+}
+
+// newFakeRevRepo builds a fake that reports latest from LatestModRevision and
+// starts with no resources.
+func newFakeRevRepo(latest int64) *fakeRevRepo {
+       return &fakeRevRepo{
+               latest:    latest,
+               resources: make(map[string]map[string]int64),
+       }
+}
+
+// put records rev as the revision of (group, name), creating the group's
+// inner map on first use.
+func (f *fakeRevRepo) put(group, name string, rev int64) {
+       if f.resources[group] == nil {
+               f.resources[group] = make(map[string]int64)
+       }
+       f.resources[group][name] = rev
+}
+
+// LatestModRevision returns the fixed value given to newFakeRevRepo.
+func (f *fakeRevRepo) LatestModRevision() int64 {
+       return f.latest
+}
+
+// ResourceRevision returns the revision stored by put for (group, name), or
+// (0, false) when nothing was stored. Indexing a missing group yields a nil
+// inner map, and reading from a nil map is safe, so no explicit group check
+// is needed.
+func (f *fakeRevRepo) ResourceRevision(_ metaschema.Kind, group, name string) (int64, bool) {
+       rev, ok := f.resources[group][name]
+       return rev, ok
+}
+
+// IsAbsent is the negation of ResourceRevision's found flag.
+func (f *fakeRevRepo) IsAbsent(kind metaschema.Kind, group, name string) bool {
+       _, ok := f.ResourceRevision(kind, group, name)
+       return !ok
+}
+
+// TestNodeRepoRegistry_EmptyReturnsZero pins the answers of every read method
+// on a freshly constructed registry, including Empty.
+func TestNodeRepoRegistry_EmptyReturnsZero(t *testing.T) {
+       r := NewNodeRepoRegistry()
+       assert.True(t, r.Empty())
+       assert.Equal(t, int64(0), r.LatestModRevision())
+       rev, ok := r.ResourceRevision(metaschema.KindStream, "g", "n")
+       assert.Equal(t, int64(0), rev)
+       assert.False(t, ok)
+       assert.True(t, r.IsAbsent(metaschema.KindStream, "g", "n"))
+       assert.False(t, r.HasKind(metaschema.KindStream))
+}
+
+// TestNodeRepoRegistry_NilRepoOrZeroKindIsNoop checks that Register drops a
+// nil repo and an empty kind mask without recording anything.
+func TestNodeRepoRegistry_NilRepoOrZeroKindIsNoop(t *testing.T) {
+       r := NewNodeRepoRegistry()
+       r.Register(metaschema.KindStream, nil)
+       r.Register(0, newFakeRevRepo(5))
+       assert.Equal(t, int64(0), r.LatestModRevision())
+       // Neither call may leave a trace in the repo list or the per-kind index.
+       assert.True(t, r.Empty())
+       assert.False(t, r.HasKind(metaschema.KindStream))
+}
+
+func TestNodeRepoRegistry_LatestModRevisionTakesMin(t *testing.T) {
+       r := NewNodeRepoRegistry()
+       r.Register(metaschema.KindMeasure|metaschema.KindIndexRule, 
newFakeRevRepo(10))
+       r.Register(metaschema.KindStream, newFakeRevRepo(7))
+       r.Register(metaschema.KindTrace, newFakeRevRepo(15))
+
+       assert.Equal(t, int64(7), r.LatestModRevision(), "min over registered 
repos wins")
+}
+
+// TestNodeRepoRegistry_ResourceRevisionRoutesByKind verifies that a lookup is
+// answered only by the repos registered for the requested kind.
+func TestNodeRepoRegistry_ResourceRevisionRoutesByKind(t *testing.T) {
+       measures, streams := newFakeRevRepo(10), newFakeRevRepo(7)
+       measures.put("g1", "m1", 9)
+       streams.put("g1", "s1", 5)
+
+       reg := NewNodeRepoRegistry()
+       reg.Register(metaschema.KindMeasure, measures)
+       reg.Register(metaschema.KindStream, streams)
+
+       got, found := reg.ResourceRevision(metaschema.KindMeasure, "g1", "m1")
+       assert.True(t, found)
+       assert.Equal(t, int64(9), got)
+
+       got, found = reg.ResourceRevision(metaschema.KindStream, "g1", "s1")
+       assert.True(t, found)
+       assert.Equal(t, int64(5), got)
+
+       // Asking for the measure's name under the Stream kind must miss: only
+       // the stream repo is consulted, and it does not hold "m1". A Stream key
+       // is never satisfied by the sibling measure repo.
+       _, found = reg.ResourceRevision(metaschema.KindStream, "g1", "m1")
+       assert.False(t, found)
+}
+
+// TestNodeRepoRegistry_GroupKindFanOut registers two repos under KindGroup and
+// checks that a group lookup is answered by whichever repo holds the key, and
+// reported absent when neither does.
+func TestNodeRepoRegistry_GroupKindFanOut(t *testing.T) {
+       measureRepo := newFakeRevRepo(10)
+       measureRepo.put("measure-only", "measure-only", 8)
+       streamRepo := newFakeRevRepo(7)
+       streamRepo.put("stream-only", "stream-only", 6)
+
+       r := NewNodeRepoRegistry()
+       r.Register(metaschema.KindGroup|metaschema.KindMeasure, measureRepo)
+       r.Register(metaschema.KindGroup|metaschema.KindStream, streamRepo)
+
+       rev, ok := r.ResourceRevision(metaschema.KindGroup, "stream-only", "stream-only")
+       assert.True(t, ok)
+       assert.Equal(t, int64(6), rev)
+
+       rev, ok = r.ResourceRevision(metaschema.KindGroup, "measure-only", "measure-only")
+       assert.True(t, ok)
+       assert.Equal(t, int64(8), rev)
+
+       assert.True(t, r.IsAbsent(metaschema.KindGroup, "ghost", "ghost"))
+}
+
+// TestNodeRepoRegistry_DuplicateRegistrationIsIdempotent registers one repo
+// three times with overlapping masks and checks that it is stored once.
+func TestNodeRepoRegistry_DuplicateRegistrationIsIdempotent(t *testing.T) {
+       repo := newFakeRevRepo(4)
+       r := NewNodeRepoRegistry()
+       r.Register(metaschema.KindStream, repo)
+       r.Register(metaschema.KindStream, repo)
+       r.Register(metaschema.KindStream|metaschema.KindIndexRule, repo)
+
+       assert.False(t, r.Empty())
+       assert.Equal(t, int64(4), r.LatestModRevision())
+       assert.True(t, r.HasKind(metaschema.KindStream))
+       assert.True(t, r.HasKind(metaschema.KindIndexRule))
+       assert.False(t, r.HasKind(metaschema.KindMeasure))
+
+       // The public read methods cannot tell one entry from several duplicates
+       // (the minimum is the same either way), so inspect the internal slices.
+       assert.Len(t, r.repos, 1)
+       assert.Len(t, r.byKind[metaschema.KindStream], 1)
+       assert.Len(t, r.byKind[metaschema.KindIndexRule], 1)
+}
+
+func TestNodeRepoRegistry_HasKind(t *testing.T) {
+       r := NewNodeRepoRegistry()
+       
r.Register(metaschema.KindMeasure|metaschema.KindIndexRule|metaschema.KindIndexRuleBinding|metaschema.KindGroup,
 newFakeRevRepo(1))
+
+       assert.True(t, r.HasKind(metaschema.KindMeasure))
+       assert.True(t, r.HasKind(metaschema.KindIndexRule))
+       assert.True(t, r.HasKind(metaschema.KindIndexRuleBinding))
+       assert.True(t, r.HasKind(metaschema.KindGroup))
+       assert.False(t, r.HasKind(metaschema.KindTopNAggregation), "TopN routes 
via schemaCache, not the registry")
+       assert.False(t, r.HasKind(metaschema.KindProperty), "Property routes 
via schemaCache, not the registry")
+}
+
+// TestNodeRepoRegistry_ConcurrentRegisterAndLookup registers four repos from
+// separate goroutines while another goroutine runs lookups, then checks the
+// final state. It is meant to be run under the race detector.
+func TestNodeRepoRegistry_ConcurrentRegisterAndLookup(t *testing.T) {
+       r := NewNodeRepoRegistry()
+       repos := []*fakeRevRepo{newFakeRevRepo(1), newFakeRevRepo(2), newFakeRevRepo(3), newFakeRevRepo(4)}
+       // Populate the fakes before any goroutine starts; they are read-only
+       // from here on.
+       for _, repo := range repos {
+               repo.put("g", "n", 10)
+       }
+
+       var wg sync.WaitGroup
+       wg.Add(len(repos))
+       for i, repo := range repos {
+               go func(idx int, rp *fakeRevRepo) {
+                       defer wg.Done()
+                       // Even indexes register as Stream, odd ones as Measure.
+                       kinds := metaschema.KindMeasure
+                       if idx%2 == 0 {
+                               kinds = metaschema.KindStream
+                       }
+                       r.Register(kinds, rp)
+               }(i, repo)
+       }
+
+       done := make(chan struct{})
+       go func() {
+               for range 1000 {
+                       _ = r.LatestModRevision()
+                       _, _ = r.ResourceRevision(metaschema.KindStream, "g", "n")
+                       _ = r.HasKind(metaschema.KindMeasure)
+               }
+               close(done)
+       }()
+
+       wg.Wait()
+       <-done
+
+       assert.Equal(t, int64(1), r.LatestModRevision())
+       assert.True(t, r.HasKind(metaschema.KindStream))
+       assert.True(t, r.HasKind(metaschema.KindMeasure))
+}

Reply via email to