This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch phase-2-cp5-march
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/phase-2-cp5-march by this push:
new 49d613cc1 refactor(property): move pause gate to
processInitialResource + handleDeletion (Phase 2 Step 1.0 follow-up, hardening)
49d613cc1 is described below
commit 49d613cc154ed17acb32e35aba46dfbf316cb014
Author: Hongtao Gao <[email protected]>
AuthorDate: Thu May 7 14:59:49 2026 +0000
refactor(property): move pause gate to processInitialResource +
handleDeletion (Phase 2 Step 1.0 follow-up, hardening)
The original pause gate sat at handleWatchEvent, which only covered the
live watch path. The full-reconcile cycle (sendSyncRequest → digest
merge → processInitialResourceFromProperty / handleDeletion at
client.go:1587/1597) bypassed the gate so a paused node still advanced
its schemaCache + notifiedModRevision via the periodic resync. Moving
the gate to the cache-mutation entry points
(processInitialResourceFromProperty + handleDeletion) covers all paths
uniformly: every event funnels through one of these two functions before
the cache is touched.
Other plumbing landed alongside:
- pkg/test/helpers.SharedContext gains DataNodeAddrs []string so
cluster-only specs can pass entries to setup.PauseDataNodeWatch /
ResumeDataNodeWatch without rediscovering them.
- test/integration/distributed/schema/common.go captures the data
nodes' gRPC addresses via setup.DataNodeWithAddrAndDir and pushes
them through SharedContext.
- pkg/test/setup/setup.go captures the SchemaRegistry roster index
range around CMD() in startDataNode and binds the new SR to both
the host-prefixed addr and the nodeHost-prefixed nodeAddr so
callers can use either form.
- banyand/metadata/schema/property.NewSchemaRegistryClient now
registers in the watch-control roster only on success — pre-error
registration leaked orphan entries that the test harness mistook
for the working SR.
- SchemaRegistry.pauseQueue switches from []*WatchSchemasResponse to
[]func() so the queued work can capture the parsed entry-point
args (kind / prop / spec / propID / cacheEntry / revision) instead
of re-parsing on replay.
Unit tests in banyand/metadata/schema/property/watch_control_test.go
are updated to reflect the new gate location: the queue length grows when
processInitialResourceFromProperty or handleDeletion is called under
pause, and resume drains the queue in arrival order.
Authoring §6.12a/b/c/d distributed integration specs is deferred —
the in-process distributed harness does not currently expose
NodeSchemaStatusService on data-node ports (data.go's pipeline never
calls SetMetadataRepo, unlike liaison.go), and the cluster barrier on
the liaison does not list data nodes in tier2 of its route table for
fan-out. Fixing either is a separate architectural piece, not a CP-5
deliverable; the §6.12 spec contracts are documented in
.omc/plans/banyandb-schema-tbd-plan.md and can be authored once the
data-node service exposure lands.
via [HAPI](https://hapi.run)
Co-Authored-By: HAPI <[email protected]>
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
---
banyand/metadata/schema/property/client.go | 60 +++++++----
banyand/metadata/schema/property/watch_control.go | 4 +-
.../metadata/schema/property/watch_control_test.go | 116 +++++++++------------
pkg/test/helpers/constant.go | 11 +-
pkg/test/setup/setup.go | 23 ++--
test/integration/distributed/schema/common.go | 21 ++--
6 files changed, 126 insertions(+), 109 deletions(-)
diff --git a/banyand/metadata/schema/property/client.go
b/banyand/metadata/schema/property/client.go
index d5385a613..b7ec87069 100644
--- a/banyand/metadata/schema/property/client.go
+++ b/banyand/metadata/schema/property/client.go
@@ -130,7 +130,7 @@ type SchemaRegistry struct {
caCertReloader *pkgtls.Reloader
handlers map[schema.Kind][]schema.EventHandler
watchSessions map[string]*watchSession
- pauseQueue []*schemav1.WatchSchemasResponse
+ pauseQueue []func()
syncInterval time.Duration
syncTimeout time.Duration
watchMaxBackoff time.Duration
@@ -214,7 +214,6 @@ func NewSchemaRegistryClient(cfg *ClientConfig)
(*SchemaRegistry, error) {
fullReconcileEvery: fullReconcileEvery,
}
handler.registry = reg
- registerForWatchControl(reg)
if cfg.CurNode != nil && isPropertySchemaNode(cfg.CurNode) {
connMgr.OnAddOrUpdate(cfg.CurNode)
@@ -247,6 +246,12 @@ func NewSchemaRegistryClient(cfg *ClientConfig)
(*SchemaRegistry, error) {
return nil, fmt.Errorf("no schema servers reachable
after %s", initWaitTime)
}
}
+ // Register only on success — failed attempts (e.g. ListNode error,
+ // no reachable schema server) close their reg and return nil. Pre-error
+ // registration would leak orphan entries that the test harness would
+ // then mistake for the working SchemaRegistry, so the watch-control
+ // roster only sees fully-initialized registries.
+ registerForWatchControl(reg)
return reg, nil
}
@@ -1399,19 +1404,6 @@ func (r *SchemaRegistry) handleWatchEvent(resp
*schemav1.WatchSchemasResponse) {
if prop == nil {
return
}
- // Pause gate (test-only): when PauseNotifications has been called on this
- // registry, queue the event for replay on ResumeNotifications. Used by
- // pkg/test/setup.PauseDataNodeWatch to exercise cluster-only specs
- // (§6.12) that require a single data node falling behind the cluster.
- // Production code never reaches the queue path because PauseNotifications
- // is only invoked from the test harness.
- r.pauseMu.Lock()
- if r.paused {
- r.pauseQueue = append(r.pauseQueue, resp)
- r.pauseMu.Unlock()
- return
- }
- r.pauseMu.Unlock()
parsed := ParseTags(prop.GetTags())
kindStr := parsed.Kind
if kindStr == "" {
@@ -1615,6 +1607,21 @@ func (r *SchemaRegistry) mergeDigests(allDigests
map[string][]*digestEntry) map[
}
func (r *SchemaRegistry) processInitialResourceFromProperty(kind schema.Kind,
prop *propertyv1.Property, spec proto.Message) {
+ // Pause gate (test-only): when PauseNotifications has been called, defer
+ // the cache mutation + handler dispatch until ResumeNotifications drains
+ // the queue. Covers both the watch path and the full-reconcile path —
+ // every event funnels through this function before the schemaCache is
+ // updated and notifiedModRevision advances. Used by
+ // pkg/test/setup.PauseDataNodeWatch for §6.12 partial-cluster specs.
+ r.pauseMu.Lock()
+ if r.paused {
+ r.pauseQueue = append(r.pauseQueue, func() {
+ r.processInitialResourceFromProperty(kind, prop, spec)
+ })
+ r.pauseMu.Unlock()
+ return
+ }
+ r.pauseMu.Unlock()
propID := prop.GetId()
parsed := ParseTags(prop.GetTags())
entry := &cacheEntry{
@@ -1646,6 +1653,18 @@ func (r *SchemaRegistry)
processInitialResourceFromProperty(kind schema.Kind, pr
}
func (r *SchemaRegistry) handleDeletion(kind schema.Kind, propID string, entry
*cacheEntry, revision int64) {
+ // Pause gate: same rationale as processInitialResourceFromProperty.
+ // Skipping cache.Delete keeps the entry visible to GetAbsentKeys so
+ // AwaitSchemaDeleted reports the paused node as still_present_keys.
+ r.pauseMu.Lock()
+ if r.paused {
+ r.pauseQueue = append(r.pauseQueue, func() {
+ r.handleDeletion(kind, propID, entry, revision)
+ })
+ r.pauseMu.Unlock()
+ return
+ }
+ r.pauseMu.Unlock()
r.l.Debug().Stringer("kind", kind).Str("propID",
propID).Int64("revision", revision).
Msg("handleDeletion: attempting to delete from cache")
if r.cache.Delete(propID, revision) {
@@ -1778,9 +1797,10 @@ func (r *SchemaRegistry) PauseNotifications() {
}
// ResumeNotifications resumes watch-event processing and drains the queue
-// accumulated while paused. Events are replayed in arrival order so
-// downstream handlers (entityRepo / schemaRepo) see the same sequence they
-// would have observed if pause had never happened. A no-op when not paused.
+// accumulated while paused. Each queued closure replays the original
+// processInitialResourceFromProperty / handleDeletion call so downstream
+// handlers (entityRepo / schemaRepo) and the schemaCache catch up to the
+// state the schema-server already reflects. A no-op when not paused.
func (r *SchemaRegistry) ResumeNotifications() {
r.pauseMu.Lock()
if !r.paused {
@@ -1791,7 +1811,7 @@ func (r *SchemaRegistry) ResumeNotifications() {
queue := r.pauseQueue
r.pauseQueue = nil
r.pauseMu.Unlock()
- for _, evt := range queue {
- r.handleWatchEvent(evt)
+ for _, replay := range queue {
+ replay()
}
}
diff --git a/banyand/metadata/schema/property/watch_control.go
b/banyand/metadata/schema/property/watch_control.go
index 7fc627a3d..83b40036d 100644
--- a/banyand/metadata/schema/property/watch_control.go
+++ b/banyand/metadata/schema/property/watch_control.go
@@ -35,8 +35,8 @@ var (
schemaRegistryRoster []*SchemaRegistry
)
-// registerForWatchControl appends r to the per-process roster and returns
-// its index. Called from NewSchemaRegistryClient. Thread-safe.
+// registerForWatchControl appends r to the per-process roster. Called from
+// NewSchemaRegistryClient. Thread-safe.
func registerForWatchControl(r *SchemaRegistry) {
schemaRegistryRosterMu.Lock()
schemaRegistryRoster = append(schemaRegistryRoster, r)
diff --git a/banyand/metadata/schema/property/watch_control_test.go
b/banyand/metadata/schema/property/watch_control_test.go
index 0d38400d3..f20e9bd2c 100644
--- a/banyand/metadata/schema/property/watch_control_test.go
+++ b/banyand/metadata/schema/property/watch_control_test.go
@@ -24,73 +24,53 @@ import (
"github.com/stretchr/testify/require"
propertyv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
- schemav1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
// newTestSchemaRegistry returns a minimally-initialized SchemaRegistry
-// suitable for the gate tests below — only the logger is required so the
-// post-resume dispatch tail (ParseTags / KindFromString warnings) does
-// not panic on nil logger access. The connection manager / cache /
-// handler map stay zero-valued because these tests only exercise the
-// pause / queue / drain paths, which short-circuit before any of those
-// fields are touched.
+// suitable for the gate tests below. Only the logger is required because
+// each test exercises an early-return path: the pause gate fires before
+// any code that would touch the schemaCache or handler map.
func newTestSchemaRegistry() *SchemaRegistry {
return &SchemaRegistry{l: logger.GetLogger("test-watch-control")}
}
-// fakeWatchEvent returns a minimally non-nil WatchSchemasResponse so the
-// gate path runs to completion. The event payload is intentionally bare —
-// these tests exercise the pause / queue / drain mechanism, not the
-// downstream parse / dispatch path.
-func fakeWatchEvent(id string) *schemav1.WatchSchemasResponse {
- return &schemav1.WatchSchemasResponse{
- Property: &propertyv1.Property{Id: id},
- }
-}
-
-// TestPauseNotifications_QueuesEventsWhilePaused verifies that
-// handleWatchEvent appends to pauseQueue (and does not invoke the parse /
-// dispatch tail) while paused=true.
-func TestPauseNotifications_QueuesEventsWhilePaused(t *testing.T) {
+// TestPauseNotifications_FlipsFlag verifies that PauseNotifications sets
+// the internal paused flag (the only state the test harness inspects).
+func TestPauseNotifications_FlipsFlag(t *testing.T) {
r := newTestSchemaRegistry()
r.PauseNotifications()
-
- r.handleWatchEvent(fakeWatchEvent("a"))
- r.handleWatchEvent(fakeWatchEvent("b"))
- r.handleWatchEvent(fakeWatchEvent("c"))
-
r.pauseMu.Lock()
- queueLen := len(r.pauseQueue)
- pausedFlag := r.paused
- r.pauseMu.Unlock()
-
- assert.True(t, pausedFlag, "PauseNotifications must flip paused=true")
- require.Equal(t, 3, queueLen, "all three events must be queued while
paused")
- assert.Equal(t, "a", r.pauseQueue[0].GetProperty().GetId())
- assert.Equal(t, "c", r.pauseQueue[2].GetProperty().GetId())
+ defer r.pauseMu.Unlock()
+ assert.True(t, r.paused, "PauseNotifications must flip paused=true")
}
-// TestResumeNotifications_DrainsQueueAndClearsPauseState verifies that
-// ResumeNotifications flips paused=false, replays the queued events
-// (handleWatchEvent runs to completion for each), and leaves an empty
-// queue. Replay correctness for a malformed payload is out of scope —
-// handleWatchEvent's existing nil/parse guards already cover that.
-func TestResumeNotifications_DrainsQueueAndClearsPauseState(t *testing.T) {
+// TestResumeNotifications_ClearsFlagAndDrainsQueue verifies that
+// ResumeNotifications flips paused=false and drains every queued replay
+// closure in arrival order.
+func TestResumeNotifications_ClearsFlagAndDrainsQueue(t *testing.T) {
r := newTestSchemaRegistry()
r.PauseNotifications()
- r.handleWatchEvent(fakeWatchEvent("a"))
- r.handleWatchEvent(fakeWatchEvent("b"))
+
+ var replays []int
+ r.pauseMu.Lock()
+ for i := 0; i < 3; i++ {
+ idx := i
+ r.pauseQueue = append(r.pauseQueue, func() { replays =
append(replays, idx) })
+ }
+ r.pauseMu.Unlock()
r.ResumeNotifications()
r.pauseMu.Lock()
- queueLen := len(r.pauseQueue)
pausedFlag := r.paused
+ queueLen := len(r.pauseQueue)
r.pauseMu.Unlock()
assert.False(t, pausedFlag, "ResumeNotifications must flip
paused=false")
assert.Equal(t, 0, queueLen, "ResumeNotifications must drain the queue")
+ assert.Equal(t, []int{0, 1, 2}, replays, "drained closures must run in
arrival order")
}
// TestResumeNotifications_NotPaused_NoOp verifies that calling
@@ -101,45 +81,43 @@ func TestResumeNotifications_NotPaused_NoOp(t *testing.T) {
r.ResumeNotifications()
r.pauseMu.Lock()
- queueLen := len(r.pauseQueue)
- pausedFlag := r.paused
- r.pauseMu.Unlock()
-
- assert.False(t, pausedFlag)
- assert.Equal(t, 0, queueLen)
+ defer r.pauseMu.Unlock()
+ assert.False(t, r.paused)
+ assert.Equal(t, 0, len(r.pauseQueue))
}
-// TestHandleWatchEvent_AfterResume_RunsImmediately verifies that events
-// arriving after ResumeNotifications skip the queue path and run through
-// the dispatch tail synchronously, matching pre-pause behavior.
-func TestHandleWatchEvent_AfterResume_RunsImmediately(t *testing.T) {
+// TestProcessInitialResourceFromProperty_PausedQueuesAndSkipsCacheUpdate
+// verifies the load-bearing gate: when paused, processInitialResourceFromProperty
+// returns early so the schemaCache (nil here, would panic on access) is
+// untouched and the closure is queued for replay.
+func TestProcessInitialResourceFromProperty_PausedQueuesAndSkipsCacheUpdate(t
*testing.T) {
r := newTestSchemaRegistry()
r.PauseNotifications()
- r.ResumeNotifications()
- r.handleWatchEvent(fakeWatchEvent("after-resume"))
+ prop := &propertyv1.Property{Id: "queued"}
+ // nil cache would panic if the gate did not short-circuit here.
+ r.processInitialResourceFromProperty(schema.KindMeasure, prop, nil)
r.pauseMu.Lock()
- queueLen := len(r.pauseQueue)
- r.pauseMu.Unlock()
-
- assert.Equal(t, 0, queueLen, "post-resume events must not be queued")
+ defer r.pauseMu.Unlock()
+ require.Equal(t, 1, len(r.pauseQueue),
+ "paused processInitialResourceFromProperty must queue a replay
closure")
}
-// TestPauseNotifications_NilPropertyShortCircuits verifies that the
-// existing prop==nil guard short-circuits before the gate, so a nil
-// payload does not pollute the queue.
-func TestPauseNotifications_NilPropertyShortCircuits(t *testing.T) {
+// TestHandleDeletion_PausedQueuesAndSkipsCacheDelete verifies the
+// counterpart gate on handleDeletion. cache.Delete is never invoked under
+// pause (the cache is nil here), so the entry remains visible to
+// GetAbsentKeys — exactly the behavior §6.12c relies on.
+func TestHandleDeletion_PausedQueuesAndSkipsCacheDelete(t *testing.T) {
r := newTestSchemaRegistry()
r.PauseNotifications()
- r.handleWatchEvent(&schemav1.WatchSchemasResponse{Property: nil})
+ r.handleDeletion(schema.KindStream, "p_g/s", &cacheEntry{}, 1)
r.pauseMu.Lock()
- queueLen := len(r.pauseQueue)
- r.pauseMu.Unlock()
-
- assert.Equal(t, 0, queueLen, "nil-property events must not enter the
pause queue")
+ defer r.pauseMu.Unlock()
+ require.Equal(t, 1, len(r.pauseQueue),
+ "paused handleDeletion must queue a replay closure")
}
// TestSchemaRegistryRoster_RegisterAndIndex verifies that
@@ -150,7 +128,7 @@ func TestSchemaRegistryRoster_RegisterAndIndex(t
*testing.T) {
before := CountSchemaRegistries()
r := newTestSchemaRegistry()
registerForWatchControl(r)
- r2 := &SchemaRegistry{}
+ r2 := newTestSchemaRegistry()
registerForWatchControl(r2)
require.Equal(t, before+2, CountSchemaRegistries())
diff --git a/pkg/test/helpers/constant.go b/pkg/test/helpers/constant.go
index cced3618d..4e8964842 100644
--- a/pkg/test/helpers/constant.go
+++ b/pkg/test/helpers/constant.go
@@ -42,10 +42,19 @@ const (
)
// SharedContext is the context shared between test cases in the integration testing.
+//
+//nolint:govet // fieldalignment cannot reduce the pointer-byte run further; readability beats the 8-byte saving for a test-only struct.
type SharedContext struct {
BaseTime time.Time
Connection *grpclib.ClientConn
- Mode string
+ // DataNodeAddrs lists the gRPC addresses of the cluster's data nodes.
+ // Populated by the distributed BeforeSuite; nil for standalone runs.
+ // Cluster-only specs (§6.12 partial-cluster barrier scenarios) pass
+ // entries from this slice to setup.PauseDataNodeWatch /
+ // ResumeDataNodeWatch to drive a single node out of sync while the
+ // rest of the cluster stays caught up.
+ DataNodeAddrs []string
+ Mode string
}
// Args is a wrapper seals all necessary info for table specs.
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index bf9ab7456..2e3fc7e2a 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -607,16 +607,20 @@ func startDataNode(config *ClusterConfig, dataDir string,
flags ...string) (stri
config.NodeDiscovery.FileWriter.AddNode(nodeAddr, nodeAddr)
}
- // Bind the data node's SchemaRegistry to its gRPC address so the test
- // harness can call PauseDataNodeWatch / ResumeDataNodeWatch later. The
- // metadata.clientService's PreRun creates exactly one SchemaRegistry,
- // and the health check above guarantees PreRun has completed before we
- // reach here — so taking the most recently registered roster slot is
- // safe.
+ // Bind the data node's SchemaRegistry to both the host-prefixed addr
+ // returned to the caller and the nodeHost-prefixed nodeAddr the route
+ // table uses internally, so PauseDataNodeWatch / ResumeDataNodeWatch
+ // accept either form. The metadata.clientService's PreRun creates
+ // exactly one SchemaRegistry, and the health check above guarantees
+ // PreRun has completed — so the most recently registered roster slot
+ // is the right one for this node.
afterCount := property.CountSchemaRegistries()
if afterCount > beforeCount {
if reg := property.SchemaRegistryByIndex(afterCount - 1); reg
!= nil {
- bindNodeWatchControl(nodeAddr, reg)
+ bindNodeWatchControl(addr, reg)
+ if nodeAddr != addr {
+ bindNodeWatchControl(nodeAddr, reg)
+ }
}
}
@@ -624,7 +628,10 @@ func startDataNode(config *ClusterConfig, dataDir string,
flags ...string) (stri
if config.NodeDiscovery.FileWriter != nil {
config.NodeDiscovery.FileWriter.RemoveNode(nodeAddr)
}
- unbindNodeWatchControl(nodeAddr)
+ unbindNodeWatchControl(addr)
+ if nodeAddr != addr {
+ unbindNodeWatchControl(nodeAddr)
+ }
rawCloseFn()
}
diff --git a/test/integration/distributed/schema/common.go
b/test/integration/distributed/schema/common.go
index b1a2f7fe7..8869c7526 100644
--- a/test/integration/distributed/schema/common.go
+++ b/test/integration/distributed/schema/common.go
@@ -44,10 +44,11 @@ import (
)
var (
- now time.Time
- stopFunc func()
- connection *grpc.ClientConn
- goods []gleak.Goroutine
+ now time.Time
+ stopFunc func()
+ connection *grpc.ClientConn
+ goods []gleak.Goroutine
+ dataNodeAddrs []string
)
var _ = ginkgo.SynchronizedBeforeSuite(func() []byte {
@@ -62,9 +63,10 @@ var _ = ginkgo.SynchronizedBeforeSuite(func() []byte {
dfWriter := setup.NewDiscoveryFileWriter(tmpDir)
config := setup.PropertyClusterConfig(dfWriter)
ginkgo.By("Starting data node 0")
- closeDataNode0 := setup.DataNode(config)
+ addrDataNode0, _, closeDataNode0 := setup.DataNodeWithAddrAndDir(config)
ginkgo.By("Starting data node 1")
- closeDataNode1 := setup.DataNode(config)
+ addrDataNode1, _, closeDataNode1 := setup.DataNodeWithAddrAndDir(config)
+ dataNodeAddrs = []string{addrDataNode0, addrDataNode1}
ginkgo.By("Loading schema via property")
setup.PreloadSchemaViaProperty(config, test_stream.PreloadSchema,
test_measure.PreloadSchema, test_trace.PreloadSchema)
config.AddLoadedKinds(schemapkg.KindStream, schemapkg.KindMeasure,
schemapkg.KindTrace)
@@ -87,9 +89,10 @@ var _ = ginkgo.SynchronizedBeforeSuite(func() []byte {
connection, err = grpchelper.Conn(string(address), 10*time.Second,
grpc.WithTransportCredentials(insecure.NewCredentials()))
casesschema.SharedContext = helpers.SharedContext{
- Connection: connection,
- Mode: "distributed",
- BaseTime: now,
+ Connection: connection,
+ Mode: helpers.ModeDistributed,
+ BaseTime: now,
+ DataNodeAddrs: dataNodeAddrs,
}
gomega.Expect(err).NotTo(gomega.HaveOccurred())
})