This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new b60e588e1 fix(property): prevent gossip propagation from getting stuck 
after replica scale-up (#1127)
b60e588e1 is described below

commit b60e588e12596f7fdc4442353993a0e7f95edb25
Author: mrproliu <[email protected]>
AuthorDate: Wed May 13 20:32:24 2026 +0800

    fix(property): prevent gossip propagation from getting stuck after replica 
scale-up (#1127)
    
    * fix(property): prevent gossip propagation from getting stuck after 
replica scale-up
    
    * fix CI and comments
    
    ---------
    
    Co-authored-by: Gao Hongtao <[email protected]>
---
 banyand/property/db/repair.go                     |  19 +-
 banyand/property/db/repair_gossip.go              |  58 +++-
 banyand/property/db/repair_gossip_test.go         | 338 +++++++++++++++++++++
 banyand/property/db/shard.go                      |  41 +++
 banyand/property/gossip/server.go                 |  74 +++--
 banyand/property/gossip/service_test.go           | 344 ++++++++++++++++++++++
 test/property_repair/full_data/integrated_test.go |   7 +-
 test/property_repair/half_data/integrated_test.go |   7 +-
 test/property_repair/same_data/integrated_test.go |   7 +-
 test/property_repair/shared_utils.go              | 172 ++++++++++-
 10 files changed, 1014 insertions(+), 53 deletions(-)

diff --git a/banyand/property/db/repair.go b/banyand/property/db/repair.go
index 2bbe8cfe8..e079c239a 100644
--- a/banyand/property/db/repair.go
+++ b/banyand/property/db/repair.go
@@ -1219,8 +1219,16 @@ type repairSchedulerMetrics struct {
        totalRepairBuildTreeLatency   meter.Counter
        totalRepairBuildTreeConflicts meter.Counter
 
-       totalRepairSuccessCount meter.Counter
-       totalRepairFailedCount  meter.Counter
+       totalRepairSuccessCount       meter.Counter
+       totalRepairFailedCount        meter.Counter
+       totalRepairPerPropertyTimeout meter.Counter
+       // repairSuccessLatency samples one full sync-attempt cycle whenever the
+       // per-property repair returned without error (success path AND no-op
+       // path where the remote was already newer). Failures and per-property
+       // timeouts are NOT sampled — operators relying on this metric for
+       // "is the system slow?" must read totalRepairPerPropertyTimeout in
+       // parallel.
+       repairSuccessLatency meter.Histogram
 }
 
 func newRepairSchedulerMetrics(omr observability.Factory) 
*repairSchedulerMetrics {
@@ -1231,7 +1239,10 @@ func newRepairSchedulerMetrics(omr 
observability.Factory) *repairSchedulerMetric
                totalRepairBuildTreeLatency:   
omr.NewCounter("repair_build_tree_latency"),
                totalRepairBuildTreeConflicts: 
omr.NewCounter("repair_build_tree_conflicts"),
 
-               totalRepairSuccessCount: 
omr.NewCounter("property_repair_success_count", "group", "shard"),
-               totalRepairFailedCount:  
omr.NewCounter("property_repair_failure_count", "group", "shard"),
+               totalRepairSuccessCount:       
omr.NewCounter("property_repair_success_count", "group", "shard"),
+               totalRepairFailedCount:        
omr.NewCounter("property_repair_failure_count", "group", "shard"),
+               totalRepairPerPropertyTimeout: 
omr.NewCounter("property_repair_per_property_timeout", "group", "shard"),
+               repairSuccessLatency: 
omr.NewHistogram("property_repair_success_latency_seconds",
+                       meter.Buckets{0.005, 0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10, 
30}, "group", "shard"),
        }
 }
diff --git a/banyand/property/db/repair_gossip.go 
b/banyand/property/db/repair_gossip.go
index 914ad08af..c9f258324 100644
--- a/banyand/property/db/repair_gossip.go
+++ b/banyand/property/db/repair_gossip.go
@@ -22,6 +22,7 @@ import (
        "fmt"
        "io"
        "strings"
+       "time"
 
        "github.com/pkg/errors"
        grpclib "google.golang.org/grpc"
@@ -36,12 +37,52 @@ import (
 var (
        gossipMerkleTreeReadPageSize int64 = 10
        gossipShardQueryDatabaseSize       = 100
+
+       // repairPerPropertyCtxBudget bounds every ctx-aware call inside one
+       // gossip-triggered per-property repair: shard.repair -> s.search,
+       // shard.repair -> buildDeleteFromTimeDocuments -> s.search, and any
+       // nested ctx-aware lookup. updateDocuments takes no ctx and is not
+       // bounded.
+       repairPerPropertyCtxBudget = 10 * time.Second
 )
 
 type repairGossipBase struct {
        scheduler *repairScheduler
 }
 
+// executeRepairWithBudget runs syncShard.repair under 
repairPerPropertyCtxBudget;
+// shared by client and server paths. repairSuccessLatency samples every err 
== nil
+// (incl. no-op). totalRepairPerPropertyTimeout fires only when 
DeadlineExceeded
+// came from the child budget (parent ctx still active).
+func (b *repairGossipBase) executeRepairWithBudget(
+       ctx context.Context,
+       syncShard *shard,
+       id []byte,
+       property *propertyv1.Property,
+       deleteTime int64,
+       group string,
+) (bool, *queryProperty, error) {
+       repairCtx, repairCancel := context.WithTimeout(ctx, 
repairPerPropertyCtxBudget)
+       repairStart := time.Now()
+       updated, newer, err := syncShard.repair(repairCtx, id, property, 
deleteTime)
+       repairCancel()
+       shardLabel := fmt.Sprintf("%d", syncShard.id)
+       if err == nil {
+               
b.scheduler.metrics.repairSuccessLatency.Observe(time.Since(repairStart).Seconds(),
 group, shardLabel)
+       }
+       // Only count per-property timeout when the child budget fired while the
+       // parent ctx was still active. If the parent was already done, the
+       // DeadlineExceeded came from outside this helper and would be a 
misleading
+       // signal for "single property too slow".
+       if err != nil && errors.Is(err, context.DeadlineExceeded) && ctx.Err() 
== nil {
+               b.scheduler.metrics.totalRepairPerPropertyTimeout.Inc(1, group, 
shardLabel)
+               b.scheduler.l.Warn().Str("group", group).Uint32("shardID", 
uint32(syncShard.id)).
+                       Str("propertyID", string(id)).Dur("budget", 
repairPerPropertyCtxBudget).
+                       Msg("per-property repair hit timeout — skipping single 
property")
+       }
+       return updated, newer, err
+}
+
 func (b *repairGossipBase) getTreeReader(ctx context.Context, group string, 
shardID uint32) (repairTreeReader, bool, error) {
        s, err := b.scheduler.db.loadShard(ctx, group, common.ShardID(shardID))
        if err != nil {
@@ -265,6 +306,8 @@ func (r *repairGossipClient) Rev(ctx context.Context, 
tracer gossip.Trace, nextN
        leafReader := newRepairBufferLeafReader(reader)
        var currentComparingClientNode *repairTreeNode
        var notProcessingClientNode *repairTreeNode
+       processed := 0
+       loopStart := time.Now()
        for {
                recvResp, err := stream.Recv()
                if err != nil {
@@ -299,6 +342,13 @@ func (r *repairGossipClient) Rev(ctx context.Context, 
tracer gossip.Trace, nextN
                        }
                        differSpan.End()
                case *propertyv1.RepairResponse_PropertySync:
+                       processed++
+                       if processed%100 == 0 {
+                               r.scheduler.l.Info().Int("processed", 
processed).
+                                       Dur("elapsed", time.Since(loopStart)).
+                                       Str("group", 
request.Group).Uint32("shardID", request.ShardId).
+                                       Msg("repair progress")
+                       }
                        r.scheduler.l.Debug().Msgf("received repair response 
from server")
                        // step 3: keep receiving messages from the server
                        // if the server still sending different nodes, we 
should keep reading them
@@ -307,13 +357,15 @@ func (r *repairGossipClient) Rev(ctx context.Context, 
tracer gossip.Trace, nextN
                        syncSpan := tracer.CreateSpan(startSyncSpan, "repair 
property")
                        syncSpan.Tag(gossip.TraceTagOperateType, 
gossip.TraceTagOperateRepairProperty)
                        syncSpan.Tag(gossip.TraceTagPropertyID, 
string(sync.Property.Id))
-                       updated, newer, err := syncShard.repair(ctx, 
sync.Property.Id, sync.Property.Property, sync.Property.DeleteTime)
+                       updated, newer, err := r.executeRepairWithBudget(ctx, 
syncShard, sync.Property.Id, sync.Property.Property, sync.Property.DeleteTime, 
request.Group)
                        syncSpan.Tag("updated", fmt.Sprintf("%t", updated))
                        syncSpan.Tag("has_newer", fmt.Sprintf("%t", newer != 
nil))
                        if err != nil {
                                syncSpan.Error(err.Error())
                                r.scheduler.l.Warn().Err(err).Msgf("failed to 
repair property %s", sync.Property.Id)
                                
r.scheduler.metrics.totalRepairFailedCount.Inc(1, request.Group, 
fmt.Sprintf("%d", request.ShardId))
+                               syncSpan.End()
+                               continue
                        }
                        if updated {
                                r.scheduler.l.Debug().Msgf("successfully 
repaired property %s on client side", sync.Property.Id)
@@ -350,8 +402,8 @@ func (r *repairGossipClient) Rev(ctx context.Context, 
tracer gossip.Trace, nextN
                                        syncSpan.Error(err.Error())
                                        
r.scheduler.l.Warn().Err(err).Msgf("failed to send newer property sync response 
to server, entity: %s", newer.id)
                                }
-                               syncSpan.End()
                        }
+                       syncSpan.End()
                default:
                        r.scheduler.l.Warn().Msgf("unexpected response type: 
%T, expected DifferTreeSummary or PropertySync", resp)
                }
@@ -741,7 +793,7 @@ func (r *repairGossipServer) processPropertySync(
        s grpclib.BidiStreamingServer[propertyv1.RepairRequest, 
propertyv1.RepairResponse],
        group string,
 ) bool {
-       updated, newer, err := syncShard.repair(ctx, sync.Id, sync.Property, 
sync.DeleteTime)
+       updated, newer, err := r.executeRepairWithBudget(ctx, syncShard, 
sync.Id, sync.Property, sync.DeleteTime, group)
        if err != nil {
                r.scheduler.l.Warn().Err(err).Msgf("failed to repair property 
%s from server side", sync.Id)
                r.scheduler.metrics.totalRepairFailedCount.Inc(1, group, 
fmt.Sprintf("%d", syncShard.id))
diff --git a/banyand/property/db/repair_gossip_test.go 
b/banyand/property/db/repair_gossip_test.go
index 20f428f9f..536f57920 100644
--- a/banyand/property/db/repair_gossip_test.go
+++ b/banyand/property/db/repair_gossip_test.go
@@ -19,6 +19,7 @@ package db
 
 import (
        "context"
+       "errors"
        "fmt"
        "net"
        "path"
@@ -580,6 +581,49 @@ type nodeContext struct {
        stopMutex sync.RWMutex
 }
 
+// inspectCounter exposes Inc deltas — BypassRegistry would swallow them.
+type inspectCounter struct {
+       totals map[string]float64
+       mu     sync.RWMutex
+}
+
+func newInspectCounter() *inspectCounter {
+       return &inspectCounter{totals: make(map[string]float64)}
+}
+
+// Inc satisfies meter.Counter.
+func (c *inspectCounter) Inc(delta float64, labelValues ...string) {
+       key := fmt.Sprintf("%v", labelValues)
+       c.mu.Lock()
+       c.totals[key] += delta
+       c.mu.Unlock()
+}
+
+// Delete satisfies meter.Counter / meter.Histogram (both embed Instrument).
+func (c *inspectCounter) Delete(labelValues ...string) bool {
+       key := fmt.Sprintf("%v", labelValues)
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       if _, ok := c.totals[key]; !ok {
+               return false
+       }
+       delete(c.totals, key)
+       return true
+}
+
+// Observe satisfies meter.Histogram. We treat the call count itself as the
+// observable signal — callers assert get(labels) == expected sample count.
+func (c *inspectCounter) Observe(_ float64, labelValues ...string) {
+       c.Inc(1, labelValues...)
+}
+
+func (c *inspectCounter) get(labelValues ...string) float64 {
+       key := fmt.Sprintf("%v", labelValues)
+       c.mu.RLock()
+       defer c.mu.RUnlock()
+       return c.totals[key]
+}
+
 func (n *nodeContext) appendStop(f func()) {
        n.stopMutex.Lock()
        defer n.stopMutex.Unlock()
@@ -603,3 +647,297 @@ func nodeContextToParentSlice(ncs []*nodeContext) []node {
        }
        return nodes
 }
+
+// Shrinking the var must force timeout; counter assertions fail if the wrap 
is dropped.
+// Only the server-side path (processPropertySync) is exercised here; the 
client-side
+// wrap inside processDifferTreeSummary is currently covered by code review 
only.
+func TestProcessPropertySyncRespectsPerPropertySearchTimeout(t *testing.T) {
+       prev := repairPerPropertyCtxBudget
+       repairPerPropertyCtxBudget = -time.Hour
+       t.Cleanup(func() { repairPerPropertyCtxBudget = prev })
+
+       var defers []func()
+       defer func() {
+               for _, f := range defers {
+                       f()
+               }
+       }()
+
+       dataDir, dataDeferFunc, err := test.NewSpace()
+       if err != nil {
+               t.Fatal(err)
+       }
+       defers = append(defers, dataDeferFunc)
+       snapshotDir, snapshotDeferFunc, err := test.NewSpace()
+       if err != nil {
+               t.Fatal(err)
+       }
+       defers = append(defers, snapshotDeferFunc)
+
+       dbInstance, err := OpenDB(context.Background(), Config{
+               Location:               dataDir,
+               MetricsScopeName:       "property_test_timeout_wiring",
+               FlushInterval:          3 * time.Second,
+               ExpireToDeleteDuration: 1 * time.Hour,
+               Repair: RepairConfig{
+                       Enabled:            true,
+                       Location:           snapshotDir,
+                       BuildTreeCron:      "@every 10m",
+                       QuickBuildTreeTime: time.Second * 10,
+                       TreeSlotCount:      32,
+               },
+       }, observability.BypassRegistry, fs.NewLocalFileSystem())
+       if err != nil {
+               t.Fatal(err)
+       }
+       defers = append(defers, func() { _ = dbInstance.Close() })
+
+       db := dbInstance.(*database)
+
+       // Install inspect-able counters so the test can assert the timeout
+       // branch incremented the metric — not merely that "some error" 
occurred.
+       timeoutCounter := newInspectCounter()
+       failedCounter := newInspectCounter()
+       successCounter := newInspectCounter()
+       db.repairScheduler.metrics.totalRepairPerPropertyTimeout = 
timeoutCounter
+       db.repairScheduler.metrics.totalRepairFailedCount = failedCounter
+       db.repairScheduler.metrics.totalRepairSuccessCount = successCounter
+
+       property := generateProperty("test-id-timeout-wiring", 
time.Now().UnixNano(), 0)
+       if updateErr := db.Update(context.Background(), 0, 
GetPropertyID(property), property); updateErr != nil {
+               t.Fatal(updateErr)
+       }
+
+       syncShard, loadErr := db.loadShard(context.Background(), 
testPropertyGroup, common.ShardID(0))
+       if loadErr != nil {
+               t.Fatal(loadErr)
+       }
+
+       // Sanity probe: a fresh ctx with the pathological timeout must already 
be Done.
+       probeCtx, probeCancel := context.WithTimeout(context.Background(), 
repairPerPropertyCtxBudget)
+       probeCancel()
+       if !errors.Is(probeCtx.Err(), context.DeadlineExceeded) {
+               t.Fatalf("expected probe ctx to be DeadlineExceeded, got %v", 
probeCtx.Err())
+       }
+
+       server := newRepairGossipServer(db.repairScheduler)
+       sync := &propertyv1.PropertySync{
+               Id:         GetPropertyID(property),
+               Property:   property,
+               DeleteTime: 0,
+       }
+
+       // typed-nil stream is safe: the err path in processPropertySync returns
+       // before touching it. If a future refactor reorders the code so the
+       // stream is used before the err check, this test would panic — that
+       // panic is the desired alarm.
+       var nilStream grpc.BidiStreamingServer[propertyv1.RepairRequest, 
propertyv1.RepairResponse]
+       result := server.processPropertySync(context.Background(), syncShard, 
sync, nilStream, testPropertyGroup)
+       if result {
+               t.Fatalf("expected processPropertySync to return false when 
per-property search timeout fires, got true")
+       }
+
+       // The timeout-specific branch must have incremented the per-property
+       // timeout counter. Without this assertion the test would pass for any
+       // error — including non-timeout regressions like an index corruption —
+       // and the Task 4 wiring would still appear "green".
+       shardLabel := fmt.Sprintf("%d", syncShard.id)
+       if got := timeoutCounter.get(testPropertyGroup, shardLabel); got != 1 {
+               t.Fatalf("expected totalRepairPerPropertyTimeout(%s,%s) == 1, 
got %v",
+                       testPropertyGroup, shardLabel, got)
+       }
+       if got := failedCounter.get(testPropertyGroup, shardLabel); got != 1 {
+               t.Fatalf("expected totalRepairFailedCount(%s,%s) == 1, got %v",
+                       testPropertyGroup, shardLabel, got)
+       }
+       // Protective 0 check: success must not move on a forced-timeout call.
+       // If a future refactor adds a retry-on-timeout path this assertion 
needs
+       // to relax, but the timeout/failed counters above must still hold.
+       if got := successCounter.get(testPropertyGroup, shardLabel); got != 0 {
+               t.Fatalf("expected totalRepairSuccessCount(%s,%s) == 0, got %v",
+                       testPropertyGroup, shardLabel, got)
+       }
+}
+
+// openTestRepairBase boots an in-process database and returns a fresh
+// repairGossipBase whose metrics are replaced by inspect counters for the
+// caller to assert on. Used by executeRepairWithBudget unit tests so they
+// can exercise client+server-shared timeout/latency semantics in one place.
+func openTestRepairBase(t *testing.T) (
+       base *repairGossipBase,
+       timeoutCounter *inspectCounter,
+       successLatency *inspectCounter,
+       cleanup func(),
+) {
+       t.Helper()
+       var defers []func()
+       cleanup = func() {
+               for _, f := range defers {
+                       f()
+               }
+       }
+
+       dataDir, dataDeferFunc, err := test.NewSpace()
+       if err != nil {
+               cleanup()
+               t.Fatal(err)
+       }
+       defers = append(defers, dataDeferFunc)
+       snapshotDir, snapshotDeferFunc, err := test.NewSpace()
+       if err != nil {
+               cleanup()
+               t.Fatal(err)
+       }
+       defers = append(defers, snapshotDeferFunc)
+
+       dbInstance, err := OpenDB(context.Background(), Config{
+               Location:               dataDir,
+               MetricsScopeName:       "property_test_helper",
+               FlushInterval:          3 * time.Second,
+               ExpireToDeleteDuration: 1 * time.Hour,
+               Repair: RepairConfig{
+                       Enabled:            true,
+                       Location:           snapshotDir,
+                       BuildTreeCron:      "@every 10m",
+                       QuickBuildTreeTime: time.Second * 10,
+                       TreeSlotCount:      32,
+               },
+       }, observability.BypassRegistry, fs.NewLocalFileSystem())
+       if err != nil {
+               cleanup()
+               t.Fatal(err)
+       }
+       defers = append(defers, func() { _ = dbInstance.Close() })
+
+       db := dbInstance.(*database)
+       timeoutCounter = newInspectCounter()
+       successLatency = newInspectCounter()
+       db.repairScheduler.metrics.totalRepairPerPropertyTimeout = 
timeoutCounter
+       db.repairScheduler.metrics.repairSuccessLatency = successLatency
+       base = &repairGossipBase{scheduler: db.repairScheduler}
+       return base, timeoutCounter, successLatency, cleanup
+}
+
+// TestExecuteRepairWithBudget covers the per-property repair helper that both
+// client (processDifferTreeSummary) and server (processPropertySync) call
+// into. Each subtest pins one observability invariant; deleting the matching
+// branch in executeRepairWithBudget makes the corresponding assertion fail.
+func TestExecuteRepairWithBudget(t *testing.T) {
+       shardLabel := "0"
+
+       t.Run("child budget fires under healthy parent ctx → timeout counter 
+1", func(t *testing.T) {
+               prev := repairPerPropertyCtxBudget
+               repairPerPropertyCtxBudget = -time.Hour
+               t.Cleanup(func() { repairPerPropertyCtxBudget = prev })
+
+               base, timeoutCounter, successLatency, cleanup := 
openTestRepairBase(t)
+               defer cleanup()
+
+               syncShard, err := 
base.scheduler.db.loadShard(context.Background(), testPropertyGroup, 
common.ShardID(0))
+               if err != nil {
+                       t.Fatal(err)
+               }
+               property := generateProperty("test-id-budget", 
time.Now().UnixNano(), 0)
+
+               _, _, repairErr := 
base.executeRepairWithBudget(context.Background(), syncShard,
+                       GetPropertyID(property), property, 0, testPropertyGroup)
+               if repairErr == nil {
+                       t.Fatalf("expected err under negative budget, got nil")
+               }
+               if !errors.Is(repairErr, context.DeadlineExceeded) {
+                       t.Fatalf("expected wrapped DeadlineExceeded, got %v", 
repairErr)
+               }
+               if got := timeoutCounter.get(testPropertyGroup, shardLabel); 
got != 1 {
+                       t.Fatalf("expected per-property timeout counter == 1, 
got %v", got)
+               }
+               if got := successLatency.get(testPropertyGroup, shardLabel); 
got != 0 {
+                       t.Fatalf("expected success latency not sampled on err, 
got %v", got)
+               }
+       })
+
+       t.Run("parent ctx already done → timeout counter not incremented 
(parent-not-child)", func(t *testing.T) {
+               base, timeoutCounter, _, cleanup := openTestRepairBase(t)
+               defer cleanup()
+
+               syncShard, err := 
base.scheduler.db.loadShard(context.Background(), testPropertyGroup, 
common.ShardID(0))
+               if err != nil {
+                       t.Fatal(err)
+               }
+               property := generateProperty("test-id-parent", 
time.Now().UnixNano(), 0)
+
+               // Parent ctx already expired: DeadlineExceeded reaches the 
helper
+               // but it is NOT from the per-property budget — must not be 
counted.
+               parentCtx, parentCancel := 
context.WithDeadline(context.Background(), time.Now().Add(-time.Hour))
+               defer parentCancel()
+
+               _, _, repairErr := base.executeRepairWithBudget(parentCtx, 
syncShard,
+                       GetPropertyID(property), property, 0, testPropertyGroup)
+               if repairErr == nil {
+                       t.Fatalf("expected err under already-expired parent 
ctx, got nil")
+               }
+               if got := timeoutCounter.get(testPropertyGroup, shardLabel); 
got != 0 {
+                       t.Fatalf("parent-ctx expiry must not count as 
per-property timeout, got %v", got)
+               }
+       })
+
+       t.Run("success path with updated=true → latency sampled once", func(t 
*testing.T) {
+               base, timeoutCounter, successLatency, cleanup := 
openTestRepairBase(t)
+               defer cleanup()
+
+               syncShard, err := 
base.scheduler.db.loadShard(context.Background(), testPropertyGroup, 
common.ShardID(0))
+               if err != nil {
+                       t.Fatal(err)
+               }
+               // Empty db → shard.repair takes the "no older properties" 
branch,
+               // updateDocuments runs, and returns updated == true.
+               property := generateProperty("test-id-success", 
time.Now().UnixNano(), 0)
+
+               updated, _, repairErr := 
base.executeRepairWithBudget(context.Background(), syncShard,
+                       GetPropertyID(property), property, 0, testPropertyGroup)
+               if repairErr != nil {
+                       t.Fatalf("expected nil err on healthy repair, got %v", 
repairErr)
+               }
+               if !updated {
+                       t.Fatalf("expected updated == true on empty-db first 
repair, got false")
+               }
+               if got := successLatency.get(testPropertyGroup, shardLabel); 
got != 1 {
+                       t.Fatalf("expected success latency sample count == 1, 
got %v", got)
+               }
+               if got := timeoutCounter.get(testPropertyGroup, shardLabel); 
got != 0 {
+                       t.Fatalf("timeout counter must not move on success, got 
%v", got)
+               }
+       })
+
+       t.Run("no-op path with updated=false → latency still sampled (every err 
== nil counts)", func(t *testing.T) {
+               base, _, successLatency, cleanup := openTestRepairBase(t)
+               defer cleanup()
+
+               now := time.Now().UnixNano()
+               // Seed a newer revision so the repair finds an older-or-equal 
"self"
+               // already present and returns updated == false.
+               existing := generateProperty("test-id-noop", now, 0)
+               if err := base.scheduler.db.Update(context.Background(), 0, 
GetPropertyID(existing), existing); err != nil {
+                       t.Fatal(err)
+               }
+               syncShard, err := 
base.scheduler.db.loadShard(context.Background(), testPropertyGroup, 
common.ShardID(0))
+               if err != nil {
+                       t.Fatal(err)
+               }
+               // Re-repair with an older revision: helper returns nil err but
+               // updated == false. The success-latency histogram still samples
+               // because a no-op decision is still a successful sync cycle 
whose
+               // ctx-aware cost operators care about.
+               olderOrEqual := generateProperty("test-id-noop", now-1, 0)
+               updated, _, repairErr := 
base.executeRepairWithBudget(context.Background(), syncShard,
+                       GetPropertyID(olderOrEqual), olderOrEqual, 0, 
testPropertyGroup)
+               if repairErr != nil {
+                       t.Fatalf("expected nil err on no-op repair, got %v", 
repairErr)
+               }
+               if updated {
+                       t.Fatalf("expected updated == false on older-or-equal 
repair, got true")
+               }
+               if got := successLatency.get(testPropertyGroup, shardLabel); 
got != 1 {
+                       t.Fatalf("success latency must be sampled once even 
when updated == false (err == nil semantics), got %v", got)
+               }
+       })
+}
diff --git a/banyand/property/db/shard.go b/banyand/property/db/shard.go
index a0482c8f7..8962c2b0f 100644
--- a/banyand/property/db/shard.go
+++ b/banyand/property/db/shard.go
@@ -357,6 +357,37 @@ func (s *shard) search(ctx context.Context, q index.Query, 
orderBy *propertyv1.Q
 }
 
 func (s *shard) repair(ctx context.Context, id []byte, property 
*propertyv1.Property, deleteTime int64) (updated bool, selfNewer 
*queryProperty, err error) {
+       start := time.Now()
+       var (
+               search1Elapsed     time.Duration
+               deletePhaseElapsed time.Duration
+               updateElapsed      time.Duration
+               olderCount         int
+               deleteCount        int
+       )
+       defer func() {
+               elapsed := time.Since(start)
+               switch {
+               case err != nil:
+                       s.l.Warn().Int64("elapsed_ms", 
elapsed.Milliseconds()).Str("group", property.Metadata.Group).
+                               Str("name", property.Metadata.Name).Str("id", 
property.Id).Err(err).
+                               Int64("search1_ms", 
search1Elapsed.Milliseconds()).
+                               Int64("delete_phase_ms", 
deletePhaseElapsed.Milliseconds()).
+                               Int64("update_ms", 
updateElapsed.Milliseconds()).
+                               Int("older_count", olderCount).
+                               Int("delete_count", deleteCount).
+                               Msg("property repair failed")
+               case elapsed >= 5*time.Second:
+                       s.l.Warn().Int64("elapsed_ms", 
elapsed.Milliseconds()).Str("group", property.Metadata.Group).
+                               Str("name", property.Metadata.Name).Str("id", 
property.Id).
+                               Int64("search1_ms", 
search1Elapsed.Milliseconds()).
+                               Int64("delete_phase_ms", 
deletePhaseElapsed.Milliseconds()).
+                               Int64("update_ms", 
updateElapsed.Milliseconds()).
+                               Int("older_count", olderCount).
+                               Int("delete_count", deleteCount).
+                               Msg("slow property repair")
+               }
+       }()
        iq, err := inverted.BuildPropertyQuery(&propertyv1.QueryRequest{
                Groups: []string{property.Metadata.Group},
                Name:   property.Metadata.Name,
@@ -365,10 +396,13 @@ func (s *shard) repair(ctx context.Context, id []byte, 
property *propertyv1.Prop
        if err != nil {
                return false, nil, fmt.Errorf("build property query failure: 
%w", err)
        }
+       search1Start := time.Now()
        olderProperties, err := s.search(ctx, iq, nil, 100)
+       search1Elapsed = time.Since(search1Start)
        if err != nil {
                return false, nil, fmt.Errorf("query older properties failed: 
%w", err)
        }
+       olderCount = len(olderProperties)
        sort.Sort(queryPropertySlice(olderProperties))
        // if there no older properties, we can insert the latest document.
        if len(olderProperties) == 0 {
@@ -377,7 +411,9 @@ func (s *shard) repair(ctx context.Context, id []byte, 
property *propertyv1.Prop
                if err != nil {
                        return false, nil, fmt.Errorf("build update document 
failed: %w", err)
                }
+               updateStart := time.Now()
                err = s.updateDocuments(index.Documents{*doc})
+               updateElapsed = time.Since(updateStart)
                if err != nil {
                        return false, nil, fmt.Errorf("update document failed: 
%w", err)
                }
@@ -393,10 +429,13 @@ func (s *shard) repair(ctx context.Context, id []byte, 
property *propertyv1.Prop
        }
 
        docIDList := s.buildNotDeletedDocIDList(olderProperties)
+       deletePhaseStart := time.Now()
        deletedDocuments, err := s.buildDeleteFromTimeDocuments(ctx, docIDList, 
time.Now().UnixNano())
+       deletePhaseElapsed = time.Since(deletePhaseStart)
        if err != nil {
                return false, nil, fmt.Errorf("build delete older documents 
failed: %w", err)
        }
+       deleteCount = len(deletedDocuments)
        // update the property to mark it as delete
        updateDoc, err := s.buildUpdateDocument(id, property, deleteTime)
        if err != nil {
@@ -405,7 +444,9 @@ func (s *shard) repair(ctx context.Context, id []byte, 
property *propertyv1.Prop
        result := make([]index.Document, 0, len(deletedDocuments)+1)
        result = append(result, deletedDocuments...)
        result = append(result, *updateDoc)
+       updateStart := time.Now()
        err = s.updateDocuments(result)
+       updateElapsed = time.Since(updateStart)
        if err != nil {
                return false, nil, fmt.Errorf("update documents failed: %w", 
err)
        }
diff --git a/banyand/property/gossip/server.go 
b/banyand/property/gossip/server.go
index cff25034f..1a6b8c2a8 100644
--- a/banyand/property/gossip/server.go
+++ b/banyand/property/gossip/server.go
@@ -86,7 +86,7 @@ func (s *service) getServiceRegisters() []func(server 
*grpc.Server) {
 
 type groupWithShardPropagation struct {
        latestTime     time.Time
-       channel        chan *handlingRequest
+       pending        *handlingRequest
        originalNodeID string
 }
 
@@ -117,16 +117,24 @@ func (q *protocolHandler) processPropagation() {
        for {
                select {
                case <-q.groupNotify:
-                       request := q.findUnProcessRequest()
-                       if request == nil {
-                               continue
-                       }
-                       timeoutCtx, cancelFunc := 
context.WithTimeout(q.s.closer.Ctx(), perNodeSyncTimeout)
-                       err := q.handle(timeoutCtx, request)
-                       cancelFunc()
-                       if err != nil {
-                               q.s.log.Warn().Err(err).Stringer("request", 
request.PropagationRequest).
-                                       Msgf("handle propagation request 
failure")
+                       // Drain every pending per wake-up; re-check 
CloseNotify between handles.
+                       for {
+                               select {
+                               case <-q.s.closer.CloseNotify():
+                                       return
+                               default:
+                               }
+                               request := q.findUnProcessRequest()
+                               if request == nil {
+                                       break
+                               }
+                               timeoutCtx, cancelFunc := 
context.WithTimeout(q.s.closer.Ctx(), perNodeSyncTimeout)
+                               err := q.handle(timeoutCtx, request)
+                               cancelFunc()
+                               if err != nil {
+                                       
q.s.log.Warn().Err(err).Stringer("request", request.PropagationRequest).
+                                               Msgf("handle propagation 
request failure")
+                               }
                        }
                case <-q.s.closer.CloseNotify():
                        return
@@ -138,11 +146,10 @@ func (q *protocolHandler) findUnProcessRequest() 
*handlingRequest {
        q.mu.Lock()
        defer q.mu.Unlock()
        for _, g := range q.groupWithShards {
-               select {
-               case d := <-g.channel:
-                       return d
-               default:
-                       continue
+               if g.pending != nil {
+                       req := g.pending
+                       g.pending = nil
+                       return req
                }
        }
        return nil
@@ -302,6 +309,8 @@ func (q *protocolHandler) contextIsDone(ctx 
context.Context) bool {
        }
 }
 
+// addToProcess enqueues the request with latest-wins coalesce on pending;
+// returns false when a different originator arrives within TTL.
 func (q *protocolHandler) addToProcess(request *propertyv1.PropagationRequest, 
tracer Trace, span Span) bool {
        q.mu.Lock()
        defer q.mu.Unlock()
@@ -315,11 +324,10 @@ func (q *protocolHandler) addToProcess(request 
*propertyv1.PropagationRequest, t
        }
        if !exist {
                groupShard = &groupWithShardPropagation{
-                       channel:        make(chan *handlingRequest, 1),
+                       pending:        handlingRequestData,
                        originalNodeID: request.Context.OriginNode,
                        latestTime:     time.Now(),
                }
-               groupShard.channel <- handlingRequestData
                q.groupWithShards[shardKey] = groupShard
                q.notifyNewRequest()
                return true
@@ -328,23 +336,31 @@ func (q *protocolHandler) addToProcess(request 
*propertyv1.PropagationRequest, t
        // if the latest round is out of ttl, then needs to change to current 
node to executing
        if time.Since(groupShard.latestTime) > q.s.scheduleInterval/2 {
                groupShard.originalNodeID = request.Context.OriginNode
-               select {
-               case groupShard.channel <- handlingRequestData:
-                       q.notifyNewRequest()
-               default:
-                       q.s.log.Error().Msgf("ready to added propagation into 
group shard %s(%d) in a new round, but it's full", request.Group, 
request.ShardId)
+               if groupShard.pending != nil {
+                       q.s.serverMetrics.totalCoalesced.Inc(1, request.Group)
+                       q.s.log.Info().
+                               Str("group", request.Group).
+                               Uint32("shardID", request.ShardId).
+                               Str("originNode", request.Context.OriginNode).
+                               Msg("propagation request coalesced into pending 
(TTL takeover)")
                }
+               groupShard.pending = handlingRequestData
+               q.notifyNewRequest()
                return true
        }
 
        // if the original node ID are a same node, means which from the same 
round
        if groupShard.originalNodeID == request.Context.OriginNode {
-               select {
-               case groupShard.channel <- handlingRequestData:
-                       q.notifyNewRequest()
-               default:
-                       q.s.log.Error().Msgf("ready to added propagation into 
group shard %s(%d) in a same round, but it's full", request.Group, 
request.ShardId)
+               if groupShard.pending != nil {
+                       q.s.serverMetrics.totalCoalesced.Inc(1, request.Group)
+                       q.s.log.Info().
+                               Str("group", request.Group).
+                               Uint32("shardID", request.ShardId).
+                               Str("originNode", request.Context.OriginNode).
+                               Msg("propagation request coalesced into pending 
(same-round latest-wins)")
                }
+               groupShard.pending = handlingRequestData
+               q.notifyNewRequest()
                return true
        }
 
@@ -419,6 +435,7 @@ type serverMetrics struct {
        totalReceived     meter.Counter
        totalAddProcessed meter.Counter
        totalSkipProcess  meter.Counter
+       totalCoalesced    meter.Counter
 
        totalStarted  meter.Counter
        totalFinished meter.Counter
@@ -439,6 +456,7 @@ func newServerMetrics(factory observability.Factory) 
*serverMetrics {
                totalReceived:     factory.NewCounter("total_received", 
"group"),
                totalAddProcessed: factory.NewCounter("total_add_processed", 
"group"),
                totalSkipProcess:  factory.NewCounter("total_skip_process", 
"group"),
+               totalCoalesced:    factory.NewCounter("total_coalesced", 
"group"),
 
                totalStarted:  factory.NewCounter("total_started", "group"),
                totalFinished: factory.NewCounter("total_finished", "group"),
diff --git a/banyand/property/gossip/service_test.go 
b/banyand/property/gossip/service_test.go
index dc747e9a8..cfcacebeb 100644
--- a/banyand/property/gossip/service_test.go
+++ b/banyand/property/gossip/service_test.go
@@ -21,6 +21,7 @@ import (
        "context"
        "fmt"
        "net"
+       "strings"
        "sync"
        "testing"
        "time"
@@ -142,6 +143,268 @@ var _ = ginkgo.Describe("Propagation Messenger", func() {
                nodeListenFromVerify(node2, []string{node1.nodeID})
                nodeListenFromVerify(node3, []string{node1.nodeID})
        })
+
+       // Latest scheduled round must reach Rev when the worker is busy.
+       ginkgo.It("does not silently drop scheduled rounds when handler is 
slow", func() {
+               nodes = startNodes(3)
+               svc := nodes[0].messenger.(*service)
+
+               coalesced := newRecordingCounter()
+               svc.serverMetrics.totalCoalesced = coalesced
+
+               blocking := newBlockingListener()
+               svc.listenersLock.Lock()
+               svc.listeners = []MessageListener{blocking}
+               svc.listenersLock.Unlock()
+
+               nodeList := []string{nodes[0].nodeID, nodes[1].nodeID, 
nodes[2].nodeID}
+               const totalRounds = 5
+               buildRequest := func(roundIndex int32) 
*propertyv1.PropagationRequest {
+                       return &propertyv1.PropagationRequest{
+                               Context: &propertyv1.PropagationContext{
+                                       Nodes:                   nodeList,
+                                       OriginNode:              
nodes[0].nodeID,
+                                       MaxPropagationCount:     
int32(len(nodeList)*2 - 3),
+                                       CurrentPropagationCount: roundIndex,
+                               },
+                               Group:   mockGroup,
+                               ShardId: 0,
+                       }
+               }
+
+               _, sendErr := 
svc.protocolHandler.Propagation(context.Background(), buildRequest(0))
+               gomega.Expect(sendErr).NotTo(gomega.HaveOccurred())
+               gomega.Eventually(func() bool {
+                       select {
+                       case <-blocking.entered:
+                               return true
+                       default:
+                               return false
+                       }
+               }, flags.EventuallyTimeout, "10ms").Should(gomega.BeTrue(),
+                       "worker should have drained the first request and 
entered listener.Rev")
+
+               _, sendErr = 
svc.protocolHandler.Propagation(context.Background(), buildRequest(1))
+               gomega.Expect(sendErr).NotTo(gomega.HaveOccurred())
+               for i := int32(2); i < int32(totalRounds); i++ {
+                       _, sendErr = 
svc.protocolHandler.Propagation(context.Background(), buildRequest(i))
+                       gomega.Expect(sendErr).NotTo(gomega.HaveOccurred())
+               }
+               gomega.Expect(coalesced.get(mockGroup)).To(gomega.Equal(3.0),
+                       "3 same-round overwrites of a non-nil pending must each 
increment totalCoalesced")
+
+               close(blocking.release)
+               gomega.Eventually(func() int32 {
+                       return blocking.maxObservedCount()
+               }, flags.EventuallyTimeout, 
"100ms").Should(gomega.Equal(int32(4)),
+                       "latest scheduled round must reach Rev under 
latest-wins semantics")
+               gomega.Expect(blocking.count()).To(gomega.BeNumerically(">=", 
2),
+                       "at least the in-flight round and the coalesced latest 
round should reach Rev")
+       })
+
+       // TTL takeover wires the new originator into pending when it is empty.
+       ginkgo.It("TTL-expired branch takes over to new originator with empty 
pending", func() {
+               nodes = startNodes(2)
+               svc := nodes[0].messenger.(*service)
+               svc.scheduleInterval = 50 * time.Millisecond
+
+               blocking := newBlockingListener()
+               svc.listenersLock.Lock()
+               svc.listeners = []MessageListener{blocking}
+               svc.listenersLock.Unlock()
+
+               nodeList := []string{nodes[0].nodeID, nodes[1].nodeID}
+               buildReq := func(origin string, idx int32) 
*propertyv1.PropagationRequest {
+                       return &propertyv1.PropagationRequest{
+                               Context: &propertyv1.PropagationContext{
+                                       Nodes:                   nodeList,
+                                       OriginNode:              origin,
+                                       MaxPropagationCount:     1,
+                                       CurrentPropagationCount: idx,
+                               },
+                               Group:   mockGroup,
+                               ShardId: 0,
+                       }
+               }
+
+               _, err := svc.protocolHandler.Propagation(context.Background(), 
buildReq(nodes[0].nodeID, 0))
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               gomega.Eventually(func() bool {
+                       select {
+                       case <-blocking.entered:
+                               return true
+                       default:
+                               return false
+                       }
+               }, flags.EventuallyTimeout, "10ms").Should(gomega.BeTrue())
+
+               time.Sleep(250 * time.Millisecond)
+
+               _, err = svc.protocolHandler.Propagation(context.Background(), 
buildReq(nodes[1].nodeID, 99))
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               close(blocking.release)
+               gomega.Eventually(func() int32 {
+                       return blocking.maxObservedCount()
+               }, flags.EventuallyTimeout, 
"100ms").Should(gomega.Equal(int32(99)),
+                       "round-B must reach Rev — confirms the TTL-expired 
branch ran and wired pending correctly")
+       })
+
+       // TTL takeover overwrites a non-nil pending and bumps totalCoalesced.
+       ginkgo.It("TTL-expired branch coalesces when pending is non-nil", 
func() {
+               nodes = startNodes(2)
+               svc := nodes[0].messenger.(*service)
+               svc.scheduleInterval = 50 * time.Millisecond
+               coalesced := newRecordingCounter()
+               svc.serverMetrics.totalCoalesced = coalesced
+
+               blocking := newBlockingListener()
+               svc.listenersLock.Lock()
+               svc.listeners = []MessageListener{blocking}
+               svc.listenersLock.Unlock()
+
+               nodeList := []string{nodes[0].nodeID, nodes[1].nodeID}
+               buildReq := func(origin string, idx int32) 
*propertyv1.PropagationRequest {
+                       return &propertyv1.PropagationRequest{
+                               Context: &propertyv1.PropagationContext{
+                                       Nodes:                   nodeList,
+                                       OriginNode:              origin,
+                                       MaxPropagationCount:     1,
+                                       CurrentPropagationCount: idx,
+                               },
+                               Group:   mockGroup,
+                               ShardId: 0,
+                       }
+               }
+
+               _, err := svc.protocolHandler.Propagation(context.Background(), 
buildReq(nodes[0].nodeID, 0))
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               gomega.Eventually(func() bool {
+                       select {
+                       case <-blocking.entered:
+                               return true
+                       default:
+                               return false
+                       }
+               }, flags.EventuallyTimeout, "10ms").Should(gomega.BeTrue())
+
+               _, err = svc.protocolHandler.Propagation(context.Background(), 
buildReq(nodes[0].nodeID, 1))
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               gomega.Expect(coalesced.get(mockGroup)).To(gomega.Equal(0.0),
+                       "first same-origin round writes pending without 
coalescing")
+
+               time.Sleep(250 * time.Millisecond)
+
+               _, err = svc.protocolHandler.Propagation(context.Background(), 
buildReq(nodes[1].nodeID, 99))
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               gomega.Expect(coalesced.get(mockGroup)).To(gomega.Equal(1.0),
+                       "TTL-expired branch must increment totalCoalesced when 
overwriting a non-nil pending")
+
+               close(blocking.release)
+               gomega.Eventually(func() int32 {
+                       return blocking.maxObservedCount()
+               }, flags.EventuallyTimeout, 
"100ms").Should(gomega.Equal(int32(99)),
+                       "round-B (the latest take-over) must reach Rev")
+       })
+
+       // drain-all flushes every queued pending even when groupNotify drops 
signals.
+       ginkgo.It("does not strand pending entries when groupNotify is 
saturated", func() {
+               nodes = startNodes(2)
+               svc := nodes[0].messenger.(*service)
+
+               blocking := newBlockingListener()
+               svc.listenersLock.Lock()
+               svc.listeners = []MessageListener{blocking}
+               svc.listenersLock.Unlock()
+
+               nodeList := []string{nodes[0].nodeID, nodes[1].nodeID}
+               buildReq := func(shardID uint32, idx int32) 
*propertyv1.PropagationRequest {
+                       return &propertyv1.PropagationRequest{
+                               Context: &propertyv1.PropagationContext{
+                                       Nodes:                   nodeList,
+                                       OriginNode:              
nodes[0].nodeID,
+                                       MaxPropagationCount:     1,
+                                       CurrentPropagationCount: idx,
+                               },
+                               Group:   mockGroup,
+                               ShardId: shardID,
+                       }
+               }
+
+               _, err := svc.protocolHandler.Propagation(context.Background(), 
buildReq(0, 0))
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               gomega.Eventually(func() bool {
+                       select {
+                       case <-blocking.entered:
+                               return true
+                       default:
+                               return false
+                       }
+               }, flags.EventuallyTimeout, "10ms").Should(gomega.BeTrue(),
+                       "worker should drain the initial request and enter 
listener.Rev")
+
+               const extraShards uint32 = 12
+               for i := uint32(1); i <= extraShards; i++ {
+                       _, err = 
svc.protocolHandler.Propagation(context.Background(), buildReq(i, int32(i)))
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               }
+
+               close(blocking.release)
+               gomega.Eventually(func() int {
+                       return blocking.count()
+               }, flags.EventuallyTimeout, 
"100ms").Should(gomega.Equal(int(extraShards)+1),
+                       "drain-all on each wake-up must flush every pending 
shard even when notifyNewRequest dropped some signals")
+       })
+
+       // CloseNotify must abort the drain loop between handles.
+       ginkgo.It("worker honors CloseNotify in drain loop even with backlog", 
func() {
+               nodes = startNodes(2)
+               // Slow listener so each handle takes a measurable amount of 
time.
+               nodes[0].listener.delay = 200 * time.Millisecond
+
+               svc := nodes[0].messenger.(*service)
+               listener := nodes[0].listener
+               nodeList := []string{nodes[0].nodeID, nodes[1].nodeID}
+               const backlogShards uint32 = 10
+
+               for i := uint32(0); i < backlogShards; i++ {
+                       req := &propertyv1.PropagationRequest{
+                               Context: &propertyv1.PropagationContext{
+                                       Nodes:               nodeList,
+                                       OriginNode:          nodes[0].nodeID,
+                                       MaxPropagationCount: 1,
+                               },
+                               Group:   mockGroup,
+                               ShardId: i,
+                       }
+                       _, err := 
svc.protocolHandler.Propagation(context.Background(), req)
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               }
+
+               // Give the worker a moment to start the first handle so the 
listener
+               // delay is in flight when we trigger close.
+               time.Sleep(250 * time.Millisecond)
+
+               listener.mu.RLock()
+               countAtClose := len(listener.messages)
+               listener.mu.RUnlock()
+
+               // Trigger close. Without the close check in the drain loop, 
the worker
+               // would keep processing every queued shard before exiting.
+               svc.GracefulStop()
+               nodes[0] = nil // suite-level AfterEach must not double-stop
+
+               // Wait long enough that an unfixed worker would have drained 
the full
+               // backlog: backlogShards * 200ms = 2s. Use 2.5s safety margin.
+               time.Sleep(2500 * time.Millisecond)
+
+               listener.mu.RLock()
+               extraHandles := len(listener.messages) - countAtClose
+               listener.mu.RUnlock()
+
+               gomega.Expect(extraHandles).To(gomega.BeNumerically("<=", 2),
+                       "worker should exit within ~one in-flight handle after 
CloseNotify; saw %d extra handles after close (full backlog would be ~%d)",
+                       extraHandles, int(backlogShards))
+       })
 })
 
 func nodeVerify(n *nodeContext, targets []string, messagesCount int) {
@@ -272,3 +535,84 @@ func (m *mockListener) Rev(_ context.Context, _ Trace, 
nextNode *grpc.ClientConn
        m.fromNodes = append(m.fromNodes, req.Context.OriginNode)
        return nil
 }
+
+// blockingListener.Rev blocks on release; propCounts records which rounds 
reached Rev.
+type blockingListener struct {
+       release     chan struct{}
+       entered     chan struct{}
+       propCounts  []int32
+       enteredOnce sync.Once
+       mu          sync.RWMutex
+}
+
+func newBlockingListener() *blockingListener {
+       return &blockingListener{
+               release: make(chan struct{}),
+               entered: make(chan struct{}),
+       }
+}
+
+func (b *blockingListener) Rev(_ context.Context, _ Trace, _ *grpc.ClientConn, 
req *propertyv1.PropagationRequest) error {
+       b.enteredOnce.Do(func() { close(b.entered) })
+       <-b.release
+       b.mu.Lock()
+       b.propCounts = append(b.propCounts, req.Context.CurrentPropagationCount)
+       b.mu.Unlock()
+       return nil
+}
+
+func (b *blockingListener) count() int {
+       b.mu.RLock()
+       defer b.mu.RUnlock()
+       return len(b.propCounts)
+}
+
+func (b *blockingListener) maxObservedCount() int32 {
+       b.mu.RLock()
+       defer b.mu.RUnlock()
+       if len(b.propCounts) == 0 {
+               return 0
+       }
+       maxCount := b.propCounts[0]
+       for _, c := range b.propCounts[1:] {
+               if c > maxCount {
+                       maxCount = c
+               }
+       }
+       return maxCount
+}
+
+// recordingCounter exposes Inc deltas — the bypass registry cannot be 
introspected.
+type recordingCounter struct {
+       totals map[string]float64
+       mu     sync.RWMutex
+}
+
+func newRecordingCounter() *recordingCounter {
+       return &recordingCounter{totals: make(map[string]float64)}
+}
+
+func (c *recordingCounter) Inc(delta float64, labelValues ...string) {
+       key := strings.Join(labelValues, "\x00")
+       c.mu.Lock()
+       c.totals[key] += delta
+       c.mu.Unlock()
+}
+
+func (c *recordingCounter) Delete(labelValues ...string) bool {
+       key := strings.Join(labelValues, "\x00")
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       if _, ok := c.totals[key]; !ok {
+               return false
+       }
+       delete(c.totals, key)
+       return true
+}
+
+func (c *recordingCounter) get(labelValues ...string) float64 {
+       key := strings.Join(labelValues, "\x00")
+       c.mu.RLock()
+       defer c.mu.RUnlock()
+       return c.totals[key]
+}
diff --git a/test/property_repair/full_data/integrated_test.go 
b/test/property_repair/full_data/integrated_test.go
index 765180c77..f07c6cfa2 100644
--- a/test/property_repair/full_data/integrated_test.go
+++ b/test/property_repair/full_data/integrated_test.go
@@ -168,8 +168,11 @@ var _ = ginkgo.Describe("Property Repair Full Data Test", 
ginkgo.Ordered, func()
                                
gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(),
                                        fmt.Sprintf("Node %s should be healthy 
before verification: %s",
                                                metrics.NodeName, 
metrics.ErrorMessage))
-                               fmt.Printf("- %s: total_propagation_count=%d, 
repair_success_count=%d\n",
-                                       metrics.NodeName, 
metrics.TotalPropagationCount, metrics.RepairSuccessCount)
+                               fmt.Printf("- %s: total_propagation_count=%d, 
repair_success_count=%d, repair_failure_count=%d, "+
+                                       "per_property_timeout=%d, 
total_coalesced=%d, success_latency_count=%d, success_latency_sum_sec=%.4f\n",
+                                       metrics.NodeName, 
metrics.TotalPropagationCount, metrics.RepairSuccessCount,
+                                       metrics.RepairFailureCount, 
metrics.RepairPerPropertyTimeoutCount, metrics.TotalCoalesced,
+                                       metrics.RepairSuccessLatencyCount, 
metrics.RepairSuccessLatencySumSeconds)
                        }
 
                        fmt.Println("\n=== Triggering property repair by 
waiting for scheduled repair cycle ===")
diff --git a/test/property_repair/half_data/integrated_test.go 
b/test/property_repair/half_data/integrated_test.go
index bd40fa682..ff66a1937 100644
--- a/test/property_repair/half_data/integrated_test.go
+++ b/test/property_repair/half_data/integrated_test.go
@@ -195,8 +195,11 @@ var _ = ginkgo.Describe("Property Repair Half Data Test", 
ginkgo.Ordered, func()
                                
gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(),
                                        fmt.Sprintf("Node %s should be healthy 
before verification: %s",
                                                metrics.NodeName, 
metrics.ErrorMessage))
-                               fmt.Printf("- %s: total_propagation_count=%d, 
repair_success_count=%d\n",
-                                       metrics.NodeName, 
metrics.TotalPropagationCount, metrics.RepairSuccessCount)
+                               fmt.Printf("- %s: total_propagation_count=%d, 
repair_success_count=%d, repair_failure_count=%d, "+
+                                       "per_property_timeout=%d, 
total_coalesced=%d, success_latency_count=%d, success_latency_sum_sec=%.4f\n",
+                                       metrics.NodeName, 
metrics.TotalPropagationCount, metrics.RepairSuccessCount,
+                                       metrics.RepairFailureCount, 
metrics.RepairPerPropertyTimeoutCount, metrics.TotalCoalesced,
+                                       metrics.RepairSuccessLatencyCount, 
metrics.RepairSuccessLatencySumSeconds)
                        }
 
                        fmt.Println("\n=== Triggering property repair by 
waiting for scheduled repair cycle ===")
diff --git a/test/property_repair/same_data/integrated_test.go 
b/test/property_repair/same_data/integrated_test.go
index 290aaf310..f7f8b9ed9 100644
--- a/test/property_repair/same_data/integrated_test.go
+++ b/test/property_repair/same_data/integrated_test.go
@@ -153,8 +153,11 @@ var _ = ginkgo.Describe("Property Repair Same Data Test", 
ginkgo.Ordered, func()
                                
gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(),
                                        fmt.Sprintf("Node %s should be healthy 
before verification: %s",
                                                metrics.NodeName, 
metrics.ErrorMessage))
-                               fmt.Printf("- %s: total_propagation_count=%d, 
repair_success_count=%d\n",
-                                       metrics.NodeName, 
metrics.TotalPropagationCount, metrics.RepairSuccessCount)
+                               fmt.Printf("- %s: total_propagation_count=%d, 
repair_success_count=%d, repair_failure_count=%d, "+
+                                       "per_property_timeout=%d, 
total_coalesced=%d, success_latency_count=%d, success_latency_sum_sec=%.4f\n",
+                                       metrics.NodeName, 
metrics.TotalPropagationCount, metrics.RepairSuccessCount,
+                                       metrics.RepairFailureCount, 
metrics.RepairPerPropertyTimeoutCount, metrics.TotalCoalesced,
+                                       metrics.RepairSuccessLatencyCount, 
metrics.RepairSuccessLatencySumSeconds)
                        }
 
                        fmt.Println("\n=== Triggering property repair by 
waiting for scheduled repair cycle ===")
diff --git a/test/property_repair/shared_utils.go 
b/test/property_repair/shared_utils.go
index ee926d305..2d3fb3591 100644
--- a/test/property_repair/shared_utils.go
+++ b/test/property_repair/shared_utils.go
@@ -53,6 +53,11 @@ const (
        PropertyName = "perf-test-property"
 )
 
+// promFloatPattern matches a Prometheus exposition value: scientific
+// notation (e.g. 1.23e+06), plain decimal, [+-]Inf or NaN. The previous
+// `\d+(?:\.\d+)?` form silently missed large counters and histogram sums.
+const promFloatPattern = `(-?(?:\d+(?:\.\d+)?(?:[eE][+-]?\d+)?|[+-]?Inf|NaN))`
+
 // PrometheusEndpoints defines the prometheus endpoints for data nodes.
 var PrometheusEndpoints = []string{
        "http://localhost:2122/metrics";, // data-node-1
@@ -62,12 +67,22 @@ var PrometheusEndpoints = []string{
 
 // NodeMetrics represents the metrics for a data node.
 type NodeMetrics struct {
-       LastScrapeTime        time.Time
-       NodeName              string
-       ErrorMessage          string
-       TotalPropagationCount int64
-       RepairSuccessCount    int64
-       IsHealthy             bool
+       LastScrapeTime                time.Time
+       NodeName                      string
+       ErrorMessage                  string
+       TotalPropagationCount         int64
+       RepairSuccessCount            int64
+       RepairFailureCount            int64
+       RepairPerPropertyTimeoutCount int64
+       TotalCoalesced                int64
+       // RepairSuccessLatencyCount / SumSeconds come from the success-path
+       // histogram: every err == nil call is sampled (including no-op cases
+       // where the remote was already newer). Failures and per-property
+       // timeouts are NOT sampled, so a round where every repair times out
+       // shows count=0 here.
+       RepairSuccessLatencyCount      int64
+       RepairSuccessLatencySumSeconds float64
+       IsHealthy                      bool
 }
 
 // GenerateLargeData creates a string of specified size filled with random 
characters.
@@ -287,9 +302,19 @@ func GetNodeMetrics(endpoint string, nodeIndex int) 
*NodeMetrics {
        content := string(body)
        totalPropagationCount := parseTotalPropagationCount(content)
        repairSuccessCount := parseRepairSuccessCount(content)
+       repairFailureCount := parseRepairFailureCount(content)
+       repairPerPropertyTimeoutCount := 
parseRepairPerPropertyTimeoutCount(content)
+       totalCoalesced := parseTotalCoalesced(content)
+       repairSuccessLatencyCount := parseRepairSuccessLatencyCount(content)
+       repairSuccessLatencySumSeconds := parseRepairSuccessLatencySum(content)
 
        metrics.TotalPropagationCount = totalPropagationCount
        metrics.RepairSuccessCount = repairSuccessCount
+       metrics.RepairFailureCount = repairFailureCount
+       metrics.RepairPerPropertyTimeoutCount = repairPerPropertyTimeoutCount
+       metrics.TotalCoalesced = totalCoalesced
+       metrics.RepairSuccessLatencyCount = repairSuccessLatencyCount
+       metrics.RepairSuccessLatencySumSeconds = repairSuccessLatencySumSeconds
        metrics.IsHealthy = true
        return metrics
 }
@@ -297,7 +322,7 @@ func GetNodeMetrics(endpoint string, nodeIndex int) 
*NodeMetrics {
 // parseTotalPropagationCount parses the total_propagation_count from 
prometheus metrics text.
 func parseTotalPropagationCount(content string) int64 {
        // Look for metric lines like: 
banyandb_property_repair_gossip_server_total_propagation_count{group="perf-test-group",original_node="data-node-1:17912"}
 3
-       re := 
regexp.MustCompile(`banyandb_property_repair_gossip_server_total_propagation_count\{[^}]+\}\s+(\d+(?:\.\d+)?)`)
+       re := 
regexp.MustCompile(`banyandb_property_repair_gossip_server_total_propagation_count\{[^}]+\}\s+`
 + promFloatPattern)
        matches := re.FindAllStringSubmatch(content, -1)
 
        var totalCount int64
@@ -317,7 +342,47 @@ func parseTotalPropagationCount(content string) int64 {
 // parseRepairSuccessCount parses the repair_success_count from prometheus 
metrics text.
 func parseRepairSuccessCount(content string) int64 {
        // Look for metric lines like: 
banyandb_property_scheduler_property_repair_success_count{group="perf-test-group",shard="0"}
 100
-       re := 
regexp.MustCompile(`banyandb_property_scheduler_property_repair_success_count\{[^}]+\}\s+(\d+(?:\.\d+)?)`)
+       re := 
regexp.MustCompile(`banyandb_property_scheduler_property_repair_success_count\{[^}]+\}\s+`
 + promFloatPattern)
+       matches := re.FindAllStringSubmatch(content, -1)
+
+       var totalCount int64
+       for _, match := range matches {
+               if len(match) >= 2 {
+                       value, err := strconv.ParseFloat(match[1], 64)
+                       if err != nil {
+                               continue
+                       }
+                       totalCount += int64(value)
+               }
+       }
+
+       return totalCount
+}
+
+// parseRepairFailureCount parses the repair_failure_count from prometheus 
metrics text.
+func parseRepairFailureCount(content string) int64 {
+       // Look for metric lines like: 
banyandb_property_scheduler_property_repair_failure_count{group="perf-test-group",shard="0"}
 5
+       re := 
regexp.MustCompile(`banyandb_property_scheduler_property_repair_failure_count\{[^}]+\}\s+`
 + promFloatPattern)
+       matches := re.FindAllStringSubmatch(content, -1)
+
+       var totalCount int64
+       for _, match := range matches {
+               if len(match) >= 2 {
+                       value, err := strconv.ParseFloat(match[1], 64)
+                       if err != nil {
+                               continue
+                       }
+                       totalCount += int64(value)
+               }
+       }
+
+       return totalCount
+}
+
+// parseRepairPerPropertyTimeoutCount parses the 
property_repair_per_property_timeout from prometheus metrics text.
+func parseRepairPerPropertyTimeoutCount(content string) int64 {
+       // Look for metric lines like: 
banyandb_property_scheduler_property_repair_per_property_timeout{group="perf-test-group",shard="0"}
 3
+       re := 
regexp.MustCompile(`banyandb_property_scheduler_property_repair_per_property_timeout\{[^}]+\}\s+`
 + promFloatPattern)
        matches := re.FindAllStringSubmatch(content, -1)
 
        var totalCount int64
@@ -334,6 +399,66 @@ func parseRepairSuccessCount(content string) int64 {
        return totalCount
 }
 
+// parseTotalCoalesced parses the gossip total_coalesced from prometheus 
metrics text.
+func parseTotalCoalesced(content string) int64 {
+       // Look for metric lines like: 
banyandb_property_repair_gossip_server_total_coalesced{group="perf-test-group"} 
7
+       re := 
regexp.MustCompile(`banyandb_property_repair_gossip_server_total_coalesced\{[^}]+\}\s+`
 + promFloatPattern)
+       matches := re.FindAllStringSubmatch(content, -1)
+
+       var totalCount int64
+       for _, match := range matches {
+               if len(match) >= 2 {
+                       value, err := strconv.ParseFloat(match[1], 64)
+                       if err != nil {
+                               continue
+                       }
+                       totalCount += int64(value)
+               }
+       }
+
+       return totalCount
+}
+
+// parseRepairSuccessLatencyCount parses the histogram _count of 
property_repair_success_latency_seconds.
+func parseRepairSuccessLatencyCount(content string) int64 {
+       // Look for metric lines like: 
banyandb_property_scheduler_property_repair_success_latency_seconds_count{group="perf-test-group",shard="0"}
 100
+       re := 
regexp.MustCompile(`banyandb_property_scheduler_property_repair_success_latency_seconds_count\{[^}]+\}\s+`
 + promFloatPattern)
+       matches := re.FindAllStringSubmatch(content, -1)
+
+       var totalCount int64
+       for _, match := range matches {
+               if len(match) >= 2 {
+                       value, err := strconv.ParseFloat(match[1], 64)
+                       if err != nil {
+                               continue
+                       }
+                       totalCount += int64(value)
+               }
+       }
+
+       return totalCount
+}
+
+// parseRepairSuccessLatencySum parses the histogram _sum (seconds) of 
property_repair_success_latency_seconds.
+func parseRepairSuccessLatencySum(content string) float64 {
+       // Look for metric lines like: 
banyandb_property_scheduler_property_repair_success_latency_seconds_sum{group="perf-test-group",shard="0"}
 12.34
+       re := 
regexp.MustCompile(`banyandb_property_scheduler_property_repair_success_latency_seconds_sum\{[^}]+\}\s+`
 + promFloatPattern)
+       matches := re.FindAllStringSubmatch(content, -1)
+
+       var totalSum float64
+       for _, match := range matches {
+               if len(match) >= 2 {
+                       value, err := strconv.ParseFloat(match[1], 64)
+                       if err != nil {
+                               continue
+                       }
+                       totalSum += value
+               }
+       }
+
+       return totalSum
+}
+
 // GetAllNodeMetrics fetches metrics from all data nodes concurrently.
 func GetAllNodeMetrics() []*NodeMetrics {
        var wg sync.WaitGroup
@@ -381,24 +506,47 @@ func VerifyPropagationCountIncreased(beforeMetrics, 
afterMetrics []*NodeMetrics)
 // PrintMetricsComparison prints a comparison of metrics before and after.
 func PrintMetricsComparison(beforeMetrics, afterMetrics []*NodeMetrics) {
        fmt.Println("=== Prometheus Metrics Comparison ===")
-       fmt.Printf("%-12s | %-29s | %-29s | %-7s\n", "Node", "Propagation 
Count", "Repair Success Count", "Healthy")
-       fmt.Printf("%-12s | %-9s %-9s %-9s | %-9s %-9s %-9s | %-7s\n", "", 
"Before", "After", "Delta", "Before", "After", "Delta", "")
-       fmt.Println(strings.Repeat("-", 85))
+       fmt.Printf("%-12s | %-29s | %-29s | %-29s | %-29s | %-29s | %-12s | 
%-7s\n",
+               "Node", "Propagation Count", "Repair Success Count", "Repair 
Failure Count",
+               "Per-Property Timeout", "Coalesced (gossip)", "Succ Mean Lat", 
"Healthy")
+       fmt.Printf("%-12s | %-9s %-9s %-9s | %-9s %-9s %-9s | %-9s %-9s %-9s | 
%-9s %-9s %-9s | %-9s %-9s %-9s | %-12s | %-7s\n",
+               "",
+               "Before", "After", "Delta",
+               "Before", "After", "Delta",
+               "Before", "After", "Delta",
+               "Before", "After", "Delta",
+               "Before", "After", "Delta",
+               "ThisRound(sec)",
+               "")
+       fmt.Println(strings.Repeat("-", 190))
 
        for i, after := range afterMetrics {
                if i < len(beforeMetrics) {
                        before := beforeMetrics[i]
                        propagationDelta := after.TotalPropagationCount - 
before.TotalPropagationCount
                        repairDelta := after.RepairSuccessCount - 
before.RepairSuccessCount
+                       failureDelta := after.RepairFailureCount - 
before.RepairFailureCount
+                       timeoutDelta := after.RepairPerPropertyTimeoutCount - 
before.RepairPerPropertyTimeoutCount
+                       coalescedDelta := after.TotalCoalesced - 
before.TotalCoalesced
+                       meanLatency := 0.0
+                       countDelta := after.RepairSuccessLatencyCount - 
before.RepairSuccessLatencyCount
+                       sumDelta := after.RepairSuccessLatencySumSeconds - 
before.RepairSuccessLatencySumSeconds
+                       if countDelta > 0 {
+                               meanLatency = sumDelta / float64(countDelta)
+                       }
                        healthStatus := "✓"
                        if !after.IsHealthy {
                                healthStatus = "✗"
                        }
 
-                       fmt.Printf("%-12s | %-9d %-9d %-9d | %-9d %-9d %-9d | 
%-7s\n",
+                       fmt.Printf("%-12s | %-9d %-9d %-9d | %-9d %-9d %-9d | 
%-9d %-9d %-9d | %-9d %-9d %-9d | %-9d %-9d %-9d | %-12.4f | %-7s\n",
                                after.NodeName,
                                before.TotalPropagationCount, 
after.TotalPropagationCount, propagationDelta,
                                before.RepairSuccessCount, 
after.RepairSuccessCount, repairDelta,
+                               before.RepairFailureCount, 
after.RepairFailureCount, failureDelta,
+                               before.RepairPerPropertyTimeoutCount, 
after.RepairPerPropertyTimeoutCount, timeoutDelta,
+                               before.TotalCoalesced, after.TotalCoalesced, 
coalescedDelta,
+                               meanLatency,
                                healthStatus)
                }
        }

Reply via email to