This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch phase-2-cp5-march
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/phase-2-cp5-march by this push:
new 833c6f61e fix(schema): return actual modRevision on no-op Update RPCs
833c6f61e is described below
commit 833c6f61e0ff736232fd2851b01065cd8bfcb185
Author: Hongtao Gao <[email protected]>
AuthorDate: Fri May 8 04:46:38 2026 +0000
fix(schema): return actual modRevision on no-op Update RPCs
When updateResource detected unchanged content via CheckerMap, it
short-circuited without writing to the property store — but the caller
had already fabricated modRevision = time.Now().UnixNano() and returned
it. That revision never appeared in property watch events, causing
AwaitRevisionApplied(R) to hang forever.
Change updateResource to return (int64, error): the existing property's
modRevision for no-op updates, the new revision for real updates. Every
Update* caller that reports a revision now returns the one the barrier can
actually observe; UpdateProperty, which returns only an error, discards it.
via [HAPI](https://hapi.run)
Co-Authored-By: HAPI <[email protected]>
---
CHANGES.md | 1 +
banyand/metadata/schema/property/client.go | 48 +++++++++++++++++-------------
test/cases/schema/barrier_cluster.go | 26 +++++++---------
3 files changed, 39 insertions(+), 36 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 8a8a10dd8..28bb0cfcb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -50,6 +50,7 @@ Release Notes.
- Expose `cluster.v1.NodeSchemaStatusService` on data-node gRPC ports.
Decouple the registration in `banyand/queue/sub/server.go`'s `Serve()` so
`fodc.v1.GroupLifecycleService` (liaison-only by design) and
`NodeSchemaStatusService` (per-node by design) are gated independently: the new
`queue.Server.SetNodeSchemaStatusRepo(metadata.Service)` setter wires the
per-node service without dragging along the liaison-shaped
`GroupLifecycleService`. Liaison startup (`pkg/cmdsetup/liaison.go`) ca [...]
- Repair the `GetMaxRevision` aggregation on the per-node
`NodeSchemaStatusService` (`banyand/metadata/schema/property/node_status.go`).
The previous implementation returned `min(schemaCache.notifiedModRevision,
NodeRepoRegistry.LatestModRevision)`, but `LatestModRevision` aggregated
per-service `schemaRepo` watermarks via `min` — and each `schemaRepo` only
advances on events for its own catalog (`pkg/schema/init.go:72` filters by
`g.Catalog`), so the min was perpetually pinned to the [...]
- Fix the `notifiedModRevision` watermark advancement in
`SchemaRegistry.processInitialResourceFromProperty`, `handleWatchEvent` (DELETE
branch), and `handleDeletion`. Previously `AdvanceNotified` was gated on
`cache.Update` / `cache.Delete` returning true, but those methods compare
`latestUpdateAt` (property timestamp) while the watermark tracks `modRevision`
(etcd revision). When the property timestamp is stale (e.g. a no-op Update that
doesn't change the measure spec), the cache rej [...]
+ - Fix the `modRevision` contract on no-op Update RPCs
(`MeasureRegistryService.Update`, etc.). Previously `updateResource` detected
unchanged content via `CheckerMap` and short-circuited without writing to the
property store, but the caller had already fabricated `modRevision =
time.Now().UnixNano()` and returned it. The returned revision never appeared in
the property watch stream, so `AwaitRevisionApplied(R)` would hang.
`updateResource` now returns `(int64, error)` — the existing pr [...]
### Bug Fixes
diff --git a/banyand/metadata/schema/property/client.go
b/banyand/metadata/schema/property/client.go
index dd38c663f..0ffa6fb25 100644
--- a/banyand/metadata/schema/property/client.go
+++ b/banyand/metadata/schema/property/client.go
@@ -666,32 +666,32 @@ func createResource[T proto.Message](ctx context.Context,
r *SchemaRegistry,
func updateResource[T proto.Message](ctx context.Context, r *SchemaRegistry,
kind schema.Kind, spec T, validators ...func(prev T) error,
-) error {
+) (int64, error) {
metadata, metaErr := getMetadataFromSpec(kind, spec)
if metaErr != nil {
- return metaErr
+ return 0, metaErr
}
if validateErr := r.validateGroup(ctx, kind, metadata.GetGroup());
validateErr != nil {
- return validateErr
+ return 0, validateErr
}
originalProp, getErr := r.getSchema(ctx, kind, metadata.GetGroup(),
metadata.GetName())
if getErr != nil {
- return getErr
+ return 0, getErr
}
if originalProp == nil {
- return fmt.Errorf("schema %s/%s not exist",
metadata.GetGroup(), metadata.GetName())
+ return 0, fmt.Errorf("schema %s/%s not exist",
metadata.GetGroup(), metadata.GetName())
}
prevMd, convErr := ToSchema(kind, originalProp)
if convErr != nil {
- return convErr
+ return 0, convErr
}
prev, ok := prevMd.Spec.(T)
if !ok {
- return fmt.Errorf("unexpected spec type for kind %s", kind)
+ return 0, fmt.Errorf("unexpected spec type for kind %s", kind)
}
for _, v := range validators {
if validateErr := v(prev); validateErr != nil {
- return validateErr
+ return 0, validateErr
}
}
// Preserve created_at from the stored schema; the caller must not override it.
@@ -699,16 +699,23 @@ func updateResource[T proto.Message](ctx context.Context,
r *SchemaRegistry,
setCreatedAtOnSpec(kind, spec, prevCreatedAt)
}
if checker, checkerOk := schema.CheckerMap[kind]; checkerOk &&
checker(prev, spec) {
- return nil
+ // No-op update: content unchanged. Return the existing property's
+ // modRevision so callers observe the revision the barrier will see.
+ // The caller's fabricated modRevision was never written to the property
+ // store, so returning it would cause AwaitRevisionApplied to hang.
+ return originalProp.GetMetadata().GetModRevision(), nil
}
prop, propErr := SchemaToProperty(kind, spec)
if propErr != nil {
- return propErr
+ return 0, propErr
}
- return r.broadcastAll(func(_ string, c *schemaClient) error {
+ if broadcastErr := r.broadcastAll(func(_ string, c *schemaClient) error
{
_, rpcErr := c.management.UpdateSchema(ctx,
&schemav1.UpdateSchemaRequest{Property: prop})
return rpcErr
- })
+ }); broadcastErr != nil {
+ return 0, broadcastErr
+ }
+ return metadata.GetModRevision(), nil
}
// GetStream retrieves a stream schema.
@@ -740,7 +747,7 @@ func (r *SchemaRegistry) UpdateStream(ctx context.Context,
stream *databasev1.St
now := time.Now().UnixNano()
stream.Metadata.ModRevision = now
stream.UpdatedAt = timestamppb.Now()
- return now, updateResource(ctx, r, schema.KindStream, stream, func(prev
*databasev1.Stream) error {
+ return updateResource(ctx, r, schema.KindStream, stream, func(prev
*databasev1.Stream) error {
if err := validateStreamUpdate(prev, stream); err != nil {
return fmt.Errorf("validation failed: %w", err)
}
@@ -792,7 +799,7 @@ func (r *SchemaRegistry) UpdateMeasure(ctx context.Context,
measure *databasev1.
now := time.Now().UnixNano()
measure.Metadata.ModRevision = now
measure.UpdatedAt = timestamppb.Now()
- return now, updateResource(ctx, r, schema.KindMeasure, measure,
func(prev *databasev1.Measure) error {
+ return updateResource(ctx, r, schema.KindMeasure, measure, func(prev
*databasev1.Measure) error {
if err := validateMeasureUpdate(prev, measure); err != nil {
return fmt.Errorf("validation failed: %w", err)
}
@@ -849,7 +856,7 @@ func (r *SchemaRegistry) UpdateTrace(ctx context.Context,
trace *databasev1.Trac
now := time.Now().UnixNano()
trace.Metadata.ModRevision = now
trace.UpdatedAt = timestamppb.Now()
- return now, updateResource(ctx, r, schema.KindTrace, trace, func(prev
*databasev1.Trace) error {
+ return updateResource(ctx, r, schema.KindTrace, trace, func(prev
*databasev1.Trace) error {
if err := validate.TraceUpdate(prev, trace); err != nil {
return fmt.Errorf("validation failed: %w", err)
}
@@ -891,7 +898,7 @@ func (r *SchemaRegistry) UpdateGroup(ctx context.Context,
group *commonv1.Group)
now := time.Now().UnixNano()
group.Metadata.ModRevision = now
group.UpdatedAt = timestamppb.Now()
- return now, updateResource(ctx, r, schema.KindGroup, group)
+ return updateResource(ctx, r, schema.KindGroup, group)
}
// DeleteGroup deletes a group and all its resources.
@@ -969,7 +976,7 @@ func (r *SchemaRegistry) UpdateIndexRule(ctx
context.Context, indexRule *databas
now := time.Now().UnixNano()
indexRule.Metadata.ModRevision = now
indexRule.UpdatedAt = timestamppb.Now()
- return now, updateResource(ctx, r, schema.KindIndexRule, indexRule)
+ return updateResource(ctx, r, schema.KindIndexRule, indexRule)
}
// DeleteIndexRule deletes an index rule schema.
@@ -1006,7 +1013,7 @@ func (r *SchemaRegistry) UpdateIndexRuleBinding(ctx
context.Context, indexRuleBi
now := time.Now().UnixNano()
indexRuleBinding.Metadata.ModRevision = now
indexRuleBinding.UpdatedAt = timestamppb.Now()
- return now, updateResource(ctx, r, schema.KindIndexRuleBinding,
indexRuleBinding)
+ return updateResource(ctx, r, schema.KindIndexRuleBinding,
indexRuleBinding)
}
// DeleteIndexRuleBinding deletes an index rule binding schema.
@@ -1043,7 +1050,7 @@ func (r *SchemaRegistry) UpdateTopNAggregation(ctx
context.Context, topN *databa
now := time.Now().UnixNano()
topN.Metadata.ModRevision = now
topN.UpdatedAt = timestamppb.Now()
- return now, updateResource(ctx, r, schema.KindTopNAggregation, topN)
+ return updateResource(ctx, r, schema.KindTopNAggregation, topN)
}
// DeleteTopNAggregation deletes a TopN aggregation schema.
@@ -1078,7 +1085,8 @@ func (r *SchemaRegistry) UpdateProperty(ctx
context.Context, property *databasev
}
now := time.Now().UnixNano()
property.Metadata.ModRevision = now
- return updateResource(ctx, r, schema.KindProperty, property)
+ _, updateErr := updateResource(ctx, r, schema.KindProperty, property)
+ return updateErr
}
// DeleteProperty deletes a property schema.
diff --git a/test/cases/schema/barrier_cluster.go
b/test/cases/schema/barrier_cluster.go
index a3c2a3ab5..87697cfa5 100644
--- a/test/cases/schema/barrier_cluster.go
+++ b/test/cases/schema/barrier_cluster.go
@@ -130,12 +130,6 @@ var _ = g.Describe("Cluster barrier under partial-cluster
conditions (§6.12)",
Metadata: &commonv1.Metadata{Group: groupName, Name:
measureName},
})
gm.Expect(getErr).ShouldNot(gm.HaveOccurred())
- // Add a new tag to force a real content change so the property
- // system creates a new property revision. A no-op Get→Update
- // bumps the metadata server's etcd revision but the property
- // store may not create a new revision for unchanged content,
- // leaving AwaitRevisionApplied waiting for a revision that never
- // arrives in the watch stream.
measure := getResp.GetMeasure()
measure.TagFamilies[0].Tags =
append(measure.TagFamilies[0].Tags,
&databasev1.TagSpec{Name: "region", Type:
databasev1.TagType_TAG_TYPE_STRING})
@@ -144,7 +138,7 @@ var _ = g.Describe("Cluster barrier under partial-cluster
conditions (§6.12)",
newRev := updResp.GetModRevision()
gm.Expect(newRev).Should(gm.BeNumerically(">", baselineRev))
- g.By("Calling AwaitRevisionApplied — paused liaison must
surface as a laggard")
+ g.By("Calling awaitRevisionApplied — paused liaison must
surface as a laggard")
// Brief settle so the bumped revision's watch event has time to
// reach the paused liaison's SR (which queues it under pause).
// Without this, the barrier can race the watch broadcast.
@@ -300,15 +294,15 @@ var _ = g.Describe("Cluster barrier under partial-cluster
conditions (§6.12)",
Metadata: &commonv1.Metadata{Group: groupName, Name:
measureName},
})
gm.Expect(getErr).ShouldNot(gm.HaveOccurred())
- measure := getResp.GetMeasure()
- measure.TagFamilies[0].Tags =
append(measure.TagFamilies[0].Tags,
- &databasev1.TagSpec{Name: "region", Type:
databasev1.TagType_TAG_TYPE_STRING})
- _, firstErr := clients.MeasureRegClient.Update(ctx,
&databasev1.MeasureRegistryServiceUpdateRequest{Measure: measure})
- gm.Expect(firstErr).ShouldNot(gm.HaveOccurred())
- measure.TagFamilies[0].Tags =
append(measure.TagFamilies[0].Tags,
- &databasev1.TagSpec{Name: "zone", Type:
databasev1.TagType_TAG_TYPE_STRING})
- secondResp, secondErr := clients.MeasureRegClient.Update(ctx,
&databasev1.MeasureRegistryServiceUpdateRequest{Measure: measure})
- gm.Expect(secondErr).ShouldNot(gm.HaveOccurred())
+ measure := getResp.GetMeasure()
+ measure.TagFamilies[0].Tags =
append(measure.TagFamilies[0].Tags,
+ &databasev1.TagSpec{Name: "region", Type:
databasev1.TagType_TAG_TYPE_STRING})
+ _, firstErr := clients.MeasureRegClient.Update(ctx,
&databasev1.MeasureRegistryServiceUpdateRequest{Measure: measure})
+ gm.Expect(firstErr).ShouldNot(gm.HaveOccurred())
+ measure.TagFamilies[0].Tags =
append(measure.TagFamilies[0].Tags,
+ &databasev1.TagSpec{Name: "zone", Type:
databasev1.TagType_TAG_TYPE_STRING})
+ secondResp, secondErr :=
clients.MeasureRegClient.Update(ctx,
&databasev1.MeasureRegistryServiceUpdateRequest{Measure: measure})
+ gm.Expect(secondErr).ShouldNot(gm.HaveOccurred())
finalRev := secondResp.GetModRevision()
g.By("Verifying the barrier reports the paused liaison before
resume")