This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch phase-2-cp5-march in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 57eb252f92c7c03521ad079c174a69e72f557542 Author: Hongtao Gao <[email protected]> AuthorDate: Wed May 6 15:32:22 2026 +0000 feat(schema/registry): add NodeRepoRegistry per-node aggregator (Phase 2 Step 2.5 §1) Introduces pkg/schema/registry, the per-node aggregator that the cluster barrier (NodeSchemaStatusService) and write/query gates consult. Each banyand service (measure / stream / trace / property) registers its pkg/schema.schemaRepo here during PreRun under a kind bitmask; lookups route to the same caches the data-node executor consults via LoadGroup / LoadResource. The package lives outside pkg/schema to avoid an import cycle through banyand/metadata when metadata.Service exposes the registry. A compile-time assertion in pkg/schema/init.go pins the schemaRepo → RevisionRepository contract so future API drift breaks the build before the cluster barrier silently degrades. via [HAPI](https://hapi.run) Co-Authored-By: HAPI <[email protected]> Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]> --- pkg/schema/init.go | 7 ++ pkg/schema/registry/registry.go | 178 +++++++++++++++++++++++++++++++ pkg/schema/registry/registry_test.go | 200 +++++++++++++++++++++++++++++++++++ 3 files changed, 385 insertions(+) diff --git a/pkg/schema/init.go b/pkg/schema/init.go index 3f96d750b..3c20b32c9 100644 --- a/pkg/schema/init.go +++ b/pkg/schema/init.go @@ -25,8 +25,15 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/schema/registry" ) +// schemaRepo satisfies the registry.RevisionRepository contract via its +// LatestModRevision / ResourceRevision / IsAbsent methods. The compile-time +// assertion lives here (next to schemaRepo; RevisionRepository itself is declared in pkg/schema/registry) so a future API drift +// breaks the build before the cluster barrier silently degrades. 
+var _ registry.RevisionRepository = (*schemaRepo)(nil) + var initTimeout = 10 * time.Second type revisionContext struct { diff --git a/pkg/schema/registry/registry.go b/pkg/schema/registry/registry.go new file mode 100644 index 000000000..d69883829 --- /dev/null +++ b/pkg/schema/registry/registry.go @@ -0,0 +1,178 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package registry holds the per-node aggregator that routes barrier and +// node-status RPC lookups to the same per-service schema-repo caches the +// query executor consults. Living in its own package lets banyand/metadata +// expose the registry via metadata.Service without importing pkg/schema, +// which would create a cycle. +package registry + +import ( + "slices" + "sync" + + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" +) + +// RevisionRepository is the read surface a per-service schema repo exposes to +// the registry. The schemaRepo type in pkg/schema implements this interface; +// keeping the interface here (alongside the registry) instead of importing +// pkg/schema avoids a dependency cycle through banyand/metadata. 
+type RevisionRepository interface { + LatestModRevision() int64 + ResourceRevision(kind schema.Kind, group, name string) (int64, bool) + IsAbsent(kind schema.Kind, group, name string) bool +} + +// NodeRepoRegistry aggregates per-service RevisionRepository instances on a +// single node (liaison or data). Each banyand service (measure, stream, trace, +// …) registers its schemaRepo during PreRun with a kind bitmask describing the +// kinds the repo tracks. The registry routes barrier and node-status lookups to +// the same caches the executor consults during query plan execution, so a +// positive answer from LatestModRevision implies every registered repo has +// applied the revision and any executor cache the node exposes is at least at +// that point. This is the load-bearing prerequisite for the Phase 2 +// single-cache invariant — without it the cluster barrier would certify a +// cache the data-node executor never reads. +// +// Safe for concurrent registration during PreRun and concurrent lookup. +type NodeRepoRegistry struct { + byKind map[schema.Kind][]RevisionRepository + seen map[RevisionRepository]struct{} + repos []RevisionRepository + mu sync.RWMutex +} + +// NewNodeRepoRegistry returns an empty registry ready for service registration. +func NewNodeRepoRegistry() *NodeRepoRegistry { + return &NodeRepoRegistry{ + byKind: make(map[schema.Kind][]RevisionRepository), + seen: make(map[RevisionRepository]struct{}), + } +} + +// Register associates a RevisionRepository with one or more kinds. The kinds +// argument is a bitmask of schema.Kind values; the registry walks each set +// bit and indexes the repo against that kind. Re-registering the same +// (kind, repo) pair is a no-op so PreRun is idempotent. A nil repo or empty +// kind mask is silently dropped. 
+func (r *NodeRepoRegistry) Register(kinds schema.Kind, repo RevisionRepository) { + if repo == nil || kinds == 0 { + return + } + r.mu.Lock() + defer r.mu.Unlock() + if _, ok := r.seen[repo]; !ok { + r.seen[repo] = struct{}{} + r.repos = append(r.repos, repo) + } + for _, k := range schema.AllKinds() { + if kinds&k == 0 { + continue + } + existing := r.byKind[k] + if !slices.Contains(existing, repo) { + r.byKind[k] = append(existing, repo) + } + } +} + +// LatestModRevision returns the minimum LatestModRevision across every +// registered repo. A registry with no repos returns 0, matching the +// "node not yet caught up" semantic the cluster-barrier loop interprets as +// "keep polling". +func (r *NodeRepoRegistry) LatestModRevision() int64 { + r.mu.RLock() + defer r.mu.RUnlock() + if len(r.repos) == 0 { + return 0 + } + minRev := r.repos[0].LatestModRevision() + for _, repo := range r.repos[1:] { + if rev := repo.LatestModRevision(); rev < minRev { + minRev = rev + } + } + return minRev +} + +// ResourceRevision routes the lookup to repos registered against the given +// kind and returns the first match. Returns (0, false) when no repo is +// registered for the kind or no registered repo holds the resource — the same +// shape the per-service schemaRepo.ResourceRevision returns for an unknown +// key, so the barrier and node-status RPC can fall through unchanged. +func (r *NodeRepoRegistry) ResourceRevision(kind schema.Kind, group, name string) (int64, bool) { + r.mu.RLock() + repos := r.byKind[kind] + r.mu.RUnlock() + for _, repo := range repos { + if rev, ok := repo.ResourceRevision(kind, group, name); ok { + return rev, true + } + } + return 0, false +} + +// IsAbsent returns true when no repo registered for the kind holds the given +// (group, name). A kind with no registered repo is reported absent — the +// AwaitSchemaDeleted barrier reads this as "the deletion has already been +// observed" only if the kind genuinely has no owner on this node. 
Wire +// TopN/Property kinds through the schemaCache path; do not infer absence +// from this registry for those. +func (r *NodeRepoRegistry) IsAbsent(kind schema.Kind, group, name string) bool { + _, ok := r.ResourceRevision(kind, group, name) + return !ok +} + +// HasKind reports whether at least one repo is registered for the given kind. +// The node-status server uses this to route per-key lookups: kinds with a +// registered repo go through the registry; kinds without (TopN, Property) +// fall through to the property schemaCache. +func (r *NodeRepoRegistry) HasKind(kind schema.Kind) bool { + r.mu.RLock() + defer r.mu.RUnlock() + return len(r.byKind[kind]) > 0 +} + +// Empty reports whether no repo has been registered yet. The node-status server +// uses this to skip the registry's min-rev contribution on a node where no +// per-service schemaRepo runs (e.g. a property-only metadata host) — without +// the check, an empty registry's LatestModRevision()=0 would gate every +// barrier verdict to 0. +func (r *NodeRepoRegistry) Empty() bool { + r.mu.RLock() + defer r.mu.RUnlock() + return len(r.repos) == 0 +} + +// MaybeRegister registers repo with reg under the given kinds when reg is +// non-nil and repo satisfies RevisionRepository. Service constructors hold the +// schemaRepo as a Repository interface (pkg/schema.Repository); the concrete +// value is *pkg/schema.schemaRepo, which implements RevisionRepository — so a +// type assertion at registration time is safe in production and fails closed +// for in-memory test fakes that only implement the base Repository surface. 
+func MaybeRegister(reg *NodeRepoRegistry, kinds schema.Kind, repo any) { + if reg == nil { + return + } + rr, ok := repo.(RevisionRepository) + if !ok { + return + } + reg.Register(kinds, rr) +} diff --git a/pkg/schema/registry/registry_test.go b/pkg/schema/registry/registry_test.go new file mode 100644 index 000000000..53a6d50f8 --- /dev/null +++ b/pkg/schema/registry/registry_test.go @@ -0,0 +1,200 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package registry + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" + + metaschema "github.com/apache/skywalking-banyandb/banyand/metadata/schema" +) + +// fakeRevRepo is a minimal RevisionRepository for registry unit tests. 
+type fakeRevRepo struct { + resources map[string]map[string]int64 + latest int64 +} + +func newFakeRevRepo(latest int64) *fakeRevRepo { + return &fakeRevRepo{ + latest: latest, + resources: make(map[string]map[string]int64), + } +} + +func (f *fakeRevRepo) put(group, name string, rev int64) { + if f.resources[group] == nil { + f.resources[group] = make(map[string]int64) + } + f.resources[group][name] = rev +} + +func (f *fakeRevRepo) LatestModRevision() int64 { + return f.latest +} + +func (f *fakeRevRepo) ResourceRevision(_ metaschema.Kind, group, name string) (int64, bool) { + if g := f.resources[group]; g != nil { + if rev, ok := g[name]; ok { + return rev, true + } + } + return 0, false +} + +func (f *fakeRevRepo) IsAbsent(kind metaschema.Kind, group, name string) bool { + _, ok := f.ResourceRevision(kind, group, name) + return !ok +} + +func TestNodeRepoRegistry_EmptyReturnsZero(t *testing.T) { + r := NewNodeRepoRegistry() + assert.Equal(t, int64(0), r.LatestModRevision()) + rev, ok := r.ResourceRevision(metaschema.KindStream, "g", "n") + assert.Equal(t, int64(0), rev) + assert.False(t, ok) + assert.True(t, r.IsAbsent(metaschema.KindStream, "g", "n")) + assert.False(t, r.HasKind(metaschema.KindStream)) +} + +func TestNodeRepoRegistry_NilRepoOrZeroKindIsNoop(t *testing.T) { + r := NewNodeRepoRegistry() + r.Register(metaschema.KindStream, nil) + r.Register(0, newFakeRevRepo(5)) + assert.Equal(t, int64(0), r.LatestModRevision()) +} + +func TestNodeRepoRegistry_LatestModRevisionTakesMin(t *testing.T) { + r := NewNodeRepoRegistry() + r.Register(metaschema.KindMeasure|metaschema.KindIndexRule, newFakeRevRepo(10)) + r.Register(metaschema.KindStream, newFakeRevRepo(7)) + r.Register(metaschema.KindTrace, newFakeRevRepo(15)) + + assert.Equal(t, int64(7), r.LatestModRevision(), "min over registered repos wins") +} + +func TestNodeRepoRegistry_ResourceRevisionRoutesByKind(t *testing.T) { + measureRepo := newFakeRevRepo(10) + measureRepo.put("g1", "m1", 9) + streamRepo := 
newFakeRevRepo(7) + streamRepo.put("g1", "s1", 5) + + r := NewNodeRepoRegistry() + r.Register(metaschema.KindMeasure, measureRepo) + r.Register(metaschema.KindStream, streamRepo) + + rev, ok := r.ResourceRevision(metaschema.KindMeasure, "g1", "m1") + assert.True(t, ok) + assert.Equal(t, int64(9), rev) + + rev, ok = r.ResourceRevision(metaschema.KindStream, "g1", "s1") + assert.True(t, ok) + assert.Equal(t, int64(5), rev) + + // Wrong-kind lookup returns absent even though the name matches a + // resource on a sibling repo — barrier semantics: a Stream key never + // satisfies via the measure repo. + _, ok = r.ResourceRevision(metaschema.KindStream, "g1", "m1") + assert.False(t, ok) +} + +func TestNodeRepoRegistry_GroupKindFanOut(t *testing.T) { + measureRepo := newFakeRevRepo(10) + measureRepo.put("measure-only", "measure-only", 8) + streamRepo := newFakeRevRepo(7) + streamRepo.put("stream-only", "stream-only", 6) + + r := NewNodeRepoRegistry() + r.Register(metaschema.KindGroup|metaschema.KindMeasure, measureRepo) + r.Register(metaschema.KindGroup|metaschema.KindStream, streamRepo) + + rev, ok := r.ResourceRevision(metaschema.KindGroup, "stream-only", "stream-only") + assert.True(t, ok) + assert.Equal(t, int64(6), rev) + + rev, ok = r.ResourceRevision(metaschema.KindGroup, "measure-only", "measure-only") + assert.True(t, ok) + assert.Equal(t, int64(8), rev) + + assert.True(t, r.IsAbsent(metaschema.KindGroup, "ghost", "ghost")) +} + +func TestNodeRepoRegistry_DuplicateRegistrationIsIdempotent(t *testing.T) { + repo := newFakeRevRepo(4) + r := NewNodeRepoRegistry() + r.Register(metaschema.KindStream, repo) + r.Register(metaschema.KindStream, repo) + r.Register(metaschema.KindStream|metaschema.KindIndexRule, repo) + + assert.Equal(t, int64(4), r.LatestModRevision()) + assert.True(t, r.HasKind(metaschema.KindStream)) + assert.True(t, r.HasKind(metaschema.KindIndexRule)) + assert.False(t, r.HasKind(metaschema.KindMeasure)) +} + +func TestNodeRepoRegistry_HasKind(t 
*testing.T) { + r := NewNodeRepoRegistry() + r.Register(metaschema.KindMeasure|metaschema.KindIndexRule|metaschema.KindIndexRuleBinding|metaschema.KindGroup, newFakeRevRepo(1)) + + assert.True(t, r.HasKind(metaschema.KindMeasure)) + assert.True(t, r.HasKind(metaschema.KindIndexRule)) + assert.True(t, r.HasKind(metaschema.KindIndexRuleBinding)) + assert.True(t, r.HasKind(metaschema.KindGroup)) + assert.False(t, r.HasKind(metaschema.KindTopNAggregation), "TopN routes via schemaCache, not the registry") + assert.False(t, r.HasKind(metaschema.KindProperty), "Property routes via schemaCache, not the registry") +} + +func TestNodeRepoRegistry_ConcurrentRegisterAndLookup(t *testing.T) { + r := NewNodeRepoRegistry() + repos := []*fakeRevRepo{newFakeRevRepo(1), newFakeRevRepo(2), newFakeRevRepo(3), newFakeRevRepo(4)} + for _, repo := range repos { + repo.put("g", "n", 10) + } + + var wg sync.WaitGroup + wg.Add(len(repos)) + for i, repo := range repos { + go func(idx int, rp *fakeRevRepo) { + defer wg.Done() + kinds := metaschema.KindMeasure + if idx%2 == 0 { + kinds = metaschema.KindStream + } + r.Register(kinds, rp) + }(i, repo) + } + + done := make(chan struct{}) + go func() { + for range 1000 { + _ = r.LatestModRevision() + _, _ = r.ResourceRevision(metaschema.KindStream, "g", "n") + _ = r.HasKind(metaschema.KindMeasure) + } + close(done) + }() + + wg.Wait() + <-done + + assert.Equal(t, int64(1), r.LatestModRevision()) + assert.True(t, r.HasKind(metaschema.KindStream)) + assert.True(t, r.HasKind(metaschema.KindMeasure)) +}
