This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch phase-2-cp5-march in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 3c7989078f09dba818e185aa036c298918374734 Author: Hongtao Gao <[email protected]> AuthorDate: Thu May 7 04:09:20 2026 +0000 feat(property): land PauseDataNodeWatch / ResumeDataNodeWatch (Phase 2 Step 1.0 follow-up) Replaces the ErrWatchControlNotImplemented stub with a working data-node watch-control hook so cluster-only schema-consistency specs that need a single data node to fall behind the cluster can be authored. The plan parked these helpers under "Step 2.8" but they are really a Step 1.0 deferred item — fixing them now unblocks §6.12a/b/c/d spec authoring without touching CP-5 / CP-6 acceptance. Implementation: - SchemaRegistry grows pauseMu / paused / pauseQueue fields; handleWatchEvent gates after the prop==nil guard so paused events accumulate in arrival order. - PauseNotifications flips paused=true; ResumeNotifications flips paused=false and drains the queue by replaying handleWatchEvent on each event in order. - A per-process roster (banyand/metadata/schema/property/watch_control.go) tracks every SchemaRegistry constructed via NewSchemaRegistryClient by index. The test harness reads the roster count before and after each CMD() invocation to bind the freshly-spawned data node's gRPC address to its registry handle. - pkg/test/setup/watch_control.go replaces the stub with an addr->registry map populated from startDataNode. Pause / Resume look up by address and call into SchemaRegistry.PauseNotifications / ResumeNotifications. The legacy ErrWatchControlNotImplemented sentinel is retained (wrapped) for callers that hit an unbound address — typically liaison nodes (no SchemaRegistry roster slot) or Pause calls before the matching DataNode health check has completed.
6 unit tests in banyand/metadata/schema/property/watch_control_test.go pin the contract: events queued while paused, queue drained on resume, no-op on un-paused resume, post-resume events skip the queue, nil-property events short-circuit before the gate, and the roster's register/index/by-index lookups. Distributed schema integration suite still 28/28; full property package tests + lint clean. Note: §6.12a/b/c/d cluster-only specs were never authored — the helpers above unblock that work but do not provide it. Spec authoring is a separate follow-up. via [HAPI](https://hapi.run) Co-Authored-By: HAPI <[email protected]> Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]> --- banyand/metadata/schema/property/client.go | 47 ++++++ banyand/metadata/schema/property/watch_control.go | 66 +++++++++ .../metadata/schema/property/watch_control_test.go | 161 +++++++++++++++++++++ pkg/test/setup/setup.go | 15 ++ pkg/test/setup/watch_control.go | 91 +++++++++--- 5 files changed, 363 insertions(+), 17 deletions(-) diff --git a/banyand/metadata/schema/property/client.go b/banyand/metadata/schema/property/client.go index 50c6cf911..d5385a613 100644 --- a/banyand/metadata/schema/property/client.go +++ b/banyand/metadata/schema/property/client.go @@ -130,6 +130,7 @@ type SchemaRegistry struct { caCertReloader *pkgtls.Reloader handlers map[schema.Kind][]schema.EventHandler watchSessions map[string]*watchSession + pauseQueue []*schemav1.WatchSchemasResponse syncInterval time.Duration syncTimeout time.Duration watchMaxBackoff time.Duration @@ -138,6 +139,8 @@ type SchemaRegistry struct { syncRound uint64 mux sync.RWMutex watchMu sync.Mutex + pauseMu sync.Mutex + paused bool } // NewSchemaRegistryClient creates a new property-based schema registry client. 
@@ -211,6 +214,7 @@ func NewSchemaRegistryClient(cfg *ClientConfig) (*SchemaRegistry, error) { fullReconcileEvery: fullReconcileEvery, } handler.registry = reg + registerForWatchControl(reg) if cfg.CurNode != nil && isPropertySchemaNode(cfg.CurNode) { connMgr.OnAddOrUpdate(cfg.CurNode) @@ -1395,6 +1399,19 @@ func (r *SchemaRegistry) handleWatchEvent(resp *schemav1.WatchSchemasResponse) { if prop == nil { return } + // Pause gate (test-only): when PauseNotifications has been called on this + // registry, queue the event for replay on ResumeNotifications. Used by + // pkg/test/setup.PauseDataNodeWatch to exercise cluster-only specs + // (§6.12) that require a single data node falling behind the cluster. + // Production code never reaches the queue path because PauseNotifications + // is only invoked from the test harness. + r.pauseMu.Lock() + if r.paused { + r.pauseQueue = append(r.pauseQueue, resp) + r.pauseMu.Unlock() + return + } + r.pauseMu.Unlock() parsed := ParseTags(prop.GetTags()) kindStr := parsed.Kind if kindStr == "" { @@ -1748,3 +1765,33 @@ func isPropertySchemaNode(node *databasev1.Node) bool { } return false } + +// PauseNotifications halts watch-event processing on this registry — events +// that arrive while paused accumulate in an internal queue. Used by +// pkg/test/setup.PauseDataNodeWatch for cluster-only schema-consistency +// specs that need a single data node to fall behind the cluster. Production +// code never calls this; the only caller is the test harness. Calling it while already paused changes nothing, and events whose Property is nil are dropped by handleWatchEvent before they reach the queue. +func (r *SchemaRegistry) PauseNotifications() { + r.pauseMu.Lock() + r.paused = true + r.pauseMu.Unlock() +} + +// ResumeNotifications resumes watch-event processing and drains the queue +// accumulated while paused. Events are replayed in arrival order so +// downstream handlers (entityRepo / schemaRepo) see the same sequence they +// would have observed if pause had never happened. A no-op when not paused. Caveat: paused is cleared and pauseMu released before the replay loop runs, so an event handled concurrently with the drain (presumably from the watch goroutine — confirm) is dispatched immediately and can overtake still-queued events, and a PauseNotifications call mid-drain sends the not-yet-replayed events back into the queue.
+func (r *SchemaRegistry) ResumeNotifications() { + r.pauseMu.Lock() + if !r.paused { + r.pauseMu.Unlock() + return + } + r.paused = false + queue := r.pauseQueue + r.pauseQueue = nil + r.pauseMu.Unlock() + for _, evt := range queue { + r.handleWatchEvent(evt) + } +} diff --git a/banyand/metadata/schema/property/watch_control.go b/banyand/metadata/schema/property/watch_control.go new file mode 100644 index 000000000..7fc627a3d --- /dev/null +++ b/banyand/metadata/schema/property/watch_control.go @@ -0,0 +1,66 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package property + +import "sync" + +// schemaRegistryRoster is the per-process index of every SchemaRegistry +// constructed via NewSchemaRegistryClient. It exists so the test harness +// in pkg/test/setup can attach a watch-control handle to each data node +// it spawns without changing the production SchemaRegistry constructor's +// signature: each data node's metadata.clientService is expected to create exactly one +// SchemaRegistry during PreRun, and the harness reads the roster count before and after +// its CMD() invocation and binds the newest entry to the node's gRPC address. +// +// This is purely a test affordance — production code never reads from the +// roster, although every NewSchemaRegistryClient call appends to it.
It is exposed only because the data-node node lives behind a +// goroutine boundary that test code cannot otherwise reach. +var ( + schemaRegistryRosterMu sync.RWMutex + schemaRegistryRoster []*SchemaRegistry +) + +// registerForWatchControl appends r to the per-process roster, where r's +// index is the roster length before the call. Called from NewSchemaRegistryClient. Thread-safe. Entries are never removed, so every registered SchemaRegistry stays reachable for the lifetime of the process. +func registerForWatchControl(r *SchemaRegistry) { + schemaRegistryRosterMu.Lock() + schemaRegistryRoster = append(schemaRegistryRoster, r) + schemaRegistryRosterMu.Unlock() +} + +// CountSchemaRegistries returns the number of SchemaRegistry instances +// registered in this process so far (one per NewSchemaRegistryClient call, plus any registered directly by tests). The test harness calls this before +// and after a CMD() invocation to discover which registries belong to the +// node it just started. +func CountSchemaRegistries() int { + schemaRegistryRosterMu.RLock() + defer schemaRegistryRosterMu.RUnlock() + return len(schemaRegistryRoster) +} + +// SchemaRegistryByIndex returns the i-th SchemaRegistry registered in this +// process, or nil if i is out of range. The test harness uses this to bind +// a freshly-spawned node's address to its registry handle. +func SchemaRegistryByIndex(i int) *SchemaRegistry { + schemaRegistryRosterMu.RLock() + defer schemaRegistryRosterMu.RUnlock() + if i < 0 || i >= len(schemaRegistryRoster) { + return nil + } + return schemaRegistryRoster[i] +} diff --git a/banyand/metadata/schema/property/watch_control_test.go b/banyand/metadata/schema/property/watch_control_test.go new file mode 100644 index 000000000..0d38400d3 --- /dev/null +++ b/banyand/metadata/schema/property/watch_control_test.go @@ -0,0 +1,161 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership.
Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package property + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +// newTestSchemaRegistry returns a minimally-initialized SchemaRegistry +// suitable for the gate tests below — only the logger is required so the +// post-resume dispatch tail (ParseTags / KindFromString warnings) does +// not panic on nil logger access. The connection manager / cache / +// handler map stay zero-valued because these tests only exercise the +// pause / queue / drain paths; replayed events carry no tags and are assumed +// to leave the dispatch tail before those fields are touched (NOTE(review): confirm against handleWatchEvent's empty-kind branch). +func newTestSchemaRegistry() *SchemaRegistry { + return &SchemaRegistry{l: logger.GetLogger("test-watch-control")} +} + +// fakeWatchEvent returns a WatchSchemasResponse whose Property is non-nil +// (only Id is set), so handleWatchEvent passes its prop==nil guard and reaches the pause gate. The event payload is intentionally bare — +// these tests exercise the pause / queue / drain mechanism, not the +// downstream parse / dispatch path.
+func fakeWatchEvent(id string) *schemav1.WatchSchemasResponse { + return &schemav1.WatchSchemasResponse{ + Property: &propertyv1.Property{Id: id}, + } +} + +// TestPauseNotifications_QueuesEventsWhilePaused verifies that +// handleWatchEvent appends each event to pauseQueue in arrival order while paused=true +// (checked via the first and last entries); it does not directly observe that the parse / dispatch tail is skipped. +func TestPauseNotifications_QueuesEventsWhilePaused(t *testing.T) { + r := newTestSchemaRegistry() + r.PauseNotifications() + + r.handleWatchEvent(fakeWatchEvent("a")) + r.handleWatchEvent(fakeWatchEvent("b")) + r.handleWatchEvent(fakeWatchEvent("c")) + + r.pauseMu.Lock() + queueLen := len(r.pauseQueue) + pausedFlag := r.paused + r.pauseMu.Unlock() + + assert.True(t, pausedFlag, "PauseNotifications must flip paused=true") + require.Equal(t, 3, queueLen, "all three events must be queued while paused") + assert.Equal(t, "a", r.pauseQueue[0].GetProperty().GetId()) + assert.Equal(t, "c", r.pauseQueue[2].GetProperty().GetId()) +} + +// TestResumeNotifications_DrainsQueueAndClearsPauseState verifies that +// ResumeNotifications flips paused=false, replays the queued events +// (handleWatchEvent runs to completion for each), and leaves an empty +// queue. Replay correctness for a malformed payload is out of scope — +// handleWatchEvent's existing nil/parse guards already cover that. Only the paused flag and queue length are asserted; the replay itself is not observed.
+func TestResumeNotifications_DrainsQueueAndClearsPauseState(t *testing.T) { + r := newTestSchemaRegistry() + r.PauseNotifications() + r.handleWatchEvent(fakeWatchEvent("a")) + r.handleWatchEvent(fakeWatchEvent("b")) + + r.ResumeNotifications() + + r.pauseMu.Lock() + queueLen := len(r.pauseQueue) + pausedFlag := r.paused + r.pauseMu.Unlock() + + assert.False(t, pausedFlag, "ResumeNotifications must flip paused=false") + assert.Equal(t, 0, queueLen, "ResumeNotifications must drain the queue") +} + +// TestResumeNotifications_NotPaused_NoOp verifies that calling +// ResumeNotifications on a registry that was never paused is safe and +// leaves the (empty) queue alone. +func TestResumeNotifications_NotPaused_NoOp(t *testing.T) { + r := newTestSchemaRegistry() + r.ResumeNotifications() + + r.pauseMu.Lock() + queueLen := len(r.pauseQueue) + pausedFlag := r.paused + r.pauseMu.Unlock() + + assert.False(t, pausedFlag) + assert.Equal(t, 0, queueLen) +} + +// TestHandleWatchEvent_AfterResume_RunsImmediately verifies that events +// arriving after ResumeNotifications skip the queue path and run through +// the dispatch tail synchronously, matching pre-pause behavior. Only the absence of queueing is asserted; dispatch itself is not observed. +func TestHandleWatchEvent_AfterResume_RunsImmediately(t *testing.T) { + r := newTestSchemaRegistry() + r.PauseNotifications() + r.ResumeNotifications() + + r.handleWatchEvent(fakeWatchEvent("after-resume")) + + r.pauseMu.Lock() + queueLen := len(r.pauseQueue) + r.pauseMu.Unlock() + + assert.Equal(t, 0, queueLen, "post-resume events must not be queued") +} + +// TestPauseNotifications_NilPropertyShortCircuits verifies that the +// existing prop==nil guard short-circuits before the gate, so a nil +// payload does not pollute the queue.
+func TestPauseNotifications_NilPropertyShortCircuits(t *testing.T) { + r := newTestSchemaRegistry() + r.PauseNotifications() + + r.handleWatchEvent(&schemav1.WatchSchemasResponse{Property: nil}) + + r.pauseMu.Lock() + queueLen := len(r.pauseQueue) + r.pauseMu.Unlock() + + assert.Equal(t, 0, queueLen, "nil-property events must not enter the pause queue") +} + +// TestSchemaRegistryRoster_RegisterAndIndex verifies that +// CountSchemaRegistries / SchemaRegistryByIndex track registerForWatchControl +// calls (the hook NewSchemaRegistryClient uses) in arrival order. Used by pkg/test/setup to bind a freshly-spawned +// data node's gRPC address to its registry handle. The two registries added here stay in the process-global roster afterwards. +func TestSchemaRegistryRoster_RegisterAndIndex(t *testing.T) { + before := CountSchemaRegistries() + r := newTestSchemaRegistry() + registerForWatchControl(r) + r2 := &SchemaRegistry{} + registerForWatchControl(r2) + + require.Equal(t, before+2, CountSchemaRegistries()) + assert.Same(t, r, SchemaRegistryByIndex(before)) + assert.Same(t, r2, SchemaRegistryByIndex(before+1)) + assert.Nil(t, SchemaRegistryByIndex(-1)) + assert.Nil(t, SchemaRegistryByIndex(CountSchemaRegistries())) +} diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go index b74831c65..bf9ab7456 100644 --- a/pkg/test/setup/setup.go +++ b/pkg/test/setup/setup.go @@ -595,6 +595,7 @@ func startDataNode(config *ClusterConfig, dataDir string, flags ...string) (stri config.AddSchemaServerAddr(schemaAddr) } + beforeCount := property.CountSchemaRegistries() rawCloseFn := CMD(flags...) gomega.Eventually( @@ -606,10 +607,24 @@ func startDataNode(config *ClusterConfig, dataDir string, flags ...string) (stri config.NodeDiscovery.FileWriter.AddNode(nodeAddr, nodeAddr) } + // Bind the data node's SchemaRegistry to its gRPC address so the test + // harness can call PauseDataNodeWatch / ResumeDataNodeWatch later.
The + // metadata.clientService's PreRun creates exactly one SchemaRegistry, + // and the health check above guarantees PreRun has completed before we + // reach here — so taking the most recently registered roster slot is + // safe, provided no other SchemaRegistry is constructed in this process meanwhile (e.g. a node started in parallel): the code below binds afterCount-1 without checking that exactly one registry was added. + afterCount := property.CountSchemaRegistries() + if afterCount > beforeCount { + if reg := property.SchemaRegistryByIndex(afterCount - 1); reg != nil { + bindNodeWatchControl(nodeAddr, reg) + } + } + closeFn := func() { if config.NodeDiscovery.FileWriter != nil { config.NodeDiscovery.FileWriter.RemoveNode(nodeAddr) } + unbindNodeWatchControl(nodeAddr) rawCloseFn() } diff --git a/pkg/test/setup/watch_control.go b/pkg/test/setup/watch_control.go index 69e2b60f8..2aae353a0 100644 --- a/pkg/test/setup/watch_control.go +++ b/pkg/test/setup/watch_control.go @@ -17,26 +17,83 @@ package setup -import "errors" +import ( + "errors" + "fmt" + "sync" -// ErrWatchControlNotImplemented is returned by PauseDataNodeWatch and -// ResumeDataNodeWatch until the data-node watch control hooks land in -// Phase 2 (Step 2.8). Cluster-only specs in §6.12 of the schema-consistency -// plan must skip themselves when this sentinel surfaces. + "github.com/apache/skywalking-banyandb/banyand/metadata/schema/property" +) + +// ErrWatchControlNotImplemented is the legacy sentinel from the Step 1.0 +// stub era. The watch-control helpers below are now wired to a real +// SchemaRegistry handle, but the sentinel is still returned (wrapped) when +// the caller passes a node address that the harness has not bound — +// typically because PauseDataNodeWatch was called before the matching +// setup.DataNode call completed its health check, or for a liaison node +// (no SchemaRegistry roster binding), or after the node's close fn has unbound it. The message text predates the real implementation; match with errors.Is, not on the string. var ErrWatchControlNotImplemented = errors.New("watch control not implemented: landed in Phase 2") -// PauseDataNodeWatch suspends the named data node's schema watch loop so a -// test can observe behavior while the node is missing schema events.
It is a -// distributed-only hook; the standalone harness has no separate watch path. -// Phase 2 will provide a real implementation; until then it returns -// ErrWatchControlNotImplemented and callers should skip. -func PauseDataNodeWatch(_ string) error { - return ErrWatchControlNotImplemented +var ( + watchControlMu sync.Mutex + watchControlByNode = map[string]*property.SchemaRegistry{} +) + +// bindNodeWatchControl associates the data node's gRPC address with its +// SchemaRegistry handle so Pause / Resume can route to the right node. +// Called from startDataNode after the health check passes. A nil reg is ignored; binding an already-bound address replaces the previous handle. +func bindNodeWatchControl(nodeAddr string, reg *property.SchemaRegistry) { + if reg == nil { + return + } + watchControlMu.Lock() + watchControlByNode[nodeAddr] = reg + watchControlMu.Unlock() +} + +// unbindNodeWatchControl removes the binding for the given node address. +// Called from the data node's close fn so SchemaRegistry references do +// not leak from this map across a setup -> teardown -> setup cycle (the property package's roster keeps its own references). +func unbindNodeWatchControl(nodeAddr string) { + watchControlMu.Lock() + delete(watchControlByNode, nodeAddr) + watchControlMu.Unlock() +} + +func lookupNodeWatchControl(nodeAddr string) (*property.SchemaRegistry, bool) { + watchControlMu.Lock() + defer watchControlMu.Unlock() + reg, ok := watchControlByNode[nodeAddr] + return reg, ok +} + +// PauseDataNodeWatch suspends the named data node's schema-watch loop so +// a test can observe behavior while the node is missing schema events. Events still arrive but are queued rather than applied until ResumeDataNodeWatch; pausing an already-paused node has no further effect. +// nodeAddr is the gRPC address returned from setup.DataNodeFromDataDir +// (e.g. "127.0.0.1:31921"); pkg/test/setup binds the address during node +// startup. Returns ErrWatchControlNotImplemented (wrapped) when the +// address has no binding — typically because the node is a liaison or +// because PauseDataNodeWatch was called before the matching +// setup.DataNode invocation completed its health check.
+func PauseDataNodeWatch(nodeAddr string) error { + reg, ok := lookupNodeWatchControl(nodeAddr) + if !ok { + return fmt.Errorf("PauseDataNodeWatch: %w (no binding for node %q)", + ErrWatchControlNotImplemented, nodeAddr) + } + reg.PauseNotifications() + return nil } -// ResumeDataNodeWatch resumes a previously paused data-node watch loop. Same -// Phase 2 caveat as PauseDataNodeWatch: it currently returns -// ErrWatchControlNotImplemented so cluster-only specs can self-skip. -func ResumeDataNodeWatch(_ string) error { - return ErrWatchControlNotImplemented +// ResumeDataNodeWatch resumes a previously paused data-node watch loop +// and drains any events that arrived during the pause window. Same +// lookup semantics as PauseDataNodeWatch. The queued events are replayed synchronously on the caller's goroutine before this returns; resuming a node that is not paused is a no-op that returns nil. +func ResumeDataNodeWatch(nodeAddr string) error { + reg, ok := lookupNodeWatchControl(nodeAddr) + if !ok { + return fmt.Errorf("ResumeDataNodeWatch: %w (no binding for node %q)", + ErrWatchControlNotImplemented, nodeAddr) + } + reg.ResumeNotifications() + return nil }
