This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch phase-2-cp5-march
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 3c7989078f09dba818e185aa036c298918374734
Author: Hongtao Gao <[email protected]>
AuthorDate: Thu May 7 04:09:20 2026 +0000

    feat(property): land PauseDataNodeWatch / ResumeDataNodeWatch (Phase 2 Step 1.0 follow-up)
    
    Replaces the ErrWatchControlNotImplemented stub with a working
    data-node watch-control hook so cluster-only schema-consistency specs
    that need a single data node to fall behind the cluster can be
    authored. The plan parked these helpers under "Step 2.8" but they are
    really a Step 1.0 deferred item — fixing them now unblocks §6.12a/b/c/d
    spec authoring without touching CP-5 / CP-6 acceptance.
    
    Implementation:
      - SchemaRegistry grows pauseMu / paused / pauseQueue fields;
        handleWatchEvent gates after the prop==nil guard so paused events
        accumulate in arrival order.
      - PauseNotifications flips paused=true; ResumeNotifications flips
        paused=false and drains the queue by replaying handleWatchEvent on
        each event in order.
      - A per-process roster (banyand/metadata/schema/property/
        watch_control.go) tracks every SchemaRegistry constructed via
        NewSchemaRegistryClient by index. The test harness reads the count
        before and after each CMD() invocation and binds the freshly spawned
        data node's gRPC address to the most recently registered handle.
      - pkg/test/setup/watch_control.go replaces the stub with an
        addr->registry map populated from startDataNode. Pause / Resume
        look up by address and call into SchemaRegistry.PauseNotifications /
        ResumeNotifications. The legacy ErrWatchControlNotImplemented
        sentinel is retained (wrapped) for callers that hit an unbound
        address — typically liaison nodes (no SchemaRegistry roster slot)
        or Pause calls before the matching DataNode health check has
        completed.
    
    6 unit tests in banyand/metadata/schema/property/watch_control_test.go
    pin the contract: events queued while paused, queue drained on resume,
    no-op on un-paused resume, post-resume events skip the queue,
    nil-property events short-circuit before the gate, and the roster's
    register/index/by-index lookups.
    
    The distributed schema integration suite still passes 28/28; the full
    property package tests and lint are clean.
    
    Note: §6.12a/b/c/d cluster-only specs were never authored — the
    helpers above unblock that work but do not provide it. Spec authoring
    is a separate follow-up.
    
    via [HAPI](https://hapi.run)
    
    Co-Authored-By: HAPI <[email protected]>
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
---
 banyand/metadata/schema/property/client.go         |  47 ++++++
 banyand/metadata/schema/property/watch_control.go  |  66 +++++++++
 .../metadata/schema/property/watch_control_test.go | 161 +++++++++++++++++++++
 pkg/test/setup/setup.go                            |  15 ++
 pkg/test/setup/watch_control.go                    |  91 +++++++++---
 5 files changed, 363 insertions(+), 17 deletions(-)

diff --git a/banyand/metadata/schema/property/client.go 
b/banyand/metadata/schema/property/client.go
index 50c6cf911..d5385a613 100644
--- a/banyand/metadata/schema/property/client.go
+++ b/banyand/metadata/schema/property/client.go
@@ -130,6 +130,7 @@ type SchemaRegistry struct {
        caCertReloader     *pkgtls.Reloader
        handlers           map[schema.Kind][]schema.EventHandler
        watchSessions      map[string]*watchSession
+       pauseQueue         []*schemav1.WatchSchemasResponse
        syncInterval       time.Duration
        syncTimeout        time.Duration
        watchMaxBackoff    time.Duration
@@ -138,6 +139,8 @@ type SchemaRegistry struct {
        syncRound          uint64
        mux                sync.RWMutex
        watchMu            sync.Mutex
+       pauseMu            sync.Mutex
+       paused             bool
 }
 
 // NewSchemaRegistryClient creates a new property-based schema registry client.
@@ -211,6 +214,7 @@ func NewSchemaRegistryClient(cfg *ClientConfig) 
(*SchemaRegistry, error) {
                fullReconcileEvery: fullReconcileEvery,
        }
        handler.registry = reg
+       registerForWatchControl(reg)
 
        if cfg.CurNode != nil && isPropertySchemaNode(cfg.CurNode) {
                connMgr.OnAddOrUpdate(cfg.CurNode)
@@ -1395,6 +1399,19 @@ func (r *SchemaRegistry) handleWatchEvent(resp 
*schemav1.WatchSchemasResponse) {
        if prop == nil {
                return
        }
+       // Pause gate (test-only): when PauseNotifications has been called on 
this
+       // registry, queue the event for replay on ResumeNotifications. Used by
+       // pkg/test/setup.PauseDataNodeWatch to exercise cluster-only specs
+       // (§6.12) that require a single data node falling behind the cluster.
+       // Production code never reaches the queue path because 
PauseNotifications
+       // is only invoked from the test harness.
+       r.pauseMu.Lock()
+       if r.paused {
+               r.pauseQueue = append(r.pauseQueue, resp)
+               r.pauseMu.Unlock()
+               return
+       }
+       r.pauseMu.Unlock()
        parsed := ParseTags(prop.GetTags())
        kindStr := parsed.Kind
        if kindStr == "" {
@@ -1748,3 +1765,33 @@ func isPropertySchemaNode(node *databasev1.Node) bool {
        }
        return false
 }
+
+// PauseNotifications switches this registry into paused mode: every watch
+// event that reaches handleWatchEvent with a non-nil property is appended to
+// pauseQueue instead of being dispatched, until ResumeNotifications is
+// called. Calling it while already paused has no further effect. It is meant
+// for pkg/test/setup.PauseDataNodeWatch, which lets cluster-only
+// schema-consistency specs hold a single data node behind the cluster.
+func (r *SchemaRegistry) PauseNotifications() {
+       r.pauseMu.Lock()
+       defer r.pauseMu.Unlock()
+       r.paused = true
+}
+
+// ResumeNotifications leaves paused mode and replays, in the order they were
+// queued, every event that handleWatchEvent buffered while paused. It is a
+// no-op when the registry is not paused.
+//
+// Ordering caveat: paused is cleared and pauseMu released before the replay
+// loop runs. If handleWatchEvent is invoked concurrently from another
+// goroutine (presumably the watch stream loop — not visible here), that live
+// event passes the gate at once and may be dispatched before, or interleaved
+// with, the replayed backlog. Strict arrival order therefore holds only if
+// no new event arrives until this call returns.
+// NOTE(review): a strict-order fix needs a "draining" state that the gate in
+// handleWatchEvent also checks, so new events keep queuing until the backlog
+// is empty.
+func (r *SchemaRegistry) ResumeNotifications() {
+       r.pauseMu.Lock()
+       if !r.paused {
+               r.pauseMu.Unlock()
+               return
+       }
+       r.paused = false
+       // Detach the backlog under the lock, then replay without holding it:
+       // handleWatchEvent acquires pauseMu itself, and sync.Mutex is not
+       // reentrant, so replaying while locked would deadlock.
+       queue := r.pauseQueue
+       r.pauseQueue = nil
+       r.pauseMu.Unlock()
+       // Replay synchronously in the caller's goroutine. If
+       // PauseNotifications is called again mid-replay, the gate re-queues
+       // the remaining events.
+       for _, evt := range queue {
+               r.handleWatchEvent(evt)
+       }
+}
diff --git a/banyand/metadata/schema/property/watch_control.go 
b/banyand/metadata/schema/property/watch_control.go
new file mode 100644
index 000000000..7fc627a3d
--- /dev/null
+++ b/banyand/metadata/schema/property/watch_control.go
@@ -0,0 +1,66 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package property
+
+import "sync"
+
+// The roster below is a process-wide, append-only list of every
+// SchemaRegistry constructed via NewSchemaRegistryClient. It lets the test
+// harness in pkg/test/setup attach a watch-control handle to each data node
+// it spawns without changing the constructor's signature: the harness samples
+// CountSchemaRegistries() before and after its CMD() invocation and maps the
+// node's gRPC address to a registry in that index range.
+//
+// Caveats: this file has no removal path, so every registered SchemaRegistry
+// stays reachable for the life of the process. Because the list is shared,
+// registries created concurrently by other goroutines (e.g. another node
+// starting in parallel) land in the same index range.
+// NOTE(review): intended purely as a test affordance — confirm that no
+// production path reads the roster.
+var (
+       schemaRegistryRoster   []*SchemaRegistry // registration order; guarded by schemaRegistryRosterMu
+       schemaRegistryRosterMu sync.RWMutex
+)
+
+// registerForWatchControl appends r to the end of the process-wide roster.
+// It does not return the assigned index: callers that need the slot sample
+// CountSchemaRegistries() around the construction instead. It is called from
+// NewSchemaRegistryClient and is safe for concurrent use because every
+// access is guarded by schemaRegistryRosterMu.
+func registerForWatchControl(r *SchemaRegistry) {
+       schemaRegistryRosterMu.Lock()
+       defer schemaRegistryRosterMu.Unlock()
+       schemaRegistryRoster = append(schemaRegistryRoster, r)
+}
+
+// CountSchemaRegistries reports how many SchemaRegistry instances have been
+// added to the process-wide roster so far. The roster only grows, so the test
+// harness can sample this before and after a CMD() invocation and treat the
+// indices in between as the registries that node created.
+func CountSchemaRegistries() int {
+       schemaRegistryRosterMu.RLock()
+       n := len(schemaRegistryRoster)
+       schemaRegistryRosterMu.RUnlock()
+       return n
+}
+
+// SchemaRegistryByIndex returns the registry stored at roster slot i, or nil
+// when i is negative or not yet assigned. The test harness uses it to bind a
+// freshly started node's address to the registry that node constructed.
+func SchemaRegistryByIndex(i int) *SchemaRegistry {
+       schemaRegistryRosterMu.RLock()
+       defer schemaRegistryRosterMu.RUnlock()
+       if i >= 0 && i < len(schemaRegistryRoster) {
+               return schemaRegistryRoster[i]
+       }
+       return nil
+}
diff --git a/banyand/metadata/schema/property/watch_control_test.go 
b/banyand/metadata/schema/property/watch_control_test.go
new file mode 100644
index 000000000..0d38400d3
--- /dev/null
+++ b/banyand/metadata/schema/property/watch_control_test.go
@@ -0,0 +1,161 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package property
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// newTestSchemaRegistry builds a bare SchemaRegistry for the pause-gate tests.
+// Only the logger is populated. Replayed and post-resume events fall through
+// the gate into the parse / dispatch tail of handleWatchEvent, which is
+// assumed to log through r.l (TODO confirm against handleWatchEvent). Every
+// other field keeps its zero value.
+func newTestSchemaRegistry() *SchemaRegistry {
+       reg := &SchemaRegistry{}
+       reg.l = logger.GetLogger("test-watch-control")
+       return reg
+}
+
+// fakeWatchEvent wraps a Property carrying only the given ID in a
+// WatchSchemasResponse. The non-nil Property is what matters: it gets the
+// event past handleWatchEvent's prop==nil guard and into the pause gate.
+// No tags are set, so the parsed kind is empty.
+func fakeWatchEvent(id string) *schemav1.WatchSchemasResponse {
+       prop := &propertyv1.Property{Id: id}
+       return &schemav1.WatchSchemasResponse{Property: prop}
+}
+
+// TestPauseNotifications_QueuesEventsWhilePaused verifies that
+// handleWatchEvent appends to pauseQueue (and does not invoke the parse /
+// dispatch tail) while paused=true.
+func TestPauseNotifications_QueuesEventsWhilePaused(t *testing.T) {
+       r := newTestSchemaRegistry()
+       r.PauseNotifications()
+
+       r.handleWatchEvent(fakeWatchEvent("a"))
+       r.handleWatchEvent(fakeWatchEvent("b"))
+       r.handleWatchEvent(fakeWatchEvent("c"))
+
+       r.pauseMu.Lock()
+       queueLen := len(r.pauseQueue)
+       pausedFlag := r.paused
+       r.pauseMu.Unlock()
+
+       assert.True(t, pausedFlag, "PauseNotifications must flip paused=true")
+       require.Equal(t, 3, queueLen, "all three events must be queued while 
paused")
+       assert.Equal(t, "a", r.pauseQueue[0].GetProperty().GetId())
+       assert.Equal(t, "c", r.pauseQueue[2].GetProperty().GetId())
+}
+
+// TestResumeNotifications_DrainsQueueAndClearsPauseState verifies that
+// ResumeNotifications flips paused=false, replays the queued events
+// (handleWatchEvent runs to completion for each), and leaves an empty
+// queue. Replay correctness for a malformed payload is out of scope —
+// handleWatchEvent's existing nil/parse guards already cover that.
+func TestResumeNotifications_DrainsQueueAndClearsPauseState(t *testing.T) {
+       r := newTestSchemaRegistry()
+       r.PauseNotifications()
+       r.handleWatchEvent(fakeWatchEvent("a"))
+       r.handleWatchEvent(fakeWatchEvent("b"))
+
+       r.ResumeNotifications()
+
+       r.pauseMu.Lock()
+       queueLen := len(r.pauseQueue)
+       pausedFlag := r.paused
+       r.pauseMu.Unlock()
+
+       assert.False(t, pausedFlag, "ResumeNotifications must flip 
paused=false")
+       assert.Equal(t, 0, queueLen, "ResumeNotifications must drain the queue")
+}
+
+// TestResumeNotifications_NotPaused_NoOp checks that calling
+// ResumeNotifications on a registry that was never paused is harmless:
+// paused stays false and the queue stays empty.
+func TestResumeNotifications_NotPaused_NoOp(t *testing.T) {
+       r := newTestSchemaRegistry()
+       r.ResumeNotifications()
+
+       r.pauseMu.Lock()
+       defer r.pauseMu.Unlock()
+       assert.False(t, r.paused)
+       assert.Empty(t, r.pauseQueue)
+}
+
+// TestHandleWatchEvent_AfterResume_RunsImmediately checks that once a
+// pause/resume cycle has completed, a new event bypasses the queue and goes
+// straight to the dispatch tail, as it would have before any pause.
+func TestHandleWatchEvent_AfterResume_RunsImmediately(t *testing.T) {
+       r := newTestSchemaRegistry()
+       r.PauseNotifications()
+       r.ResumeNotifications()
+
+       r.handleWatchEvent(fakeWatchEvent("after-resume"))
+
+       r.pauseMu.Lock()
+       backlog := r.pauseQueue
+       r.pauseMu.Unlock()
+       assert.Len(t, backlog, 0, "post-resume events must not be queued")
+}
+
+// TestPauseNotifications_NilPropertyShortCircuits verifies that the
+// existing prop==nil guard short-circuits before the gate, so a nil
+// payload does not pollute the queue.
+func TestPauseNotifications_NilPropertyShortCircuits(t *testing.T) {
+       r := newTestSchemaRegistry()
+       r.PauseNotifications()
+
+       r.handleWatchEvent(&schemav1.WatchSchemasResponse{Property: nil})
+
+       r.pauseMu.Lock()
+       queueLen := len(r.pauseQueue)
+       r.pauseMu.Unlock()
+
+       assert.Equal(t, 0, queueLen, "nil-property events must not enter the 
pause queue")
+}
+
+// TestSchemaRegistryRoster_RegisterAndIndex checks that registrations land in
+// consecutive roster slots in call order and that out-of-range lookups return
+// nil. The roster is process-global, so every expectation is relative to the
+// count observed on entry.
+func TestSchemaRegistryRoster_RegisterAndIndex(t *testing.T) {
+       base := CountSchemaRegistries()
+       first := newTestSchemaRegistry()
+       second := &SchemaRegistry{}
+       registerForWatchControl(first)
+       registerForWatchControl(second)
+
+       require.Equal(t, base+2, CountSchemaRegistries())
+       assert.Same(t, first, SchemaRegistryByIndex(base))
+       assert.Same(t, second, SchemaRegistryByIndex(base+1))
+       assert.Nil(t, SchemaRegistryByIndex(-1))
+       assert.Nil(t, SchemaRegistryByIndex(CountSchemaRegistries()))
+}
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index b74831c65..bf9ab7456 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -595,6 +595,7 @@ func startDataNode(config *ClusterConfig, dataDir string, 
flags ...string) (stri
                config.AddSchemaServerAddr(schemaAddr)
        }
 
+       beforeCount := property.CountSchemaRegistries()
        rawCloseFn := CMD(flags...)
 
        gomega.Eventually(
@@ -606,10 +607,24 @@ func startDataNode(config *ClusterConfig, dataDir string, 
flags ...string) (stri
                config.NodeDiscovery.FileWriter.AddNode(nodeAddr, nodeAddr)
        }
 
+       // Bind the data node's SchemaRegistry to its gRPC address so the test
+       // harness can call PauseDataNodeWatch / ResumeDataNodeWatch later. The
+       // metadata.clientService's PreRun creates exactly one SchemaRegistry,
+       // and the health check above guarantees PreRun has completed before we
+       // reach here — so taking the most recently registered roster slot is
+       // safe.
+       afterCount := property.CountSchemaRegistries()
+       if afterCount > beforeCount {
+               if reg := property.SchemaRegistryByIndex(afterCount - 1); reg 
!= nil {
+                       bindNodeWatchControl(nodeAddr, reg)
+               }
+       }
+
        closeFn := func() {
                if config.NodeDiscovery.FileWriter != nil {
                        config.NodeDiscovery.FileWriter.RemoveNode(nodeAddr)
                }
+               unbindNodeWatchControl(nodeAddr)
                rawCloseFn()
        }
 
diff --git a/pkg/test/setup/watch_control.go b/pkg/test/setup/watch_control.go
index 69e2b60f8..2aae353a0 100644
--- a/pkg/test/setup/watch_control.go
+++ b/pkg/test/setup/watch_control.go
@@ -17,26 +17,83 @@
 
 package setup
 
-import "errors"
+import (
+       "errors"
+       "fmt"
+       "sync"
 
-// ErrWatchControlNotImplemented is returned by PauseDataNodeWatch and
-// ResumeDataNodeWatch until the data-node watch control hooks land in
-// Phase 2 (Step 2.8). Cluster-only specs in §6.12 of the schema-consistency
-// plan must skip themselves when this sentinel surfaces.
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema/property"
+)
+
+// ErrWatchControlNotImplemented is the legacy sentinel from the Step 1.0
+// stub era. The watch-control helpers below are now wired to a real
+// SchemaRegistry handle, but the sentinel is still returned (wrapped) when
+// the caller passes a node name that the harness has not registered —
+// typically because PauseDataNodeWatch was called before the matching
+// setup.DataNode call completed its health check, or for a liaison node
+// (no SchemaRegistry roster binding).
 var ErrWatchControlNotImplemented = errors.New("watch control not implemented: 
landed in Phase 2")
 
-// PauseDataNodeWatch suspends the named data node's schema watch loop so a
-// test can observe behavior while the node is missing schema events. It is a
-// distributed-only hook; the standalone harness has no separate watch path.
-// Phase 2 will provide a real implementation; until then it returns
-// ErrWatchControlNotImplemented and callers should skip.
-func PauseDataNodeWatch(_ string) error {
-       return ErrWatchControlNotImplemented
+// watchControlByNode maps a data node's gRPC address to the SchemaRegistry
+// created inside that node; watchControlMu guards every access to the map.
+var (
+       watchControlByNode = make(map[string]*property.SchemaRegistry)
+       watchControlMu     sync.Mutex
+)
+
+// bindNodeWatchControl records reg as the watch-control target for the data
+// node listening on nodeAddr, replacing any earlier binding for that address.
+// A nil reg is ignored, leaving any existing binding untouched. startDataNode
+// calls this after the node's health check has passed.
+func bindNodeWatchControl(nodeAddr string, reg *property.SchemaRegistry) {
+       if reg != nil {
+               watchControlMu.Lock()
+               defer watchControlMu.Unlock()
+               watchControlByNode[nodeAddr] = reg
+       }
+}
+
+// unbindNodeWatchControl drops the registry bound to nodeAddr; it is a no-op
+// when no binding exists. The data node's close fn calls it so this map does
+// not keep a stopped node's SchemaRegistry reachable across setup -> teardown
+// -> setup cycles.
+func unbindNodeWatchControl(nodeAddr string) {
+       watchControlMu.Lock()
+       defer watchControlMu.Unlock()
+       delete(watchControlByNode, nodeAddr)
+}
+
+// lookupNodeWatchControl returns the registry bound to nodeAddr and whether a
+// binding exists.
+func lookupNodeWatchControl(nodeAddr string) (*property.SchemaRegistry, bool) {
+       watchControlMu.Lock()
+       reg, found := watchControlByNode[nodeAddr]
+       watchControlMu.Unlock()
+       return reg, found
+}
+
+// PauseDataNodeWatch suspends the named data node's schema-watch loop so
+// a test can observe behavior while the node is missing schema events.
+// nodeAddr is the gRPC address returned from setup.DataNodeFromDataDir
+// (e.g. "127.0.0.1:31921"); pkg/test/setup binds the address during node
+// startup. Returns ErrWatchControlNotImplemented (wrapped) when the
+// address is not registered — typically because the node is a liaison or
+// because PauseDataNodeWatch was called before the matching
+// setup.DataNode invocation completed its health check.
+func PauseDataNodeWatch(nodeAddr string) error {
+       reg, ok := lookupNodeWatchControl(nodeAddr)
+       if !ok {
+               return fmt.Errorf("PauseDataNodeWatch: %w (no binding for node 
%q)",
+                       ErrWatchControlNotImplemented, nodeAddr)
+       }
+       reg.PauseNotifications()
+       return nil
 }
 
-// ResumeDataNodeWatch resumes a previously paused data-node watch loop. Same
-// Phase 2 caveat as PauseDataNodeWatch: it currently returns
-// ErrWatchControlNotImplemented so cluster-only specs can self-skip.
-func ResumeDataNodeWatch(_ string) error {
-       return ErrWatchControlNotImplemented
+// ResumeDataNodeWatch resumes a previously paused data-node watch loop
+// and drains any events that arrived during the pause window. Same
+// lookup semantics as PauseDataNodeWatch.
+func ResumeDataNodeWatch(nodeAddr string) error {
+       reg, ok := lookupNodeWatchControl(nodeAddr)
+       if !ok {
+               return fmt.Errorf("ResumeDataNodeWatch: %w (no binding for node 
%q)",
+                       ErrWatchControlNotImplemented, nodeAddr)
+       }
+       reg.ResumeNotifications()
+       return nil
 }

Reply via email to