This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch phase-2-cp5-march
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/phase-2-cp5-march by this push:
     new 49d613cc1 refactor(property): move pause gate to 
processInitialResource + handleDeletion (Phase 2 Step 1.0 follow-up, hardening)
49d613cc1 is described below

commit 49d613cc154ed17acb32e35aba46dfbf316cb014
Author: Hongtao Gao <[email protected]>
AuthorDate: Thu May 7 14:59:49 2026 +0000

    refactor(property): move pause gate to processInitialResource + 
handleDeletion (Phase 2 Step 1.0 follow-up, hardening)
    
    The original pause gate sat in handleWatchEvent, which only covered the
    live watch path. The full-reconcile cycle (sendSyncRequest → digest
    merge → processInitialResourceFromProperty / handleDeletion at
    client.go:1587/1597) bypassed the gate, so a paused node still advanced
    its schemaCache + notifiedModRevision via the periodic resync. Moving
    the gate to the cache-mutation entry points
    (processInitialResourceFromProperty + handleDeletion) covers all paths
    uniformly: every event funnels through one of these two functions
    before the cache is touched.
    
    Other plumbing landed alongside:
    
      - pkg/test/helpers.SharedContext gains DataNodeAddrs []string so
        cluster-only specs can pass entries to setup.PauseDataNodeWatch /
        ResumeDataNodeWatch without rediscovering them.
      - test/integration/distributed/schema/common.go captures the data
        nodes' gRPC addresses via setup.DataNodeWithAddrAndDir and pushes
        them through SharedContext.
      - pkg/test/setup/setup.go captures the SchemaRegistry roster index
        range around CMD() in startDataNode and binds the new
        SchemaRegistry (SR) to both the host-prefixed addr and the
        nodeHost-prefixed nodeAddr so callers can use either form.
      - banyand/metadata/schema/property.NewSchemaRegistryClient now
        registers in the watch-control roster only on success — pre-error
        registration leaked orphan entries that the test harness mistook
        for the working SR.
      - SchemaRegistry.pauseQueue switches from []*WatchSchemasResponse to
        []func() so the queued work can capture the parsed entry-point
        args (kind / prop / spec / propID / cacheEntry / revision) instead
        of re-parsing on replay.
    
    Unit tests in banyand/metadata/schema/property/watch_control_test.go
    are updated to reflect the new gate location: the queue grows when
    processInitialResourceFromProperty or handleDeletion is called under
    pause, and resume drains the queue in arrival order.
    
    Authoring §6.12a/b/c/d distributed integration specs is deferred —
    the in-process distributed harness does not currently expose
    NodeSchemaStatusService on data-node ports (data.go's pipeline never
    calls SetMetadataRepo, unlike liaison.go), and the cluster barrier on
    the liaison does not list data nodes in tier2 of its route table for
    fan-out. Fixing either is a separate architectural piece, not a CP-5
    deliverable; the §6.12 spec contracts are documented in
    .omc/plans/banyandb-schema-tbd-plan.md and can be authored once the
    data-node service exposure lands.
    
    via [HAPI](https://hapi.run)
    
    Co-Authored-By: HAPI <[email protected]>
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
---
 banyand/metadata/schema/property/client.go         |  60 +++++++----
 banyand/metadata/schema/property/watch_control.go  |   4 +-
 .../metadata/schema/property/watch_control_test.go | 116 +++++++++------------
 pkg/test/helpers/constant.go                       |  11 +-
 pkg/test/setup/setup.go                            |  23 ++--
 test/integration/distributed/schema/common.go      |  21 ++--
 6 files changed, 126 insertions(+), 109 deletions(-)

diff --git a/banyand/metadata/schema/property/client.go 
b/banyand/metadata/schema/property/client.go
index d5385a613..b7ec87069 100644
--- a/banyand/metadata/schema/property/client.go
+++ b/banyand/metadata/schema/property/client.go
@@ -130,7 +130,7 @@ type SchemaRegistry struct {
        caCertReloader     *pkgtls.Reloader
        handlers           map[schema.Kind][]schema.EventHandler
        watchSessions      map[string]*watchSession
-       pauseQueue         []*schemav1.WatchSchemasResponse
+       pauseQueue         []func()
        syncInterval       time.Duration
        syncTimeout        time.Duration
        watchMaxBackoff    time.Duration
@@ -214,7 +214,6 @@ func NewSchemaRegistryClient(cfg *ClientConfig) 
(*SchemaRegistry, error) {
                fullReconcileEvery: fullReconcileEvery,
        }
        handler.registry = reg
-       registerForWatchControl(reg)
 
        if cfg.CurNode != nil && isPropertySchemaNode(cfg.CurNode) {
                connMgr.OnAddOrUpdate(cfg.CurNode)
@@ -247,6 +246,12 @@ func NewSchemaRegistryClient(cfg *ClientConfig) 
(*SchemaRegistry, error) {
                        return nil, fmt.Errorf("no schema servers reachable 
after %s", initWaitTime)
                }
        }
+       // Register only on success — failed attempts (e.g. ListNode error,
+       // no reachable schema server) close their reg and return nil. Pre-error
+       // registration would leak orphan entries that the test harness would
+       // then mistake for the working SchemaRegistry, so the watch-control
+       // roster only sees fully-initialized registries.
+       registerForWatchControl(reg)
        return reg, nil
 }
 
@@ -1399,19 +1404,6 @@ func (r *SchemaRegistry) handleWatchEvent(resp 
*schemav1.WatchSchemasResponse) {
        if prop == nil {
                return
        }
-       // Pause gate (test-only): when PauseNotifications has been called on 
this
-       // registry, queue the event for replay on ResumeNotifications. Used by
-       // pkg/test/setup.PauseDataNodeWatch to exercise cluster-only specs
-       // (§6.12) that require a single data node falling behind the cluster.
-       // Production code never reaches the queue path because 
PauseNotifications
-       // is only invoked from the test harness.
-       r.pauseMu.Lock()
-       if r.paused {
-               r.pauseQueue = append(r.pauseQueue, resp)
-               r.pauseMu.Unlock()
-               return
-       }
-       r.pauseMu.Unlock()
        parsed := ParseTags(prop.GetTags())
        kindStr := parsed.Kind
        if kindStr == "" {
@@ -1615,6 +1607,21 @@ func (r *SchemaRegistry) mergeDigests(allDigests 
map[string][]*digestEntry) map[
 }
 
 func (r *SchemaRegistry) processInitialResourceFromProperty(kind schema.Kind, 
prop *propertyv1.Property, spec proto.Message) {
+       // Pause gate (test-only): when PauseNotifications has been called, 
defer
+       // the cache mutation + handler dispatch until ResumeNotifications 
drains
+       // the queue. Covers both the watch path and the full-reconcile path —
+       // every event funnels through this function before the schemaCache is
+       // updated and notifiedModRevision advances. Used by
+       // pkg/test/setup.PauseDataNodeWatch for §6.12 partial-cluster specs.
+       r.pauseMu.Lock()
+       if r.paused {
+               r.pauseQueue = append(r.pauseQueue, func() {
+                       r.processInitialResourceFromProperty(kind, prop, spec)
+               })
+               r.pauseMu.Unlock()
+               return
+       }
+       r.pauseMu.Unlock()
        propID := prop.GetId()
        parsed := ParseTags(prop.GetTags())
        entry := &cacheEntry{
@@ -1646,6 +1653,18 @@ func (r *SchemaRegistry) 
processInitialResourceFromProperty(kind schema.Kind, pr
 }
 
 func (r *SchemaRegistry) handleDeletion(kind schema.Kind, propID string, entry 
*cacheEntry, revision int64) {
+       // Pause gate: same rationale as processInitialResourceFromProperty.
+       // Skipping cache.Delete keeps the entry visible to GetAbsentKeys so
+       // AwaitSchemaDeleted reports the paused node as still_present_keys.
+       r.pauseMu.Lock()
+       if r.paused {
+               r.pauseQueue = append(r.pauseQueue, func() {
+                       r.handleDeletion(kind, propID, entry, revision)
+               })
+               r.pauseMu.Unlock()
+               return
+       }
+       r.pauseMu.Unlock()
        r.l.Debug().Stringer("kind", kind).Str("propID", 
propID).Int64("revision", revision).
                Msg("handleDeletion: attempting to delete from cache")
        if r.cache.Delete(propID, revision) {
@@ -1778,9 +1797,10 @@ func (r *SchemaRegistry) PauseNotifications() {
 }
 
 // ResumeNotifications resumes watch-event processing and drains the queue
-// accumulated while paused. Events are replayed in arrival order so
-// downstream handlers (entityRepo / schemaRepo) see the same sequence they
-// would have observed if pause had never happened. A no-op when not paused.
+// accumulated while paused. Each queued closure replays the original
+// processInitialResourceFromProperty / handleDeletion call so downstream
+// handlers (entityRepo / schemaRepo) and the schemaCache catch up to the
+// state the schema-server already reflects. A no-op when not paused.
 func (r *SchemaRegistry) ResumeNotifications() {
        r.pauseMu.Lock()
        if !r.paused {
@@ -1791,7 +1811,7 @@ func (r *SchemaRegistry) ResumeNotifications() {
        queue := r.pauseQueue
        r.pauseQueue = nil
        r.pauseMu.Unlock()
-       for _, evt := range queue {
-               r.handleWatchEvent(evt)
+       for _, replay := range queue {
+               replay()
        }
 }
diff --git a/banyand/metadata/schema/property/watch_control.go 
b/banyand/metadata/schema/property/watch_control.go
index 7fc627a3d..83b40036d 100644
--- a/banyand/metadata/schema/property/watch_control.go
+++ b/banyand/metadata/schema/property/watch_control.go
@@ -35,8 +35,8 @@ var (
        schemaRegistryRoster   []*SchemaRegistry
 )
 
-// registerForWatchControl appends r to the per-process roster and returns
-// its index. Called from NewSchemaRegistryClient. Thread-safe.
+// registerForWatchControl appends r to the per-process roster. Called from
+// NewSchemaRegistryClient. Thread-safe.
 func registerForWatchControl(r *SchemaRegistry) {
        schemaRegistryRosterMu.Lock()
        schemaRegistryRoster = append(schemaRegistryRoster, r)
diff --git a/banyand/metadata/schema/property/watch_control_test.go 
b/banyand/metadata/schema/property/watch_control_test.go
index 0d38400d3..f20e9bd2c 100644
--- a/banyand/metadata/schema/property/watch_control_test.go
+++ b/banyand/metadata/schema/property/watch_control_test.go
@@ -24,73 +24,53 @@ import (
        "github.com/stretchr/testify/require"
 
        propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
-       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 // newTestSchemaRegistry returns a minimally-initialized SchemaRegistry
-// suitable for the gate tests below — only the logger is required so the
-// post-resume dispatch tail (ParseTags / KindFromString warnings) does
-// not panic on nil logger access. The connection manager / cache /
-// handler map stay zero-valued because these tests only exercise the
-// pause / queue / drain paths, which short-circuit before any of those
-// fields are touched.
+// suitable for the gate tests below. Only the logger is required because
+// each test exercises an early-return path: the pause gate fires before
+// any code that would touch the schemaCache or handler map.
 func newTestSchemaRegistry() *SchemaRegistry {
        return &SchemaRegistry{l: logger.GetLogger("test-watch-control")}
 }
 
-// fakeWatchEvent returns a minimally non-nil WatchSchemasResponse so the
-// gate path runs to completion. The event payload is intentionally bare —
-// these tests exercise the pause / queue / drain mechanism, not the
-// downstream parse / dispatch path.
-func fakeWatchEvent(id string) *schemav1.WatchSchemasResponse {
-       return &schemav1.WatchSchemasResponse{
-               Property: &propertyv1.Property{Id: id},
-       }
-}
-
-// TestPauseNotifications_QueuesEventsWhilePaused verifies that
-// handleWatchEvent appends to pauseQueue (and does not invoke the parse /
-// dispatch tail) while paused=true.
-func TestPauseNotifications_QueuesEventsWhilePaused(t *testing.T) {
+// TestPauseNotifications_FlipsFlag verifies that PauseNotifications sets
+// the internal paused flag (the only state the test harness inspects).
+func TestPauseNotifications_FlipsFlag(t *testing.T) {
        r := newTestSchemaRegistry()
        r.PauseNotifications()
-
-       r.handleWatchEvent(fakeWatchEvent("a"))
-       r.handleWatchEvent(fakeWatchEvent("b"))
-       r.handleWatchEvent(fakeWatchEvent("c"))
-
        r.pauseMu.Lock()
-       queueLen := len(r.pauseQueue)
-       pausedFlag := r.paused
-       r.pauseMu.Unlock()
-
-       assert.True(t, pausedFlag, "PauseNotifications must flip paused=true")
-       require.Equal(t, 3, queueLen, "all three events must be queued while 
paused")
-       assert.Equal(t, "a", r.pauseQueue[0].GetProperty().GetId())
-       assert.Equal(t, "c", r.pauseQueue[2].GetProperty().GetId())
+       defer r.pauseMu.Unlock()
+       assert.True(t, r.paused, "PauseNotifications must flip paused=true")
 }
 
-// TestResumeNotifications_DrainsQueueAndClearsPauseState verifies that
-// ResumeNotifications flips paused=false, replays the queued events
-// (handleWatchEvent runs to completion for each), and leaves an empty
-// queue. Replay correctness for a malformed payload is out of scope —
-// handleWatchEvent's existing nil/parse guards already cover that.
-func TestResumeNotifications_DrainsQueueAndClearsPauseState(t *testing.T) {
+// TestResumeNotifications_ClearsFlagAndDrainsQueue verifies that
+// ResumeNotifications flips paused=false and drains every queued replay
+// closure in arrival order.
+func TestResumeNotifications_ClearsFlagAndDrainsQueue(t *testing.T) {
        r := newTestSchemaRegistry()
        r.PauseNotifications()
-       r.handleWatchEvent(fakeWatchEvent("a"))
-       r.handleWatchEvent(fakeWatchEvent("b"))
+
+       var replays []int
+       r.pauseMu.Lock()
+       for i := 0; i < 3; i++ {
+               idx := i
+               r.pauseQueue = append(r.pauseQueue, func() { replays = 
append(replays, idx) })
+       }
+       r.pauseMu.Unlock()
 
        r.ResumeNotifications()
 
        r.pauseMu.Lock()
-       queueLen := len(r.pauseQueue)
        pausedFlag := r.paused
+       queueLen := len(r.pauseQueue)
        r.pauseMu.Unlock()
 
        assert.False(t, pausedFlag, "ResumeNotifications must flip 
paused=false")
        assert.Equal(t, 0, queueLen, "ResumeNotifications must drain the queue")
+       assert.Equal(t, []int{0, 1, 2}, replays, "drained closures must run in 
arrival order")
 }
 
 // TestResumeNotifications_NotPaused_NoOp verifies that calling
@@ -101,45 +81,43 @@ func TestResumeNotifications_NotPaused_NoOp(t *testing.T) {
        r.ResumeNotifications()
 
        r.pauseMu.Lock()
-       queueLen := len(r.pauseQueue)
-       pausedFlag := r.paused
-       r.pauseMu.Unlock()
-
-       assert.False(t, pausedFlag)
-       assert.Equal(t, 0, queueLen)
+       defer r.pauseMu.Unlock()
+       assert.False(t, r.paused)
+       assert.Equal(t, 0, len(r.pauseQueue))
 }
 
-// TestHandleWatchEvent_AfterResume_RunsImmediately verifies that events
-// arriving after ResumeNotifications skip the queue path and run through
-// the dispatch tail synchronously, matching pre-pause behavior.
-func TestHandleWatchEvent_AfterResume_RunsImmediately(t *testing.T) {
+// TestProcessInitialResourceFromProperty_PausedQueuesAndSkipsCacheUpdate
+// verifies the load-bearing gate: when paused, 
processInitialResourceFromProperty
+// returns early so the schemaCache (nil here, would panic on access) is
+// untouched and the closure is queued for replay.
+func TestProcessInitialResourceFromProperty_PausedQueuesAndSkipsCacheUpdate(t 
*testing.T) {
        r := newTestSchemaRegistry()
        r.PauseNotifications()
-       r.ResumeNotifications()
 
-       r.handleWatchEvent(fakeWatchEvent("after-resume"))
+       prop := &propertyv1.Property{Id: "queued"}
+       // nil cache would panic if the gate did not short-circuit here.
+       r.processInitialResourceFromProperty(schema.KindMeasure, prop, nil)
 
        r.pauseMu.Lock()
-       queueLen := len(r.pauseQueue)
-       r.pauseMu.Unlock()
-
-       assert.Equal(t, 0, queueLen, "post-resume events must not be queued")
+       defer r.pauseMu.Unlock()
+       require.Equal(t, 1, len(r.pauseQueue),
+               "paused processInitialResourceFromProperty must queue a replay 
closure")
 }
 
-// TestPauseNotifications_NilPropertyShortCircuits verifies that the
-// existing prop==nil guard short-circuits before the gate, so a nil
-// payload does not pollute the queue.
-func TestPauseNotifications_NilPropertyShortCircuits(t *testing.T) {
+// TestHandleDeletion_PausedQueuesAndSkipsCacheDelete verifies the
+// counterpart gate on handleDeletion. cache.Delete is never invoked under
+// pause (the cache is nil here), so the entry remains visible to
+// GetAbsentKeys — exactly the behavior §6.12c relies on.
+func TestHandleDeletion_PausedQueuesAndSkipsCacheDelete(t *testing.T) {
        r := newTestSchemaRegistry()
        r.PauseNotifications()
 
-       r.handleWatchEvent(&schemav1.WatchSchemasResponse{Property: nil})
+       r.handleDeletion(schema.KindStream, "p_g/s", &cacheEntry{}, 1)
 
        r.pauseMu.Lock()
-       queueLen := len(r.pauseQueue)
-       r.pauseMu.Unlock()
-
-       assert.Equal(t, 0, queueLen, "nil-property events must not enter the 
pause queue")
+       defer r.pauseMu.Unlock()
+       require.Equal(t, 1, len(r.pauseQueue),
+               "paused handleDeletion must queue a replay closure")
 }
 
 // TestSchemaRegistryRoster_RegisterAndIndex verifies that
@@ -150,7 +128,7 @@ func TestSchemaRegistryRoster_RegisterAndIndex(t 
*testing.T) {
        before := CountSchemaRegistries()
        r := newTestSchemaRegistry()
        registerForWatchControl(r)
-       r2 := &SchemaRegistry{}
+       r2 := newTestSchemaRegistry()
        registerForWatchControl(r2)
 
        require.Equal(t, before+2, CountSchemaRegistries())
diff --git a/pkg/test/helpers/constant.go b/pkg/test/helpers/constant.go
index cced3618d..4e8964842 100644
--- a/pkg/test/helpers/constant.go
+++ b/pkg/test/helpers/constant.go
@@ -42,10 +42,19 @@ const (
 )
 
 // SharedContext is the context shared between test cases in the integration 
testing.
+//
+//nolint:govet // fieldalignment cannot reduce the pointer-byte run further; 
readability beats the 8-byte saving for a test-only struct.
 type SharedContext struct {
        BaseTime   time.Time
        Connection *grpclib.ClientConn
-       Mode       string
+       // DataNodeAddrs lists the gRPC addresses of the cluster's data nodes.
+       // Populated by the distributed BeforeSuite; nil for standalone runs.
+       // Cluster-only specs (§6.12 partial-cluster barrier scenarios) pass
+       // entries from this slice to setup.PauseDataNodeWatch /
+       // ResumeDataNodeWatch to drive a single node out of sync while the
+       // rest of the cluster stays caught up.
+       DataNodeAddrs []string
+       Mode          string
 }
 
 // Args is a wrapper seals all necessary info for table specs.
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index bf9ab7456..2e3fc7e2a 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -607,16 +607,20 @@ func startDataNode(config *ClusterConfig, dataDir string, 
flags ...string) (stri
                config.NodeDiscovery.FileWriter.AddNode(nodeAddr, nodeAddr)
        }
 
-       // Bind the data node's SchemaRegistry to its gRPC address so the test
-       // harness can call PauseDataNodeWatch / ResumeDataNodeWatch later. The
-       // metadata.clientService's PreRun creates exactly one SchemaRegistry,
-       // and the health check above guarantees PreRun has completed before we
-       // reach here — so taking the most recently registered roster slot is
-       // safe.
+       // Bind the data node's SchemaRegistry to both the host-prefixed addr
+       // returned to the caller and the nodeHost-prefixed nodeAddr the route
+       // table uses internally, so PauseDataNodeWatch / ResumeDataNodeWatch
+       // accept either form. The metadata.clientService's PreRun creates
+       // exactly one SchemaRegistry, and the health check above guarantees
+       // PreRun has completed — so the most recently registered roster slot
+       // is the right one for this node.
        afterCount := property.CountSchemaRegistries()
        if afterCount > beforeCount {
                if reg := property.SchemaRegistryByIndex(afterCount - 1); reg 
!= nil {
-                       bindNodeWatchControl(nodeAddr, reg)
+                       bindNodeWatchControl(addr, reg)
+                       if nodeAddr != addr {
+                               bindNodeWatchControl(nodeAddr, reg)
+                       }
                }
        }
 
@@ -624,7 +628,10 @@ func startDataNode(config *ClusterConfig, dataDir string, 
flags ...string) (stri
                if config.NodeDiscovery.FileWriter != nil {
                        config.NodeDiscovery.FileWriter.RemoveNode(nodeAddr)
                }
-               unbindNodeWatchControl(nodeAddr)
+               unbindNodeWatchControl(addr)
+               if nodeAddr != addr {
+                       unbindNodeWatchControl(nodeAddr)
+               }
                rawCloseFn()
        }
 
diff --git a/test/integration/distributed/schema/common.go 
b/test/integration/distributed/schema/common.go
index b1a2f7fe7..8869c7526 100644
--- a/test/integration/distributed/schema/common.go
+++ b/test/integration/distributed/schema/common.go
@@ -44,10 +44,11 @@ import (
 )
 
 var (
-       now        time.Time
-       stopFunc   func()
-       connection *grpc.ClientConn
-       goods      []gleak.Goroutine
+       now           time.Time
+       stopFunc      func()
+       connection    *grpc.ClientConn
+       goods         []gleak.Goroutine
+       dataNodeAddrs []string
 )
 
 var _ = ginkgo.SynchronizedBeforeSuite(func() []byte {
@@ -62,9 +63,10 @@ var _ = ginkgo.SynchronizedBeforeSuite(func() []byte {
        dfWriter := setup.NewDiscoveryFileWriter(tmpDir)
        config := setup.PropertyClusterConfig(dfWriter)
        ginkgo.By("Starting data node 0")
-       closeDataNode0 := setup.DataNode(config)
+       addrDataNode0, _, closeDataNode0 := setup.DataNodeWithAddrAndDir(config)
        ginkgo.By("Starting data node 1")
-       closeDataNode1 := setup.DataNode(config)
+       addrDataNode1, _, closeDataNode1 := setup.DataNodeWithAddrAndDir(config)
+       dataNodeAddrs = []string{addrDataNode0, addrDataNode1}
        ginkgo.By("Loading schema via property")
        setup.PreloadSchemaViaProperty(config, test_stream.PreloadSchema, 
test_measure.PreloadSchema, test_trace.PreloadSchema)
        config.AddLoadedKinds(schemapkg.KindStream, schemapkg.KindMeasure, 
schemapkg.KindTrace)
@@ -87,9 +89,10 @@ var _ = ginkgo.SynchronizedBeforeSuite(func() []byte {
        connection, err = grpchelper.Conn(string(address), 10*time.Second,
                grpc.WithTransportCredentials(insecure.NewCredentials()))
        casesschema.SharedContext = helpers.SharedContext{
-               Connection: connection,
-               Mode:       "distributed",
-               BaseTime:   now,
+               Connection:    connection,
+               Mode:          helpers.ModeDistributed,
+               BaseTime:      now,
+               DataNodeAddrs: dataNodeAddrs,
        }
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
 })

Reply via email to