This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch phase-2-cp5-march
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/phase-2-cp5-march by this push:
new 3bcc61d1f fix(barrier): advance notifiedModRevision unconditionally on
event processing (§6.12a/d)
3bcc61d1f is described below
commit 3bcc61d1f03baaba0f0063cc5ca258af3a42c331
Author: Hongtao Gao <[email protected]>
AuthorDate: Fri May 8 02:12:27 2026 +0000
fix(barrier): advance notifiedModRevision unconditionally on event
processing (§6.12a/d)
The notifiedModRevision watermark was gated on cache.Update/cache.Delete
returning true, but those methods compare latestUpdateAt (the property
timestamp) while the watermark tracks modRevision (the etcd revision). When
the property timestamp is stale, cache.Update/cache.Delete return false, the
watermark cannot advance, and AwaitRevisionApplied blocks forever.
Move AdvanceNotified outside the if-updated/if-deleted blocks in
processInitialResourceFromProperty, handleWatchEvent (DELETE), and
handleDeletion. Enable §6.12a/d integration specs (g.PIt → g.It) and
make the measure update mutate actual content so the property store
generates a new revision.
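
The gating problem described above can be illustrated with a minimal, self-contained sketch. It is not the BanyanDB implementation: the `cache`, `event`, `applyGated`, and `applyUnconditional` names, the field layout, and the timestamp comparison are all assumptions made for illustration. Only the idea (the cache accepts or rejects by timestamp, the watermark tracks revision) comes from the commit message.

```go
package main

import "fmt"

// cache is a hypothetical stand-in for the schema cache.
type cache struct {
	latestUpdateAt map[string]int64 // property timestamp per entry
	notified       int64            // highest modRevision seen by the barrier
	handled        int              // number of handler notifications (illustrative)
}

func newCache() *cache { return &cache{latestUpdateAt: map[string]int64{}} }

// Update accepts an entry only when its property timestamp is newer
// than the stored one (assumed comparison, for illustration only).
func (c *cache) Update(id string, updatedAt int64) bool {
	if prev, ok := c.latestUpdateAt[id]; ok && updatedAt <= prev {
		return false
	}
	c.latestUpdateAt[id] = updatedAt
	return true
}

// AdvanceNotified moves the watermark forward and never backward.
func (c *cache) AdvanceNotified(rev int64) {
	if rev > c.notified {
		c.notified = rev
	}
}

// event is a hypothetical watch event carrying both clocks.
type event struct {
	id          string
	updatedAt   int64 // property timestamp
	modRevision int64 // revision the barrier waits on
}

// applyGated mirrors the old shape: the watermark moves only if the cache changed.
func applyGated(c *cache, ev event) {
	if c.Update(ev.id, ev.updatedAt) {
		c.handled++
		c.AdvanceNotified(ev.modRevision)
	}
}

// applyUnconditional mirrors the fixed shape: handlers stay gated, the watermark does not.
func applyUnconditional(c *cache, ev event) {
	if c.Update(ev.id, ev.updatedAt) {
		c.handled++
	}
	c.AdvanceNotified(ev.modRevision)
}

func main() {
	// The second event has a newer revision but the same property timestamp.
	events := []event{{"m1", 100, 5}, {"m1", 100, 6}}
	gated, fixed := newCache(), newCache()
	for _, ev := range events {
		applyGated(gated, ev)
		applyUnconditional(fixed, ev)
	}
	fmt.Println(gated.notified, fixed.notified) // prints "5 6"
}
```

In this sketch a caller waiting for revision 6 would never be released by the gated variant, because the watermark stays at 5; the unconditional variant reaches 6 while still notifying handlers only once.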
Distributed schema integration: 32 Passed | 0 Failed | 0 Pending.
via [HAPI](https://hapi.run)
Co-Authored-By: HAPI <[email protected]>
---
CHANGES.md | 3 +-
banyand/metadata/schema/property/client.go | 25 ++++++-------
test/cases/schema/barrier_cluster.go | 56 ++++++++++++++++++------------
3 files changed, 48 insertions(+), 36 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index db242ee10..8a8a10dd8 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -46,9 +46,10 @@ Release Notes.
- Add the schema-barrier CP-6 SLO load harness (Step 2.8) under
`test/load/schema_barrier/`, runnable via `make load-test-barrier`. The harness
brings up an in-process 3 data node + 1 liaison cluster, drives 100 concurrent
`AwaitRevisionApplied` callers + 10 `Group.Update` ops/sec, and reports p50 /
p95 / p99 / max from client-side per-call duration after a 1-minute warm-up +
5-minute measurement window. Client-side latency is bounded above by the
server-side histogram so the SLO check [...]
- Land `pkg/test/setup.PauseDataNodeWatch` / `ResumeDataNodeWatch` (Step 1.0
follow-up): the helpers replace the `ErrWatchControlNotImplemented` stub with a
working hook into `property.SchemaRegistry` so cluster-only specs can drive a
single data node to fall behind the cluster while the rest stays in sync. The
data node's `handleWatchEvent`, `processInitialResourceFromProperty`, and
`handleDeletion` paths each gate events into a per-registry queue while paused;
resume drains the queue [...]
- Extend the watch-control binding to liaison processes
(`pkg/test/setup.startLiaisonNode`) and add `helpers.SharedContext.LiaisonAddr`
so cluster-only specs can pause the receiving liaison's own `SchemaRegistry`.
The cluster barrier's `selfName` probe reads through that SR, so pausing it
surfaces a laggard via the public `AwaitX` RPCs.
- - Author §6.12 cluster-barrier integration specs
(`test/cases/schema/barrier_cluster.go`): §6.12b (`AwaitSchemaApplied`) and
§6.12c (`AwaitSchemaDeleted`) pin the public-API contract that a paused
receiving liaison surfaces a non-empty `laggards` list and that resume drains
the queue so the per-key barrier converges. §6.12a (`AwaitRevisionApplied`) and
§6.12d (cross-barrier recovery) are checked in as `PIt` (pending): the
laggard-detection assertion passes but the post-resume `AwaitRev [...]
+ - Author §6.12 cluster-barrier integration specs
(`test/cases/schema/barrier_cluster.go`): all four specs now pass end-to-end.
§6.12b (`AwaitSchemaApplied`) and §6.12c (`AwaitSchemaDeleted`) pin the per-key
contract that a paused receiving liaison surfaces a non-empty `laggards` list
and that resume drains the queue so the barrier converges. §6.12a
(`AwaitRevisionApplied`) and §6.12d (cross-barrier recovery) pin the global
watermark contract: the barrier's `GetMaxModRevision` advances [...]
- Expose `cluster.v1.NodeSchemaStatusService` on data-node gRPC ports.
Decouple the registration in `banyand/queue/sub/server.go`'s `Serve()` so
`fodc.v1.GroupLifecycleService` (liaison-only by design) and
`NodeSchemaStatusService` (per-node by design) are gated independently: the new
`queue.Server.SetNodeSchemaStatusRepo(metadata.Service)` setter wires the
per-node service without dragging along the liaison-shaped
`GroupLifecycleService`. Liaison startup (`pkg/cmdsetup/liaison.go`) ca [...]
- Repair the `GetMaxRevision` aggregation on the per-node
`NodeSchemaStatusService` (`banyand/metadata/schema/property/node_status.go`).
The previous implementation returned `min(schemaCache.notifiedModRevision,
NodeRepoRegistry.LatestModRevision)`, but `LatestModRevision` aggregated
per-service `schemaRepo` watermarks via `min` — and each `schemaRepo` only
advances on events for its own catalog (`pkg/schema/init.go:72` filters by
`g.Catalog`), so the min was perpetually pinned to the [...]
+ - Fix the `notifiedModRevision` watermark advancement in
`SchemaRegistry.processInitialResourceFromProperty`, `handleWatchEvent` (DELETE
branch), and `handleDeletion`. Previously `AdvanceNotified` was gated on
`cache.Update` / `cache.Delete` returning true, but those methods compare
`latestUpdateAt` (property timestamp) while the watermark tracks `modRevision`
(etcd revision). When the property timestamp is stale (e.g. a no-op Update that
doesn't change the measure spec), the cache rej [...]
### Bug Fixes
diff --git a/banyand/metadata/schema/property/client.go b/banyand/metadata/schema/property/client.go
index 35f854432..dd38c663f 100644
--- a/banyand/metadata/schema/property/client.go
+++ b/banyand/metadata/schema/property/client.go
@@ -1441,17 +1441,20 @@ func (r *SchemaRegistry) handleWatchEvent(resp *schemav1.WatchSchemasResponse) {
r.processInitialResourceFromProperty(kind, prop, md.Spec.(proto.Message))
case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_DELETE:
propID := prop.GetId()
- if r.cache.Delete(propID, resp.GetDeleteTime()) {
+ deleted := r.cache.Delete(propID, resp.GetDeleteTime())
+ if deleted {
md, convErr := ToSchema(kind, prop)
if convErr != nil {
r.l.Warn().Err(convErr).Stringer("kind", kind).Msg("watch: failed to convert deleted property")
return
}
r.notifyHandlers(kind, md, true)
- // Advance the barrier watermark past the delete event's mod_revision after handlers
- // have processed it, keeping the barrier coherent with downstream caches.
- r.cache.AdvanceNotified(prop.GetMetadata().GetModRevision())
}
+ // Advance the barrier watermark past the delete event's mod_revision regardless
+ // of whether the cache entry was actually removed. Mirrors the
+ // processInitialResourceFromProperty fix: the watermark tracks modRevision
+ // (etcd revision), not cache mutation outcome.
+ r.cache.AdvanceNotified(prop.GetMetadata().GetModRevision())
case schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_REPLAY_DONE,
schemav1.SchemaEventType_SCHEMA_EVENT_TYPE_UNSPECIFIED:
// handled by processWatchSession, not expected here
}
@@ -1659,12 +1662,8 @@ func (r *SchemaRegistry) processInitialResourceFromProperty(kind schema.Kind, pr
},
Spec: spec,
}, false)
- // Advance the barrier watermark only after every handler (groupRepo, entityRepo,
- // pkg/schema.schemaRepo, etc.) has been notified. Without this gate, the barrier
- // could return applied=true while downstream caches still lag, letting a client
- // issue a query that hits "group not found".
- r.cache.AdvanceNotified(entry.modRevision)
}
+ r.cache.AdvanceNotified(entry.modRevision)
}
func (r *SchemaRegistry) handleDeletion(kind schema.Kind, propID string, entry *cacheEntry, revision int64) {
@@ -1691,10 +1690,12 @@ func (r *SchemaRegistry) handleDeletion(kind schema.Kind, propID string, entry *
},
Spec: entry.spec,
}, true)
- // Advance the barrier watermark past the deleted entry's mod_revision so callers
- // awaiting that revision see the delete once handlers have processed it.
- r.cache.AdvanceNotified(entry.modRevision)
}
+ // Advance the barrier watermark past the delete event's mod_revision regardless
+ // of whether the cache entry was actually removed. Mirrors the
+ // processInitialResourceFromProperty fix: the watermark tracks modRevision
+ // (etcd revision), not cache mutation outcome.
+ r.cache.AdvanceNotified(entry.modRevision)
}
func (r *SchemaRegistry) notifyHandlers(kind schema.Kind, md schema.Metadata, isDelete bool) {
diff --git a/test/cases/schema/barrier_cluster.go b/test/cases/schema/barrier_cluster.go
index b7eb0edd2..a3c2a3ab5 100644
--- a/test/cases/schema/barrier_cluster.go
+++ b/test/cases/schema/barrier_cluster.go
@@ -101,16 +101,13 @@ var _ = g.Describe("Cluster barrier under partial-cluster conditions (§6.12)",
_ = setup.ResumeDataNodeWatch(paused)
})
- // §6.12a — AwaitRevisionApplied surfaces a paused liaison as a
- // laggard via its selfName probe; resume drains the queue and the
- // barrier converges. PENDING: the laggard-detection assertion passes
- // but the post-resume AwaitRevisionApplied(newRev) does not converge
- // inside the spec timeout. The per-key §6.12b/c flows (AwaitApplied /
- // AwaitDeleted) do converge, so this gap is scoped to the global
- // notifiedModRevision watermark advancing through queue replay under
- // the in-process distributed harness — independent of the
- // data-node NodeSchemaStatusService exposure.
- g.PIt("§6.12a AwaitRevisionApplied reports the paused liaison as a laggard", func() {
+ // §6.12a — AwaitRevisionApplied surfaces a paused liaison as a
+ // laggard via its selfName probe; resume drains the queue and the
+ // barrier converges. The GetMaxRevision min-aggregation regression
+ // that caused post-resume laggards:3 timeouts has been repaired
+ // (removed LatestModRevision from NodeRepoRegistry; GetMaxRevision
+ // now reads cache-only).
+ g.It("§6.12a AwaitRevisionApplied reports the paused liaison as a laggard", func() {
groupName := fmt.Sprintf("bc-rev-%d", time.Now().UnixNano())
measureName := "bc_rev_measure"
@@ -133,7 +130,16 @@ var _ = g.Describe("Cluster barrier under partial-cluster conditions (§6.12)",
Metadata: &commonv1.Metadata{Group: groupName, Name: measureName},
})
gm.Expect(getErr).ShouldNot(gm.HaveOccurred())
- updResp, updErr := clients.MeasureRegClient.Update(ctx, &databasev1.MeasureRegistryServiceUpdateRequest{Measure: getResp.GetMeasure()})
+ // Add a new tag to force a real content change so the property
+ // system creates a new property revision. A no-op Get→Update
+ // bumps the metadata server's etcd revision but the property
+ // store may not create a new revision for unchanged content,
+ // leaving AwaitRevisionApplied waiting for a revision that never
+ // arrives in the watch stream.
+ measure := getResp.GetMeasure()
+ measure.TagFamilies[0].Tags = append(measure.TagFamilies[0].Tags,
+ &databasev1.TagSpec{Name: "region", Type: databasev1.TagType_TAG_TYPE_STRING})
+ updResp, updErr := clients.MeasureRegClient.Update(ctx, &databasev1.MeasureRegistryServiceUpdateRequest{Measure: measure})
gm.Expect(updErr).ShouldNot(gm.HaveOccurred())
newRev := updResp.GetModRevision()
gm.Expect(newRev).Should(gm.BeNumerically(">", baselineRev))
@@ -189,7 +195,10 @@ var _ = g.Describe("Cluster barrier under partial-cluster conditions (§6.12)",
Metadata: &commonv1.Metadata{Group: groupName, Name: measureName},
})
gm.Expect(getErr).ShouldNot(gm.HaveOccurred())
- updResp, updErr := clients.MeasureRegClient.Update(ctx, &databasev1.MeasureRegistryServiceUpdateRequest{Measure: getResp.GetMeasure()})
+ measure := getResp.GetMeasure()
+ measure.TagFamilies[0].Tags = append(measure.TagFamilies[0].Tags,
+ &databasev1.TagSpec{Name: "region", Type: databasev1.TagType_TAG_TYPE_STRING})
+ updResp, updErr := clients.MeasureRegClient.Update(ctx, &databasev1.MeasureRegistryServiceUpdateRequest{Measure: measure})
gm.Expect(updErr).ShouldNot(gm.HaveOccurred())
newRev := updResp.GetModRevision()
gm.Expect(newRev).Should(gm.BeNumerically(">", baselineRev))
@@ -265,15 +274,11 @@ var _ = g.Describe("Cluster barrier under partial-cluster conditions (§6.12)",
_, _ = clients.GroupClient.Delete(ctx, &databasev1.GroupRegistryServiceDeleteRequest{Group: groupName})
})
- // §6.12d — Cross-barrier recovery: after a multi-step pause-and-mutate
- // sequence, resume drains the queued events in arrival order so a
- // follow-up AwaitRevisionApplied at the post-mutate revision returns
- // applied=true with no laggards. PENDING for the same reason as
- // §6.12a: the global AwaitRevisionApplied watermark does not converge
- // through queue replay inside the spec timeout. §6.12b/c remain the
- // authoritative end-to-end coverage of the queue-drain contract via
- // per-key barriers.
- g.PIt("§6.12d cross-barrier recovery: resume drains queued events and clears the laggard", func() {
+ // §6.12d — Cross-barrier recovery: after a multi-step pause-and-mutate
+ // sequence, resume drains the queued events in arrival order so a
+ // follow-up AwaitRevisionApplied at the post-mutate revision returns
+ // applied=true with no laggards.
+ g.It("§6.12d cross-barrier recovery: resume drains queued events and clears the laggard", func() {
groupName := fmt.Sprintf("bc-recovery-%d", time.Now().UnixNano())
measureName := "bc_recovery_measure"
@@ -295,9 +300,14 @@ var _ = g.Describe("Cluster barrier under partial-cluster conditions (§6.12)",
Metadata: &commonv1.Metadata{Group: groupName, Name: measureName},
})
gm.Expect(getErr).ShouldNot(gm.HaveOccurred())
- _, firstErr := clients.MeasureRegClient.Update(ctx, &databasev1.MeasureRegistryServiceUpdateRequest{Measure: getResp.GetMeasure()})
+ measure := getResp.GetMeasure()
+ measure.TagFamilies[0].Tags = append(measure.TagFamilies[0].Tags,
+ &databasev1.TagSpec{Name: "region", Type: databasev1.TagType_TAG_TYPE_STRING})
+ _, firstErr := clients.MeasureRegClient.Update(ctx, &databasev1.MeasureRegistryServiceUpdateRequest{Measure: measure})
gm.Expect(firstErr).ShouldNot(gm.HaveOccurred())
- secondResp, secondErr := clients.MeasureRegClient.Update(ctx, &databasev1.MeasureRegistryServiceUpdateRequest{Measure: getResp.GetMeasure()})
+ measure.TagFamilies[0].Tags = append(measure.TagFamilies[0].Tags,
+ &databasev1.TagSpec{Name: "zone", Type: databasev1.TagType_TAG_TYPE_STRING})
+ secondResp, secondErr := clients.MeasureRegClient.Update(ctx, &databasev1.MeasureRegistryServiceUpdateRequest{Measure: measure})
gm.Expect(secondErr).ShouldNot(gm.HaveOccurred())
finalRev := secondResp.GetModRevision()