This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch phase-2-cp5-march
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/phase-2-cp5-march by this push:
     new 833c6f61e fix(schema): return actual modRevision on no-op Update RPCs
833c6f61e is described below

commit 833c6f61e0ff736232fd2851b01065cd8bfcb185
Author: Hongtao Gao <[email protected]>
AuthorDate: Fri May 8 04:46:38 2026 +0000

    fix(schema): return actual modRevision on no-op Update RPCs
    
    When updateResource detected unchanged content via CheckerMap, it
    short-circuited without writing to the property store. However, the
    Update* caller had already fabricated modRevision = time.Now().UnixNano()
    and returned it to the client. Because nothing was written, that revision
    never appeared in property watch events, so AwaitRevisionApplied(R) hung
    forever.
    
    Change updateResource to return (int64, error). For no-op updates it
    returns the stored property's existing modRevision; for real updates it
    returns the newly written revision. Every Update* caller that reports a
    revision now returns one the barrier can actually observe (UpdateProperty
    still returns only an error).
    
    via [HAPI](https://hapi.run)
    
    Co-Authored-By: HAPI <[email protected]>
---
 CHANGES.md                                 |  1 +
 banyand/metadata/schema/property/client.go | 48 +++++++++++++++++-------------
 test/cases/schema/barrier_cluster.go       | 26 +++++++---------
 3 files changed, 39 insertions(+), 36 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 8a8a10dd8..28bb0cfcb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -50,6 +50,7 @@ Release Notes.
   - Expose `cluster.v1.NodeSchemaStatusService` on data-node gRPC ports. 
Decouple the registration in `banyand/queue/sub/server.go`'s `Serve()` so 
`fodc.v1.GroupLifecycleService` (liaison-only by design) and 
`NodeSchemaStatusService` (per-node by design) are gated independently: the new 
`queue.Server.SetNodeSchemaStatusRepo(metadata.Service)` setter wires the 
per-node service without dragging along the liaison-shaped 
`GroupLifecycleService`. Liaison startup (`pkg/cmdsetup/liaison.go`) ca [...]
   - Repair the `GetMaxRevision` aggregation on the per-node 
`NodeSchemaStatusService` (`banyand/metadata/schema/property/node_status.go`). 
The previous implementation returned `min(schemaCache.notifiedModRevision, 
NodeRepoRegistry.LatestModRevision)`, but `LatestModRevision` aggregated 
per-service `schemaRepo` watermarks via `min` — and each `schemaRepo` only 
advances on events for its own catalog (`pkg/schema/init.go:72` filters by 
`g.Catalog`), so the min was perpetually pinned to the  [...]
   - Fix the `notifiedModRevision` watermark advancement in 
`SchemaRegistry.processInitialResourceFromProperty`, `handleWatchEvent` (DELETE 
branch), and `handleDeletion`. Previously `AdvanceNotified` was gated on 
`cache.Update` / `cache.Delete` returning true, but those methods compare 
`latestUpdateAt` (property timestamp) while the watermark tracks `modRevision` 
(etcd revision). When the property timestamp is stale (e.g. a no-op Update that 
doesn't change the measure spec), the cache rej [...]
+  - Fix the `modRevision` contract on no-op Update RPCs 
(`MeasureRegistryService.Update`, etc.). Previously `updateResource` detected 
unchanged content via `CheckerMap` and short-circuited without writing to the 
property store, but the caller had already fabricated `modRevision = 
time.Now().UnixNano()` and returned it. The returned revision never appeared in 
the property watch stream, so `AwaitRevisionApplied(R)` would hang. 
`updateResource` now returns `(int64, error)` — the existing pr [...]
 
 ### Bug Fixes
 
diff --git a/banyand/metadata/schema/property/client.go 
b/banyand/metadata/schema/property/client.go
index dd38c663f..0ffa6fb25 100644
--- a/banyand/metadata/schema/property/client.go
+++ b/banyand/metadata/schema/property/client.go
@@ -666,32 +666,32 @@ func createResource[T proto.Message](ctx context.Context, 
r *SchemaRegistry,
 
 func updateResource[T proto.Message](ctx context.Context, r *SchemaRegistry,
        kind schema.Kind, spec T, validators ...func(prev T) error,
-) error {
+) (int64, error) {
        metadata, metaErr := getMetadataFromSpec(kind, spec)
        if metaErr != nil {
-               return metaErr
+               return 0, metaErr
        }
        if validateErr := r.validateGroup(ctx, kind, metadata.GetGroup()); 
validateErr != nil {
-               return validateErr
+               return 0, validateErr
        }
        originalProp, getErr := r.getSchema(ctx, kind, metadata.GetGroup(), 
metadata.GetName())
        if getErr != nil {
-               return getErr
+               return 0, getErr
        }
        if originalProp == nil {
-               return fmt.Errorf("schema %s/%s not exist", 
metadata.GetGroup(), metadata.GetName())
+               return 0, fmt.Errorf("schema %s/%s not exist", 
metadata.GetGroup(), metadata.GetName())
        }
        prevMd, convErr := ToSchema(kind, originalProp)
        if convErr != nil {
-               return convErr
+               return 0, convErr
        }
        prev, ok := prevMd.Spec.(T)
        if !ok {
-               return fmt.Errorf("unexpected spec type for kind %s", kind)
+               return 0, fmt.Errorf("unexpected spec type for kind %s", kind)
        }
        for _, v := range validators {
                if validateErr := v(prev); validateErr != nil {
-                       return validateErr
+                       return 0, validateErr
                }
        }
        // Preserve created_at from the stored schema; the caller must not 
override it.
@@ -699,16 +699,23 @@ func updateResource[T proto.Message](ctx context.Context, 
r *SchemaRegistry,
                setCreatedAtOnSpec(kind, spec, prevCreatedAt)
        }
        if checker, checkerOk := schema.CheckerMap[kind]; checkerOk && 
checker(prev, spec) {
-               return nil
+               // No-op update: content unchanged. Return the existing 
property's
+               // modRevision so callers observe the revision the barrier will 
see.
+               // The caller's fabricated modRevision was never written to the 
property
+               // store, so returning it would cause AwaitRevisionApplied to 
hang.
+               return originalProp.GetMetadata().GetModRevision(), nil
        }
        prop, propErr := SchemaToProperty(kind, spec)
        if propErr != nil {
-               return propErr
+               return 0, propErr
        }
-       return r.broadcastAll(func(_ string, c *schemaClient) error {
+       if broadcastErr := r.broadcastAll(func(_ string, c *schemaClient) error 
{
                _, rpcErr := c.management.UpdateSchema(ctx, 
&schemav1.UpdateSchemaRequest{Property: prop})
                return rpcErr
-       })
+       }); broadcastErr != nil {
+               return 0, broadcastErr
+       }
+       return metadata.GetModRevision(), nil
 }
 
 // GetStream retrieves a stream schema.
@@ -740,7 +747,7 @@ func (r *SchemaRegistry) UpdateStream(ctx context.Context, 
stream *databasev1.St
        now := time.Now().UnixNano()
        stream.Metadata.ModRevision = now
        stream.UpdatedAt = timestamppb.Now()
-       return now, updateResource(ctx, r, schema.KindStream, stream, func(prev 
*databasev1.Stream) error {
+       return updateResource(ctx, r, schema.KindStream, stream, func(prev 
*databasev1.Stream) error {
                if err := validateStreamUpdate(prev, stream); err != nil {
                        return fmt.Errorf("validation failed: %w", err)
                }
@@ -792,7 +799,7 @@ func (r *SchemaRegistry) UpdateMeasure(ctx context.Context, 
measure *databasev1.
        now := time.Now().UnixNano()
        measure.Metadata.ModRevision = now
        measure.UpdatedAt = timestamppb.Now()
-       return now, updateResource(ctx, r, schema.KindMeasure, measure, 
func(prev *databasev1.Measure) error {
+       return updateResource(ctx, r, schema.KindMeasure, measure, func(prev 
*databasev1.Measure) error {
                if err := validateMeasureUpdate(prev, measure); err != nil {
                        return fmt.Errorf("validation failed: %w", err)
                }
@@ -849,7 +856,7 @@ func (r *SchemaRegistry) UpdateTrace(ctx context.Context, 
trace *databasev1.Trac
        now := time.Now().UnixNano()
        trace.Metadata.ModRevision = now
        trace.UpdatedAt = timestamppb.Now()
-       return now, updateResource(ctx, r, schema.KindTrace, trace, func(prev 
*databasev1.Trace) error {
+       return updateResource(ctx, r, schema.KindTrace, trace, func(prev 
*databasev1.Trace) error {
                if err := validate.TraceUpdate(prev, trace); err != nil {
                        return fmt.Errorf("validation failed: %w", err)
                }
@@ -891,7 +898,7 @@ func (r *SchemaRegistry) UpdateGroup(ctx context.Context, 
group *commonv1.Group)
        now := time.Now().UnixNano()
        group.Metadata.ModRevision = now
        group.UpdatedAt = timestamppb.Now()
-       return now, updateResource(ctx, r, schema.KindGroup, group)
+       return updateResource(ctx, r, schema.KindGroup, group)
 }
 
 // DeleteGroup deletes a group and all its resources.
@@ -969,7 +976,7 @@ func (r *SchemaRegistry) UpdateIndexRule(ctx 
context.Context, indexRule *databas
        now := time.Now().UnixNano()
        indexRule.Metadata.ModRevision = now
        indexRule.UpdatedAt = timestamppb.Now()
-       return now, updateResource(ctx, r, schema.KindIndexRule, indexRule)
+       return updateResource(ctx, r, schema.KindIndexRule, indexRule)
 }
 
 // DeleteIndexRule deletes an index rule schema.
@@ -1006,7 +1013,7 @@ func (r *SchemaRegistry) UpdateIndexRuleBinding(ctx 
context.Context, indexRuleBi
        now := time.Now().UnixNano()
        indexRuleBinding.Metadata.ModRevision = now
        indexRuleBinding.UpdatedAt = timestamppb.Now()
-       return now, updateResource(ctx, r, schema.KindIndexRuleBinding, 
indexRuleBinding)
+       return updateResource(ctx, r, schema.KindIndexRuleBinding, 
indexRuleBinding)
 }
 
 // DeleteIndexRuleBinding deletes an index rule binding schema.
@@ -1043,7 +1050,7 @@ func (r *SchemaRegistry) UpdateTopNAggregation(ctx 
context.Context, topN *databa
        now := time.Now().UnixNano()
        topN.Metadata.ModRevision = now
        topN.UpdatedAt = timestamppb.Now()
-       return now, updateResource(ctx, r, schema.KindTopNAggregation, topN)
+       return updateResource(ctx, r, schema.KindTopNAggregation, topN)
 }
 
 // DeleteTopNAggregation deletes a TopN aggregation schema.
@@ -1078,7 +1085,8 @@ func (r *SchemaRegistry) UpdateProperty(ctx 
context.Context, property *databasev
        }
        now := time.Now().UnixNano()
        property.Metadata.ModRevision = now
-       return updateResource(ctx, r, schema.KindProperty, property)
+       _, updateErr := updateResource(ctx, r, schema.KindProperty, property)
+       return updateErr
 }
 
 // DeleteProperty deletes a property schema.
diff --git a/test/cases/schema/barrier_cluster.go 
b/test/cases/schema/barrier_cluster.go
index a3c2a3ab5..87697cfa5 100644
--- a/test/cases/schema/barrier_cluster.go
+++ b/test/cases/schema/barrier_cluster.go
@@ -130,12 +130,6 @@ var _ = g.Describe("Cluster barrier under partial-cluster 
conditions (§6.12)",
                        Metadata: &commonv1.Metadata{Group: groupName, Name: 
measureName},
                })
                gm.Expect(getErr).ShouldNot(gm.HaveOccurred())
-               // Add a new tag to force a real content change so the property
-               // system creates a new property revision. A no-op Get→Update
-               // bumps the metadata server's etcd revision but the property
-               // store may not create a new revision for unchanged content,
-               // leaving AwaitRevisionApplied waiting for a revision that 
never
-               // arrives in the watch stream.
                measure := getResp.GetMeasure()
                measure.TagFamilies[0].Tags = 
append(measure.TagFamilies[0].Tags,
                        &databasev1.TagSpec{Name: "region", Type: 
databasev1.TagType_TAG_TYPE_STRING})
@@ -144,7 +138,7 @@ var _ = g.Describe("Cluster barrier under partial-cluster 
conditions (§6.12)",
                newRev := updResp.GetModRevision()
                gm.Expect(newRev).Should(gm.BeNumerically(">", baselineRev))
 
-               g.By("Calling AwaitRevisionApplied — paused liaison must 
surface as a laggard")
+               g.By("Calling awaitRevisionApplied — paused liaison must 
surface as a laggard")
                // Brief settle so the bumped revision's watch event has time to
                // reach the paused liaison's SR (which queues it under pause).
                // Without this, the barrier can race the watch broadcast.
@@ -300,15 +294,15 @@ var _ = g.Describe("Cluster barrier under partial-cluster 
conditions (§6.12)",
                        Metadata: &commonv1.Metadata{Group: groupName, Name: 
measureName},
                })
                gm.Expect(getErr).ShouldNot(gm.HaveOccurred())
-               measure := getResp.GetMeasure()
-               measure.TagFamilies[0].Tags = 
append(measure.TagFamilies[0].Tags,
-                       &databasev1.TagSpec{Name: "region", Type: 
databasev1.TagType_TAG_TYPE_STRING})
-               _, firstErr := clients.MeasureRegClient.Update(ctx, 
&databasev1.MeasureRegistryServiceUpdateRequest{Measure: measure})
-               gm.Expect(firstErr).ShouldNot(gm.HaveOccurred())
-               measure.TagFamilies[0].Tags = 
append(measure.TagFamilies[0].Tags,
-                       &databasev1.TagSpec{Name: "zone", Type: 
databasev1.TagType_TAG_TYPE_STRING})
-               secondResp, secondErr := clients.MeasureRegClient.Update(ctx, 
&databasev1.MeasureRegistryServiceUpdateRequest{Measure: measure})
-               gm.Expect(secondErr).ShouldNot(gm.HaveOccurred())
+                       measure := getResp.GetMeasure()
+                       measure.TagFamilies[0].Tags = 
append(measure.TagFamilies[0].Tags,
+                               &databasev1.TagSpec{Name: "region", Type: 
databasev1.TagType_TAG_TYPE_STRING})
+                       _, firstErr := clients.MeasureRegClient.Update(ctx, 
&databasev1.MeasureRegistryServiceUpdateRequest{Measure: measure})
+                       gm.Expect(firstErr).ShouldNot(gm.HaveOccurred())
+                       measure.TagFamilies[0].Tags = 
append(measure.TagFamilies[0].Tags,
+                               &databasev1.TagSpec{Name: "zone", Type: 
databasev1.TagType_TAG_TYPE_STRING})
+                       secondResp, secondErr := 
clients.MeasureRegClient.Update(ctx, 
&databasev1.MeasureRegistryServiceUpdateRequest{Measure: measure})
+                       gm.Expect(secondErr).ShouldNot(gm.HaveOccurred())
                finalRev := secondResp.GetModRevision()
 
                g.By("Verifying the barrier reports the paused liaison before 
resume")

Reply via email to