This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch phase-2-cp5-march in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 593b00369f2c0d8eef2aa0503d4d3308153f7159 Author: Hongtao Gao <[email protected]> AuthorDate: Thu May 7 23:59:39 2026 +0000 feat(barrier): expose NodeSchemaStatusService on data-node ports (Phase 2 CP-6 follow-up) Decouples cluster.v1.NodeSchemaStatusService from the liaison-only fodc.v1.GroupLifecycleService inside banyand/queue/sub/server.go's Serve() block, which previously gated both registrations together under one `if s.metadataRepo != nil` check. The conflation prevented data-node startup from registering NodeSchemaStatusService alone: adding a SetMetadataRepo call on the data node would also drag in GroupLifecycleService, whose InspectAll path is liaison-shaped by design (it returns a cluster-wide group inventory to fodc agents). - Add queue.Server.SetNodeSchemaStatusRepo(metadata.Service) so each call site wires only the services it needs: - liaison startup (pkg/cmdsetup/liaison.go) calls both SetMetadataRepo and SetNodeSchemaStatusRepo. - data-node startup (pkg/cmdsetup/data.go) calls only SetNodeSchemaStatusRepo. - banyand/queue/local.go gets a no-op SetNodeSchemaStatusRepo for interface conformance. - Split the Serve() registration block: GroupLifecycleService is gated on metadataRepo != nil; NodeSchemaStatusService is gated on nodeSchemaStatusRepo != nil. The metadata.Service is captured directly by the new setter, dropping the previous runtime type assertion against metadataRepo. After this change, the cluster barrier's tier-2 fan-out probes data nodes for their real MaxRevision instead of relying on the cross-version Unimplemented→ready policy in barrier_cluster.go, which is now correctly reserved for true Phase-1 peers. Repairs the regression that surfaced once data nodes started returning real MaxRevision values: - NodeSchemaStatusServer.GetMaxRevision previously returned min(schemaCache.notifiedModRevision, NodeRepoRegistry.LatestModRevision()). 
- NodeRepoRegistry.LatestModRevision aggregated per-service schemaRepo.latestModRevision via min, but each schemaRepo only advances on events for its own catalog (pkg/schema/init.go:72 filters by g.Catalog). The min was therefore pinned to the slowest catalog's last-seen revision, so a steady measure-only event stream gated the barrier on an unrelated trace schemaRepo watermark (or vice versa). Distributed integration failed 21 of 32 specs because of this aggregation alone. - GetMaxRevision now reads the schemaCache only — symmetric with the receiving liaison's selfName probe at barrier_cluster.go:354-360, which has always read cache-only. - NodeRepoRegistry.LatestModRevision and the corresponding methods on registry.RevisionRepository / pkg/schema.RevisionRepository / fakeRevRepo / fakeRevisionRepo are deleted so a future caller cannot reintroduce the bug. Per-key gating (GetKeyRevisions / GetAbsentKeys) keeps routing through the registry by kind via ResourceRevision / IsAbsent / HasKind, so §GC-1's executor-cache convergence on the receiving liaison's write/query gates is preserved. §6.12a / §6.12d remain g.PIt under the new mechanism for an orthogonal reason: the laggard-detection assertion passes, but the post-resume AwaitRevisionApplied(newRev) does not converge inside the spec timeout. The per-key §6.12b/c flows do converge through the same queue-drain path, so the gap is scoped to the global notifiedModRevision watermark advancing through queue replay — not to the data-node fan-out, which §NSS-1 verified. Distributed schema integration suite reports `30 Passed | 0 Failed | 2 Pending | 0 Skipped` (95.4s). 
via [HAPI](https://hapi.run) Co-Authored-By: HAPI <[email protected]> Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]> --- .../liaison/grpc/schema_revision_registry_test.go | 23 ++---- banyand/metadata/schema/property/node_status.go | 35 +++----- .../schema/property/node_status_registry_test.go | 93 ++++++---------------- banyand/queue/local.go | 3 + banyand/queue/queue.go | 6 ++ banyand/queue/sub/server.go | 43 +++++----- pkg/cmdsetup/data.go | 1 + pkg/cmdsetup/liaison.go | 1 + pkg/schema/init.go | 6 +- pkg/schema/registry/registry.go | 41 +++------- pkg/schema/registry/registry_test.go | 45 ++++------- pkg/schema/schema.go | 5 +- test/cases/schema/barrier_cluster.go | 65 ++++++--------- 13 files changed, 128 insertions(+), 239 deletions(-) diff --git a/banyand/liaison/grpc/schema_revision_registry_test.go b/banyand/liaison/grpc/schema_revision_registry_test.go index 3a17de4e5..7885bd87b 100644 --- a/banyand/liaison/grpc/schema_revision_registry_test.go +++ b/banyand/liaison/grpc/schema_revision_registry_test.go @@ -19,7 +19,6 @@ package grpc import ( "sync" - "sync/atomic" "testing" "time" @@ -37,15 +36,12 @@ import ( ) // fakeRevisionRepo implements registry.RevisionRepository with a controllable -// per-key map and an atomically advancing latestModRevision watermark. Tests -// use it to simulate the eventCh-retry leak the gate must close: a state -// where the entityRepo locator already advanced to R but the schemaRepo (and -// therefore the registry) has not — in production this is the data-node -// executor lag the cluster barrier already accounts for via NodeRepoRegistry. +// per-key map. Tests use it to simulate the eventCh-retry leak the gate must +// close: a state where the entityRepo locator already advanced to R but the +// schemaRepo (and therefore the registry) has not. 
type fakeRevisionRepo struct { - keys map[string]int64 - latest atomic.Int64 - mu sync.RWMutex + keys map[string]int64 + mu sync.RWMutex } func newFakeRevisionRepo() *fakeRevisionRepo { @@ -60,17 +56,8 @@ func (f *fakeRevisionRepo) set(kind schema.Kind, group, name string, rev int64) f.mu.Lock() f.keys[f.keyOf(kind, group, name)] = rev f.mu.Unlock() - for { - cur := f.latest.Load() - if rev <= cur || f.latest.CompareAndSwap(cur, rev) { - return - } - } } -// LatestModRevision implements registry.RevisionRepository. -func (f *fakeRevisionRepo) LatestModRevision() int64 { return f.latest.Load() } - // ResourceRevision implements registry.RevisionRepository. func (f *fakeRevisionRepo) ResourceRevision(kind schema.Kind, group, name string) (int64, bool) { f.mu.RLock() diff --git a/banyand/metadata/schema/property/node_status.go b/banyand/metadata/schema/property/node_status.go index 0e24027e5..51cdfd946 100644 --- a/banyand/metadata/schema/property/node_status.go +++ b/banyand/metadata/schema/property/node_status.go @@ -145,35 +145,18 @@ func (s *NodeSchemaStatusServer) registry() *registry.NodeRepoRegistry { return s.registryProvider() } -// GetMaxRevision returns the minimum mod_revision across the schemaCache -// watermark and the per-service NodeRepoRegistry. Either source contributes -// only when present and non-empty; a registry with no registered repos is -// treated as "no executor cache to gate on" and skipped (otherwise its -// LatestModRevision()=0 would gate every verdict to 0 on a metadata-only -// host). -// -// When neither source has data the response carries 0, which the liaison's -// barrier loop interprets as "node not yet caught up" and continues polling. +// GetMaxRevision returns the schemaCache's notifiedModRevision watermark. +// The cache observes every catalog's events, so it is the correct global +// watermark — symmetric with the receiving liaison's selfName probe at +// barrier_cluster.go:354-360 which also reads cache-only. 
Per-key gating +// (executor-cache routing by kind) is handled by GetKeyRevisions / +// GetAbsentKeys via the NodeRepoRegistry. func (s *NodeSchemaStatusServer) GetMaxRevision(_ context.Context, _ *clusterv1.GetMaxRevisionRequest) (*clusterv1.GetMaxRevisionResponse, error) { - var ( - minRev int64 - hasSource bool - ) - if c := s.cache(); c != nil { - minRev = c.GetMaxModRevision() - hasSource = true - } - if reg := s.registry(); reg != nil && !reg.Empty() { - regRev := reg.LatestModRevision() - if !hasSource || regRev < minRev { - minRev = regRev - } - hasSource = true - } - if !hasSource { + c := s.cache() + if c == nil { return &clusterv1.GetMaxRevisionResponse{}, nil } - return &clusterv1.GetMaxRevisionResponse{MaxModRevision: minRev}, nil + return &clusterv1.GetMaxRevisionResponse{MaxModRevision: c.GetMaxModRevision()}, nil } // GetKeyRevisions returns per-key (mod_revision, present) pairs in the same diff --git a/banyand/metadata/schema/property/node_status_registry_test.go b/banyand/metadata/schema/property/node_status_registry_test.go index 58108212c..de1180677 100644 --- a/banyand/metadata/schema/property/node_status_registry_test.go +++ b/banyand/metadata/schema/property/node_status_registry_test.go @@ -31,18 +31,13 @@ import ( ) // fakeRevRepo emulates a per-service pkg/schema.schemaRepo for the registry -// pointer-identity tests. The "Latest" knob simulates the eventCh-retry leak -// scenario from pkg/schema/cache.go:106 — a repo lagging behind the property -// schemaCache because a transient processEvent failure deferred the apply -// onto the async retry queue. +// pointer-identity tests. 
type fakeRevRepo struct { resources map[metaschema.Kind]map[string]int64 - latest int64 } -func newFakeRevRepo(latest int64) *fakeRevRepo { +func newFakeRevRepo() *fakeRevRepo { return &fakeRevRepo{ - latest: latest, resources: make(map[metaschema.Kind]map[string]int64), } } @@ -59,8 +54,6 @@ func (f *fakeRevRepo) put(kind metaschema.Kind, group, name string, rev int64) { f.resources[kind][group+"/"+name] = rev } -func (f *fakeRevRepo) LatestModRevision() int64 { return f.latest } - func (f *fakeRevRepo) ResourceRevision(kind metaschema.Kind, group, name string) (int64, bool) { if m := f.resources[kind]; m != nil { if rev, ok := m[group+"/"+name]; ok { @@ -83,7 +76,7 @@ func (f *fakeRevRepo) IsAbsent(kind metaschema.Kind, group, name string) bool { // disagree on the same (group, ModRevision) pair. func TestExecutor_ResolvesGroupsViaSharedSchemaRepo(t *testing.T) { const targetRev int64 = 42 - repo := newFakeRevRepo(targetRev) + repo := newFakeRevRepo() repo.put(metaschema.KindGroup, "g1", "g1", targetRev) repo.put(metaschema.KindMeasure, "g1", "m1", targetRev) @@ -98,11 +91,6 @@ func TestExecutor_ResolvesGroupsViaSharedSchemaRepo(t *testing.T) { func() *registry.NodeRepoRegistry { return reg }, ) - maxResp, err := srv.GetMaxRevision(context.Background(), &clusterv1.GetMaxRevisionRequest{}) - require.NoError(t, err) - assert.Equal(t, targetRev, maxResp.GetMaxModRevision(), - "registry-only node reports the registered repo's latest") - revResp, err := srv.GetKeyRevisions(context.Background(), &clusterv1.GetKeyRevisionsRequest{ Keys: []*schemav1.SchemaKey{ {Kind: "group", Group: "g1", Name: "g1"}, @@ -132,23 +120,20 @@ func TestExecutor_ResolvesGroupsViaSharedSchemaRepo(t *testing.T) { "a repo-side mutation is immediately visible through the registry — same pointer") } -// TestNodeStatus_DoesNotAdvancePastSchemaRepoLag injects a ModRevision skew -// between the property schemaCache and the per-service schemaRepo (the -// scenario the eventCh-retry path in 
pkg/schema.schemaRepo.SendMetadataEvent -// produces) and asserts GetMaxRevision returns the laggard's value, never the -// cache's. This is the regression that distinguishes the Phase 2 §Step 2.5 -// pivot from the original schemaCache-only implementation: the barrier no -// longer certifies a revision the executor's resolver cache has not applied. -func TestNodeStatus_DoesNotAdvancePastSchemaRepoLag(t *testing.T) { - // schemaCache is at watermark 100 — every kind it tracks has been - // notified to handlers up to revision 100. +// TestNodeStatus_GetMaxRevision_ReadsCacheOnly asserts the new GetMaxRevision +// contract: the response equals the schemaCache's notifiedModRevision and is +// independent of the registry. The cache observes every catalog's events, so +// it is the correct global watermark — symmetric with the receiving liaison's +// selfName probe at barrier_cluster.go:354-360. Per-key executor-cache gating +// stays on the registry via GetKeyRevisions / GetAbsentKeys. +func TestNodeStatus_GetMaxRevision_ReadsCacheOnly(t *testing.T) { c := newSchemaCache() c.notifiedModRevision = 100 - // schemaRepo lags behind at 50 — the canonical executor-cache scenario - // the §Step 2.5 pivot exists to handle. - repo := newFakeRevRepo(50) - + // A registered repo with per-catalog state must NOT shift GetMaxRevision — + // the cache value alone wins. + repo := newFakeRevRepo() + repo.put(metaschema.KindMeasure, "g1", "m1", 50) reg := registry.NewNodeRepoRegistry() registry.MaybeRegister(reg, metaschema.KindMeasure, repo) @@ -159,26 +144,16 @@ func TestNodeStatus_DoesNotAdvancePastSchemaRepoLag(t *testing.T) { resp, err := srv.GetMaxRevision(context.Background(), &clusterv1.GetMaxRevisionRequest{}) require.NoError(t, err) - assert.Equal(t, int64(50), resp.GetMaxModRevision(), - "GetMaxRevision = min(schemaCache.watermark, registry.LatestModRevision); the laggard wins") - - // When the laggard catches up to 100, GetMaxRevision should match. 
- repo.latest = 100 - resp2, err := srv.GetMaxRevision(context.Background(), &clusterv1.GetMaxRevisionRequest{}) - require.NoError(t, err) - assert.Equal(t, int64(100), resp2.GetMaxModRevision(), - "once the laggard catches up, the min equals the watermark") + assert.Equal(t, int64(100), resp.GetMaxModRevision(), + "GetMaxRevision = schemaCache.notifiedModRevision; registry contributes nothing") } -// TestNodeStatus_RegistryOnly_NoSchemaCache covers the "registry alone" -// configuration: a node whose schemaCache provider returns nil but whose -// registry has registered repos. The server must report the registry's -// LatestModRevision as the max — mirrors the production wiring on a -// data-node before the schemaCache has been fully populated. -func TestNodeStatus_RegistryOnly_NoSchemaCache(t *testing.T) { - repo := newFakeRevRepo(77) +// TestNodeStatus_GetMaxRevision_NilCacheReturnsZero pins the boundary case: +// when the cache provider returns nil, the response carries 0 regardless of +// what the registry holds. +func TestNodeStatus_GetMaxRevision_NilCacheReturnsZero(t *testing.T) { + repo := newFakeRevRepo() repo.put(metaschema.KindStream, "g1", "s1", 60) - reg := registry.NewNodeRepoRegistry() registry.MaybeRegister(reg, metaschema.KindStream, repo) @@ -189,29 +164,7 @@ func TestNodeStatus_RegistryOnly_NoSchemaCache(t *testing.T) { resp, err := srv.GetMaxRevision(context.Background(), &clusterv1.GetMaxRevisionRequest{}) require.NoError(t, err) - assert.Equal(t, int64(77), resp.GetMaxModRevision()) -} - -// TestNodeStatus_EmptyRegistry_FallsBackToSchemaCache covers the production -// boot path: the registry exists but no service has registered yet. The -// server must skip the registry's "min" contribution rather than gate the -// barrier verdict to 0. Mirrors the schemaCache-only behavior for a -// metadata-only host. 
-func TestNodeStatus_EmptyRegistry_FallsBackToSchemaCache(t *testing.T) { - c := newSchemaCache() - c.notifiedModRevision = 25 - - reg := registry.NewNodeRepoRegistry() - - srv := NewNodeSchemaStatusServerWithRegistry( - func() *schemaCache { return c }, - func() *registry.NodeRepoRegistry { return reg }, - ) - - resp, err := srv.GetMaxRevision(context.Background(), &clusterv1.GetMaxRevisionRequest{}) - require.NoError(t, err) - assert.Equal(t, int64(25), resp.GetMaxModRevision(), - "empty registry contributes nothing — schemaCache watermark wins") + assert.Equal(t, int64(0), resp.GetMaxModRevision()) } // TestNodeStatus_KindRouting_TopNFallsBackToSchemaCache asserts that kinds @@ -225,7 +178,7 @@ func TestNodeStatus_KindRouting_TopNFallsBackToSchemaCache(t *testing.T) { c.entries[idTopN] = entryTopN c.notifiedModRevision = 33 - repo := newFakeRevRepo(33) + repo := newFakeRevRepo() repo.put(metaschema.KindMeasure, "g1", "m1", 33) reg := registry.NewNodeRepoRegistry() registry.MaybeRegister(reg, metaschema.KindMeasure, repo) diff --git a/banyand/queue/local.go b/banyand/queue/local.go index 362e61bbb..4e2f98d16 100644 --- a/banyand/queue/local.go +++ b/banyand/queue/local.go @@ -106,6 +106,9 @@ func (*local) SetRouteProviders(_ map[string]route.TableProvider) { func (*local) SetMetadataRepo(_ metadata.Repo) { } +func (*local) SetNodeSchemaStatusRepo(_ metadata.Service) { +} + func (*local) Register(bus.Topic, schema.EventHandler) { } diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go index 1b3e06936..4c48eb072 100644 --- a/banyand/queue/queue.go +++ b/banyand/queue/queue.go @@ -78,7 +78,13 @@ type Server interface { RegisterChunkedSyncHandler(topic bus.Topic, handler ChunkedSyncHandler) GetPort() *uint32 SetRouteProviders(providers map[string]route.TableProvider) + // SetMetadataRepo enables fodc.v1.GroupLifecycleService — liaison-only + // by design; data-node processes must not call it. 
SetMetadataRepo(repo metadata.Repo) + // SetNodeSchemaStatusRepo enables cluster.v1.NodeSchemaStatusService — + // per-node by design (reports the local schema cache); liaison and + // data-node processes both call it. + SetNodeSchemaStatusRepo(svc metadata.Service) } // BatchPublisher is the interface for publishing data in batch. diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go index befe295d8..33c39fc52 100644 --- a/banyand/queue/sub/server.go +++ b/banyand/queue/sub/server.go @@ -84,6 +84,7 @@ type server struct { omr observability.MetricsRegistry creds credentials.TransportCredentials metadataRepo metadata.Repo + nodeSchemaStatusRepo metadata.Service clientCloser context.CancelFunc ser *grpclib.Server listeners map[bus.Topic][]bus.MessageListener @@ -292,27 +293,23 @@ func (s *server) Serve() run.StopNotify { tracev1.RegisterTraceServiceServer(s.ser, &traceService{ser: s}) if s.metadataRepo != nil { fodcv1.RegisterGroupLifecycleServiceServer(s.ser, s) - // Phase 2 Step 2.1: data nodes expose NodeSchemaStatusService so the - // liaison's barrier fan-out (Step 2.2) can probe each node's local - // schema cache. The cache the server reads is the same SchemaRegistry - // instance the data-node query executor consults — see - // banyand/metadata/schema/property/node_status.go for the coherence - // argument. The registry is resolved per request (closure) so the - // service registers even when SchemaRegistry isn't ready at Serve - // time; the fail-closed nil-cache contract handles in-flight probes + } + if s.nodeSchemaStatusRepo != nil { + // The registry is resolved per request (closure) so the service + // registers even when SchemaRegistry isn't ready at Serve time; + // the fail-closed nil-cache contract handles in-flight probes // during initialization. 
- if svc, svcOk := s.metadataRepo.(metadata.Service); svcOk { - clusterv1.RegisterNodeSchemaStatusServiceServer(s.ser, property.NewNodeSchemaStatusServerForRegistryWithNodeRepo( - func() *property.SchemaRegistry { - reg, regOk := svc.SchemaRegistry().(*property.SchemaRegistry) - if !regOk { - return nil - } - return reg - }, - svc.NodeRepoRegistry, - )) - } + svc := s.nodeSchemaStatusRepo + clusterv1.RegisterNodeSchemaStatusServiceServer(s.ser, property.NewNodeSchemaStatusServerForRegistryWithNodeRepo( + func() *property.SchemaRegistry { + reg, regOk := svc.SchemaRegistry().(*property.SchemaRegistry) + if !regOk { + return nil + } + return reg + }, + svc.NodeRepoRegistry, + )) } var ctx context.Context @@ -425,6 +422,12 @@ func (s *server) SetMetadataRepo(repo metadata.Repo) { s.metadataRepo = repo } +// SetNodeSchemaStatusRepo wires the metadata.Service whose SchemaRegistry + +// NodeRepoRegistry back the per-node NodeSchemaStatusService. +func (s *server) SetNodeSchemaStatusRepo(svc metadata.Service) { + s.nodeSchemaStatusRepo = svc +} + type metrics struct { totalStarted meter.Counter totalFinished meter.Counter diff --git a/pkg/cmdsetup/data.go b/pkg/cmdsetup/data.go index 6c2af425c..9f2185e9f 100644 --- a/pkg/cmdsetup/data.go +++ b/pkg/cmdsetup/data.go @@ -53,6 +53,7 @@ func newDataCmd(runners ...run.Unit) *cobra.Command { metaSvc.SetMetricsRegistry(metricSvc) pm := protector.NewMemory(metricSvc) pipeline := sub.NewServer(metricSvc) + pipeline.SetNodeSchemaStatusRepo(metaSvc) metaSvc.SetSnapshotPipeline(pipeline) propertyStreamPipeline := queue.Local() metaSvc.SetPropertyPipelineClient(propertyStreamPipeline) diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go index ec9248355..28c71b407 100644 --- a/pkg/cmdsetup/liaison.go +++ b/pkg/cmdsetup/liaison.go @@ -105,6 +105,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command { TraceLiaisonNodeRegistry: grpc.NewClusterNodeRegistry(data.TopicTraceWrite, tire1Client, traceLiaisonNodeSel), }, 
metricSvc, pm, routeProviders) internalPipeline.SetMetadataRepo(metaSvc) + internalPipeline.SetNodeSchemaStatusRepo(metaSvc) profSvc := observability.NewProfService() httpServer := http.NewServer(grpcServer.GetAuthReloader()) var units []run.Unit diff --git a/pkg/schema/init.go b/pkg/schema/init.go index 3c20b32c9..cf66ad0c6 100644 --- a/pkg/schema/init.go +++ b/pkg/schema/init.go @@ -29,9 +29,9 @@ import ( ) // schemaRepo satisfies the registry.RevisionRepository contract via its -// LatestModRevision / ResourceRevision / IsAbsent methods. The compile-time -// assertion lives here (next to RevisionRepository) so a future API drift -// breaks the build before the cluster barrier silently degrades. +// ResourceRevision / IsAbsent methods. The compile-time assertion lives here +// so a future API drift breaks the build before the cluster barrier's +// per-key gating silently degrades. var _ registry.RevisionRepository = (*schemaRepo)(nil) var initTimeout = 10 * time.Second diff --git a/pkg/schema/registry/registry.go b/pkg/schema/registry/registry.go index d69883829..d90e57230 100644 --- a/pkg/schema/registry/registry.go +++ b/pkg/schema/registry/registry.go @@ -34,7 +34,6 @@ import ( // keeping the interface here (alongside the registry) instead of importing // pkg/schema avoids a dependency cycle through banyand/metadata. type RevisionRepository interface { - LatestModRevision() int64 ResourceRevision(kind schema.Kind, group, name string) (int64, bool) IsAbsent(kind schema.Kind, group, name string) bool } @@ -42,14 +41,21 @@ type RevisionRepository interface { // NodeRepoRegistry aggregates per-service RevisionRepository instances on a // single node (liaison or data). Each banyand service (measure, stream, trace, // …) registers its schemaRepo during PreRun with a kind bitmask describing the -// kinds the repo tracks. 
The registry routes barrier and node-status lookups to -// the same caches the executor consults during query plan execution, so a -// positive answer from LatestModRevision implies every registered repo has +// kinds the repo tracks. The registry routes per-key barrier and node-status +// lookups (GetKeyRevisions / GetAbsentKeys) to the same caches the executor +// consults during query plan execution, so a positive answer from +// ResourceRevision / IsAbsent implies the registered repo for that kind has // applied the revision and any executor cache the node exposes is at least at // that point. This is the load-bearing prerequisite for the Phase 2 // single-cache invariant — without it the cluster barrier would certify a // cache the data-node executor never reads. // +// The aggregate global watermark belongs to the schemaCache +// (notifiedModRevision), not to the registry: per-service schemaRepos filter +// events by catalog (pkg/schema/init.go), so their per-repo latestModRevision +// is incomparable across repos and a min/max aggregate cannot represent a +// meaningful node-wide watermark. +// // Safe for concurrent registration during PreRun and concurrent lookup. type NodeRepoRegistry struct { byKind map[schema.Kind][]RevisionRepository @@ -92,25 +98,6 @@ func (r *NodeRepoRegistry) Register(kinds schema.Kind, repo RevisionRepository) } } -// LatestModRevision returns the minimum LatestModRevision across every -// registered repo. A registry with no repos returns 0, matching the -// "node not yet caught up" semantic the cluster-barrier loop interprets as -// "keep polling". 
-func (r *NodeRepoRegistry) LatestModRevision() int64 { - r.mu.RLock() - defer r.mu.RUnlock() - if len(r.repos) == 0 { - return 0 - } - minRev := r.repos[0].LatestModRevision() - for _, repo := range r.repos[1:] { - if rev := repo.LatestModRevision(); rev < minRev { - minRev = rev - } - } - return minRev -} - // ResourceRevision routes the lookup to repos registered against the given // kind and returns the first match. Returns (0, false) when no repo is // registered for the kind or no registered repo holds the resource — the same @@ -149,11 +136,9 @@ func (r *NodeRepoRegistry) HasKind(kind schema.Kind) bool { return len(r.byKind[kind]) > 0 } -// Empty reports whether any repo has been registered. The node-status server -// uses this to skip the registry's min-rev contribution on a node where no -// per-service schemaRepo runs (e.g. a property-only metadata host) — without -// the check, an empty registry's LatestModRevision()=0 would gate every -// barrier verdict to 0. +// Empty reports whether any repo has been registered. GetAbsentKeys uses +// this to route through the schemaCache on a node where no per-service +// schemaRepo runs (e.g. a property-only metadata host). func (r *NodeRepoRegistry) Empty() bool { r.mu.RLock() defer r.mu.RUnlock() diff --git a/pkg/schema/registry/registry_test.go b/pkg/schema/registry/registry_test.go index 53a6d50f8..b5356757e 100644 --- a/pkg/schema/registry/registry_test.go +++ b/pkg/schema/registry/registry_test.go @@ -29,12 +29,10 @@ import ( // fakeRevRepo is a minimal RevisionRepository for registry unit tests. 
type fakeRevRepo struct { resources map[string]map[string]int64 - latest int64 } -func newFakeRevRepo(latest int64) *fakeRevRepo { +func newFakeRevRepo() *fakeRevRepo { return &fakeRevRepo{ - latest: latest, resources: make(map[string]map[string]int64), } } @@ -46,10 +44,6 @@ func (f *fakeRevRepo) put(group, name string, rev int64) { f.resources[group][name] = rev } -func (f *fakeRevRepo) LatestModRevision() int64 { - return f.latest -} - func (f *fakeRevRepo) ResourceRevision(_ metaschema.Kind, group, name string) (int64, bool) { if g := f.resources[group]; g != nil { if rev, ok := g[name]; ok { @@ -64,36 +58,28 @@ func (f *fakeRevRepo) IsAbsent(kind metaschema.Kind, group, name string) bool { return !ok } -func TestNodeRepoRegistry_EmptyReturnsZero(t *testing.T) { +func TestNodeRepoRegistry_EmptyHasNoKindsAndAbsentKeys(t *testing.T) { r := NewNodeRepoRegistry() - assert.Equal(t, int64(0), r.LatestModRevision()) rev, ok := r.ResourceRevision(metaschema.KindStream, "g", "n") assert.Equal(t, int64(0), rev) assert.False(t, ok) assert.True(t, r.IsAbsent(metaschema.KindStream, "g", "n")) assert.False(t, r.HasKind(metaschema.KindStream)) + assert.True(t, r.Empty()) } func TestNodeRepoRegistry_NilRepoOrZeroKindIsNoop(t *testing.T) { r := NewNodeRepoRegistry() r.Register(metaschema.KindStream, nil) - r.Register(0, newFakeRevRepo(5)) - assert.Equal(t, int64(0), r.LatestModRevision()) -} - -func TestNodeRepoRegistry_LatestModRevisionTakesMin(t *testing.T) { - r := NewNodeRepoRegistry() - r.Register(metaschema.KindMeasure|metaschema.KindIndexRule, newFakeRevRepo(10)) - r.Register(metaschema.KindStream, newFakeRevRepo(7)) - r.Register(metaschema.KindTrace, newFakeRevRepo(15)) - - assert.Equal(t, int64(7), r.LatestModRevision(), "min over registered repos wins") + r.Register(0, newFakeRevRepo()) + assert.True(t, r.Empty()) + assert.False(t, r.HasKind(metaschema.KindStream)) } func TestNodeRepoRegistry_ResourceRevisionRoutesByKind(t *testing.T) { - measureRepo := 
newFakeRevRepo(10) + measureRepo := newFakeRevRepo() measureRepo.put("g1", "m1", 9) - streamRepo := newFakeRevRepo(7) + streamRepo := newFakeRevRepo() streamRepo.put("g1", "s1", 5) r := NewNodeRepoRegistry() @@ -116,9 +102,9 @@ func TestNodeRepoRegistry_ResourceRevisionRoutesByKind(t *testing.T) { } func TestNodeRepoRegistry_GroupKindFanOut(t *testing.T) { - measureRepo := newFakeRevRepo(10) + measureRepo := newFakeRevRepo() measureRepo.put("measure-only", "measure-only", 8) - streamRepo := newFakeRevRepo(7) + streamRepo := newFakeRevRepo() streamRepo.put("stream-only", "stream-only", 6) r := NewNodeRepoRegistry() @@ -137,13 +123,13 @@ func TestNodeRepoRegistry_GroupKindFanOut(t *testing.T) { } func TestNodeRepoRegistry_DuplicateRegistrationIsIdempotent(t *testing.T) { - repo := newFakeRevRepo(4) + repo := newFakeRevRepo() r := NewNodeRepoRegistry() r.Register(metaschema.KindStream, repo) r.Register(metaschema.KindStream, repo) r.Register(metaschema.KindStream|metaschema.KindIndexRule, repo) - assert.Equal(t, int64(4), r.LatestModRevision()) + assert.False(t, r.Empty()) assert.True(t, r.HasKind(metaschema.KindStream)) assert.True(t, r.HasKind(metaschema.KindIndexRule)) assert.False(t, r.HasKind(metaschema.KindMeasure)) @@ -151,7 +137,7 @@ func TestNodeRepoRegistry_DuplicateRegistrationIsIdempotent(t *testing.T) { func TestNodeRepoRegistry_HasKind(t *testing.T) { r := NewNodeRepoRegistry() - r.Register(metaschema.KindMeasure|metaschema.KindIndexRule|metaschema.KindIndexRuleBinding|metaschema.KindGroup, newFakeRevRepo(1)) + r.Register(metaschema.KindMeasure|metaschema.KindIndexRule|metaschema.KindIndexRuleBinding|metaschema.KindGroup, newFakeRevRepo()) assert.True(t, r.HasKind(metaschema.KindMeasure)) assert.True(t, r.HasKind(metaschema.KindIndexRule)) @@ -163,7 +149,7 @@ func TestNodeRepoRegistry_HasKind(t *testing.T) { func TestNodeRepoRegistry_ConcurrentRegisterAndLookup(t *testing.T) { r := NewNodeRepoRegistry() - repos := []*fakeRevRepo{newFakeRevRepo(1), 
newFakeRevRepo(2), newFakeRevRepo(3), newFakeRevRepo(4)} + repos := []*fakeRevRepo{newFakeRevRepo(), newFakeRevRepo(), newFakeRevRepo(), newFakeRevRepo()} for _, repo := range repos { repo.put("g", "n", 10) } @@ -184,7 +170,6 @@ func TestNodeRepoRegistry_ConcurrentRegisterAndLookup(t *testing.T) { done := make(chan struct{}) go func() { for range 1000 { - _ = r.LatestModRevision() _, _ = r.ResourceRevision(metaschema.KindStream, "g", "n") _ = r.HasKind(metaschema.KindMeasure) } @@ -194,7 +179,7 @@ func TestNodeRepoRegistry_ConcurrentRegisterAndLookup(t *testing.T) { wg.Wait() <-done - assert.Equal(t, int64(1), r.LatestModRevision()) + assert.False(t, r.Empty()) assert.True(t, r.HasKind(metaschema.KindStream)) assert.True(t, r.HasKind(metaschema.KindMeasure)) } diff --git a/pkg/schema/schema.go b/pkg/schema/schema.go index 13625a7c0..da301a039 100644 --- a/pkg/schema/schema.go +++ b/pkg/schema/schema.go @@ -114,11 +114,10 @@ type Repository interface { DropGroup(groupName string) error } -// RevisionRepository extends Repository with revision-query methods used by -// the write/query gates and the barrier implementation. +// RevisionRepository extends Repository with per-key revision-query methods +// used by the write/query gates and the barrier implementation. type RevisionRepository interface { Repository - LatestModRevision() int64 ResourceRevision(kind schema.Kind, group, name string) (int64, bool) IsAbsent(kind schema.Kind, group, name string) bool } diff --git a/test/cases/schema/barrier_cluster.go b/test/cases/schema/barrier_cluster.go index e76669133..b7eb0edd2 100644 --- a/test/cases/schema/barrier_cluster.go +++ b/test/cases/schema/barrier_cluster.go @@ -37,20 +37,14 @@ import ( // end-to-end through the public AwaitX RPCs. 
They pause the receiving // liaison's own SchemaRegistry; the cluster barrier's selfName probe reads // through that SR, so pausing it surfaces a laggard via the public AwaitX -// API without needing NodeSchemaStatusService exposed on data-node ports -// (which the in-process distributed harness does not currently provide; -// the cross-version Unimplemented→ready policy in the cluster fan-out -// would mask paused data nodes from the barrier's perspective). +// API. Data-node fan-out via NodeSchemaStatusService is covered by the +// unit tests in banyand/liaison/grpc/barrier_cluster_test.go (§FA-1..FD-2). +// These integration specs cover the orthogonal contract: the pause +// primitive's effect is observable through the public AwaitX RPC and +// resume drains the queued events so the barrier converges. // -// The role-prefix attribution (`liaison-...` vs `data-...`) the plan -// describes for §6.12 is already pinned by the unit tests in -// banyand/liaison/grpc/barrier_cluster_test.go (§FA-1..FD-2). These -// integration specs cover the orthogonal contract: the pause primitive's -// effect is observable through the public AwaitX RPC and the resume -// drains the queued events so the barrier converges. -// -// Specs skip themselves under standalone mode and when the liaison -// address is empty (the standalone harness has none). +// Specs skip themselves under standalone mode and when the receiving +// liaison address is empty (the standalone harness has none). func barrierClusterMeasureGroup(name string) *commonv1.Group { return &commonv1.Group{ @@ -107,21 +101,15 @@ var _ = g.Describe("Cluster barrier under partial-cluster conditions (§6.12)", _ = setup.ResumeDataNodeWatch(paused) }) - // §6.12a — AwaitRevisionApplied surfaces a paused liaison as a laggard - // via its selfName probe; resume drains the queue and the barrier - // converges. 
Uses Measure.Update for the post-pause bump because the - // Group watch path in this in-process harness occasionally completes - // before the gate sees it (the property-store reconcile cycle on - // Group writes can short-circuit the watch fan-out); §6.12b/c - // Measure flows are reliable. - // PENDING: queue drains successfully on resume (verified via the - // `queued: N` log line) but the schemaCache.notifiedModRevision - // watermark does not always reach the newRev target within 10s when - // the test re-issues AwaitRevisionApplied. The Measure-based - // per-key barrier (§6.12b/c) does converge, so this pending status - // scopes the gap to the global MaxRevision check; investigation is - // deferred to the same follow-up that authors data-node - // NodeSchemaStatusService exposure. + // §6.12a — AwaitRevisionApplied surfaces a paused liaison as a + // laggard via its selfName probe; resume drains the queue and the + // barrier converges. PENDING: the laggard-detection assertion passes + // but the post-resume AwaitRevisionApplied(newRev) does not converge + // inside the spec timeout. The per-key §6.12b/c flows (AwaitApplied / + // AwaitDeleted) do converge, so this gap is scoped to the global + // notifiedModRevision watermark advancing through queue replay under + // the in-process distributed harness — independent of the + // data-node NodeSchemaStatusService exposure. g.PIt("§6.12a AwaitRevisionApplied reports the paused liaison as a laggard", func() { groupName := fmt.Sprintf("bc-rev-%d", time.Now().UnixNano()) measureName := "bc_rev_measure" @@ -152,10 +140,8 @@ var _ = g.Describe("Cluster barrier under partial-cluster conditions (§6.12)", g.By("Calling AwaitRevisionApplied — paused liaison must surface as a laggard") // Brief settle so the bumped revision's watch event has time to - // reach the liaison's SR (which queues it under pause). 
Without - // this, the test races the watch stream and the queue can be - // empty at resume — the propagation delay between Update RPC - // commit and watch broadcast varies under load. + // reach the paused liaison's SR (which queues it under pause). + // Without this, the barrier can race the watch broadcast. time.Sleep(200 * time.Millisecond) callCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() @@ -280,16 +266,13 @@ var _ = g.Describe("Cluster barrier under partial-cluster conditions (§6.12)", }) // §6.12d — Cross-barrier recovery: after a multi-step pause-and-mutate - // sequence, resume drains the queued events in arrival order, so a + // sequence, resume drains the queued events in arrival order so a // follow-up AwaitRevisionApplied at the post-mutate revision returns - // applied=true. This pins the queue-drain contract end-to-end. - // PENDING: same harness limitation as §6.12a — the post-resume - // AwaitRevisionApplied does not always reach the queued finalRev - // inside the spec timeout, even though the queue drain log shows - // the events were replayed. Will pass once the data-node - // NodeSchemaStatusService exposure work lands and the cluster - // barrier observes a fan-out across all members instead of the - // liaison's selfName probe alone. + // applied=true with no laggards. PENDING for the same reason as + // §6.12a: the global AwaitRevisionApplied watermark does not converge + // through queue replay inside the spec timeout. §6.12b/c remain the + // authoritative end-to-end coverage of the queue-drain contract via + // per-key barriers. g.PIt("§6.12d cross-barrier recovery: resume drains queued events and clears the laggard", func() { groupName := fmt.Sprintf("bc-recovery-%d", time.Now().UnixNano()) measureName := "bc_recovery_measure"
