This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new b60e588e1 fix(property): prevent gossip propagation from getting stuck
after replica scale-up (#1127)
b60e588e1 is described below
commit b60e588e12596f7fdc4442353993a0e7f95edb25
Author: mrproliu <[email protected]>
AuthorDate: Wed May 13 20:32:24 2026 +0800
fix(property): prevent gossip propagation from getting stuck after replica
scale-up (#1127)
* fix(property): prevent gossip propagation from getting stuck after
replica scale-up
* fix CI and comments
---------
Co-authored-by: Gao Hongtao <[email protected]>
---
banyand/property/db/repair.go | 19 +-
banyand/property/db/repair_gossip.go | 58 +++-
banyand/property/db/repair_gossip_test.go | 338 +++++++++++++++++++++
banyand/property/db/shard.go | 41 +++
banyand/property/gossip/server.go | 74 +++--
banyand/property/gossip/service_test.go | 344 ++++++++++++++++++++++
test/property_repair/full_data/integrated_test.go | 7 +-
test/property_repair/half_data/integrated_test.go | 7 +-
test/property_repair/same_data/integrated_test.go | 7 +-
test/property_repair/shared_utils.go | 172 ++++++++++-
10 files changed, 1014 insertions(+), 53 deletions(-)
diff --git a/banyand/property/db/repair.go b/banyand/property/db/repair.go
index 2bbe8cfe8..e079c239a 100644
--- a/banyand/property/db/repair.go
+++ b/banyand/property/db/repair.go
@@ -1219,8 +1219,16 @@ type repairSchedulerMetrics struct {
totalRepairBuildTreeLatency meter.Counter
totalRepairBuildTreeConflicts meter.Counter
- totalRepairSuccessCount meter.Counter
- totalRepairFailedCount meter.Counter
+ totalRepairSuccessCount meter.Counter
+ totalRepairFailedCount meter.Counter
+ totalRepairPerPropertyTimeout meter.Counter
+ // repairSuccessLatency samples one full sync-attempt cycle whenever the
+ // per-property repair returned without error (success path AND no-op
+ // path where the remote was already newer). Failures and per-property
+ // timeouts are NOT sampled — operators relying on this metric for
+ // "is the system slow?" must read totalRepairPerPropertyTimeout in
+ // parallel.
+ repairSuccessLatency meter.Histogram
}
func newRepairSchedulerMetrics(omr observability.Factory)
*repairSchedulerMetrics {
@@ -1231,7 +1239,10 @@ func newRepairSchedulerMetrics(omr
observability.Factory) *repairSchedulerMetric
totalRepairBuildTreeLatency:
omr.NewCounter("repair_build_tree_latency"),
totalRepairBuildTreeConflicts:
omr.NewCounter("repair_build_tree_conflicts"),
- totalRepairSuccessCount:
omr.NewCounter("property_repair_success_count", "group", "shard"),
- totalRepairFailedCount:
omr.NewCounter("property_repair_failure_count", "group", "shard"),
+ totalRepairSuccessCount:
omr.NewCounter("property_repair_success_count", "group", "shard"),
+ totalRepairFailedCount:
omr.NewCounter("property_repair_failure_count", "group", "shard"),
+ totalRepairPerPropertyTimeout:
omr.NewCounter("property_repair_per_property_timeout", "group", "shard"),
+ repairSuccessLatency:
omr.NewHistogram("property_repair_success_latency_seconds",
+ meter.Buckets{0.005, 0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10,
30}, "group", "shard"),
}
}
diff --git a/banyand/property/db/repair_gossip.go
b/banyand/property/db/repair_gossip.go
index 914ad08af..c9f258324 100644
--- a/banyand/property/db/repair_gossip.go
+++ b/banyand/property/db/repair_gossip.go
@@ -22,6 +22,7 @@ import (
"fmt"
"io"
"strings"
+ "time"
"github.com/pkg/errors"
grpclib "google.golang.org/grpc"
@@ -36,12 +37,52 @@ import (
var (
gossipMerkleTreeReadPageSize int64 = 10
gossipShardQueryDatabaseSize = 100
+
+ // repairPerPropertyCtxBudget bounds every ctx-aware call inside one
+ // gossip-triggered per-property repair: shard.repair -> s.search,
+ // shard.repair -> buildDeleteFromTimeDocuments -> s.search, and any
+ // nested ctx-aware lookup. updateDocuments takes no ctx and is not
+ // bounded.
+ repairPerPropertyCtxBudget = 10 * time.Second
)
type repairGossipBase struct {
scheduler *repairScheduler
}
+// executeRepairWithBudget runs syncShard.repair under
repairPerPropertyCtxBudget;
+// shared by client and server paths. repairSuccessLatency samples every err
== nil
+// (incl. no-op). totalRepairPerPropertyTimeout fires only when
DeadlineExceeded
+// came from the child budget (parent ctx still active).
+func (b *repairGossipBase) executeRepairWithBudget(
+ ctx context.Context,
+ syncShard *shard,
+ id []byte,
+ property *propertyv1.Property,
+ deleteTime int64,
+ group string,
+) (bool, *queryProperty, error) {
+ repairCtx, repairCancel := context.WithTimeout(ctx,
repairPerPropertyCtxBudget)
+ repairStart := time.Now()
+ updated, newer, err := syncShard.repair(repairCtx, id, property,
deleteTime)
+ repairCancel()
+ shardLabel := fmt.Sprintf("%d", syncShard.id)
+ if err == nil {
+
b.scheduler.metrics.repairSuccessLatency.Observe(time.Since(repairStart).Seconds(),
group, shardLabel)
+ }
+ // Only count per-property timeout when the child budget fired while the
+ // parent ctx was still active. If the parent was already done, the
+ // DeadlineExceeded came from outside this helper and would be a
misleading
+ // signal for "single property too slow".
+ if err != nil && errors.Is(err, context.DeadlineExceeded) && ctx.Err()
== nil {
+ b.scheduler.metrics.totalRepairPerPropertyTimeout.Inc(1, group,
shardLabel)
+ b.scheduler.l.Warn().Str("group", group).Uint32("shardID",
uint32(syncShard.id)).
+ Str("propertyID", string(id)).Dur("budget",
repairPerPropertyCtxBudget).
+ Msg("per-property repair hit timeout — skipping single
property")
+ }
+ return updated, newer, err
+}
+
func (b *repairGossipBase) getTreeReader(ctx context.Context, group string,
shardID uint32) (repairTreeReader, bool, error) {
s, err := b.scheduler.db.loadShard(ctx, group, common.ShardID(shardID))
if err != nil {
@@ -265,6 +306,8 @@ func (r *repairGossipClient) Rev(ctx context.Context,
tracer gossip.Trace, nextN
leafReader := newRepairBufferLeafReader(reader)
var currentComparingClientNode *repairTreeNode
var notProcessingClientNode *repairTreeNode
+ processed := 0
+ loopStart := time.Now()
for {
recvResp, err := stream.Recv()
if err != nil {
@@ -299,6 +342,13 @@ func (r *repairGossipClient) Rev(ctx context.Context,
tracer gossip.Trace, nextN
}
differSpan.End()
case *propertyv1.RepairResponse_PropertySync:
+ processed++
+ if processed%100 == 0 {
+ r.scheduler.l.Info().Int("processed",
processed).
+ Dur("elapsed", time.Since(loopStart)).
+ Str("group",
request.Group).Uint32("shardID", request.ShardId).
+ Msg("repair progress")
+ }
r.scheduler.l.Debug().Msgf("received repair response
from server")
// step 3: keep receiving messages from the server
// if the server still sending different nodes, we
should keep reading them
@@ -307,13 +357,15 @@ func (r *repairGossipClient) Rev(ctx context.Context,
tracer gossip.Trace, nextN
syncSpan := tracer.CreateSpan(startSyncSpan, "repair
property")
syncSpan.Tag(gossip.TraceTagOperateType,
gossip.TraceTagOperateRepairProperty)
syncSpan.Tag(gossip.TraceTagPropertyID,
string(sync.Property.Id))
- updated, newer, err := syncShard.repair(ctx,
sync.Property.Id, sync.Property.Property, sync.Property.DeleteTime)
+ updated, newer, err := r.executeRepairWithBudget(ctx,
syncShard, sync.Property.Id, sync.Property.Property, sync.Property.DeleteTime,
request.Group)
syncSpan.Tag("updated", fmt.Sprintf("%t", updated))
syncSpan.Tag("has_newer", fmt.Sprintf("%t", newer !=
nil))
if err != nil {
syncSpan.Error(err.Error())
r.scheduler.l.Warn().Err(err).Msgf("failed to
repair property %s", sync.Property.Id)
r.scheduler.metrics.totalRepairFailedCount.Inc(1, request.Group,
fmt.Sprintf("%d", request.ShardId))
+ syncSpan.End()
+ continue
}
if updated {
r.scheduler.l.Debug().Msgf("successfully
repaired property %s on client side", sync.Property.Id)
@@ -350,8 +402,8 @@ func (r *repairGossipClient) Rev(ctx context.Context,
tracer gossip.Trace, nextN
syncSpan.Error(err.Error())
r.scheduler.l.Warn().Err(err).Msgf("failed to send newer property sync response
to server, entity: %s", newer.id)
}
- syncSpan.End()
}
+ syncSpan.End()
default:
r.scheduler.l.Warn().Msgf("unexpected response type:
%T, expected DifferTreeSummary or PropertySync", resp)
}
@@ -741,7 +793,7 @@ func (r *repairGossipServer) processPropertySync(
s grpclib.BidiStreamingServer[propertyv1.RepairRequest,
propertyv1.RepairResponse],
group string,
) bool {
- updated, newer, err := syncShard.repair(ctx, sync.Id, sync.Property,
sync.DeleteTime)
+ updated, newer, err := r.executeRepairWithBudget(ctx, syncShard,
sync.Id, sync.Property, sync.DeleteTime, group)
if err != nil {
r.scheduler.l.Warn().Err(err).Msgf("failed to repair property
%s from server side", sync.Id)
r.scheduler.metrics.totalRepairFailedCount.Inc(1, group,
fmt.Sprintf("%d", syncShard.id))
diff --git a/banyand/property/db/repair_gossip_test.go
b/banyand/property/db/repair_gossip_test.go
index 20f428f9f..536f57920 100644
--- a/banyand/property/db/repair_gossip_test.go
+++ b/banyand/property/db/repair_gossip_test.go
@@ -19,6 +19,7 @@ package db
import (
"context"
+ "errors"
"fmt"
"net"
"path"
@@ -580,6 +581,49 @@ type nodeContext struct {
stopMutex sync.RWMutex
}
+// inspectCounter exposes Inc deltas — BypassRegistry would swallow them.
+type inspectCounter struct {
+ totals map[string]float64
+ mu sync.RWMutex
+}
+
+func newInspectCounter() *inspectCounter {
+ return &inspectCounter{totals: make(map[string]float64)}
+}
+
+// Inc satisfies meter.Counter.
+func (c *inspectCounter) Inc(delta float64, labelValues ...string) {
+ key := fmt.Sprintf("%v", labelValues)
+ c.mu.Lock()
+ c.totals[key] += delta
+ c.mu.Unlock()
+}
+
+// Delete satisfies meter.Counter / meter.Histogram (both embed Instrument).
+func (c *inspectCounter) Delete(labelValues ...string) bool {
+ key := fmt.Sprintf("%v", labelValues)
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if _, ok := c.totals[key]; !ok {
+ return false
+ }
+ delete(c.totals, key)
+ return true
+}
+
+// Observe satisfies meter.Histogram. We treat the call count itself as the
+// observable signal — callers assert get(labels) == expected sample count.
+func (c *inspectCounter) Observe(_ float64, labelValues ...string) {
+ c.Inc(1, labelValues...)
+}
+
+func (c *inspectCounter) get(labelValues ...string) float64 {
+ key := fmt.Sprintf("%v", labelValues)
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ return c.totals[key]
+}
+
func (n *nodeContext) appendStop(f func()) {
n.stopMutex.Lock()
defer n.stopMutex.Unlock()
@@ -603,3 +647,297 @@ func nodeContextToParentSlice(ncs []*nodeContext) []node {
}
return nodes
}
+
+// Shrinking the var must force timeout; counter assertions fail if the wrap
is dropped.
+// Only the server-side path (processPropertySync) is exercised here; the
client-side
+// wrap inside repairGossipClient.Rev is currently covered by code review
only.
+func TestProcessPropertySyncRespectsPerPropertySearchTimeout(t *testing.T) {
+ prev := repairPerPropertyCtxBudget
+ repairPerPropertyCtxBudget = -time.Hour
+ t.Cleanup(func() { repairPerPropertyCtxBudget = prev })
+
+ var defers []func()
+ defer func() {
+ for _, f := range defers {
+ f()
+ }
+ }()
+
+ dataDir, dataDeferFunc, err := test.NewSpace()
+ if err != nil {
+ t.Fatal(err)
+ }
+ defers = append(defers, dataDeferFunc)
+ snapshotDir, snapshotDeferFunc, err := test.NewSpace()
+ if err != nil {
+ t.Fatal(err)
+ }
+ defers = append(defers, snapshotDeferFunc)
+
+ dbInstance, err := OpenDB(context.Background(), Config{
+ Location: dataDir,
+ MetricsScopeName: "property_test_timeout_wiring",
+ FlushInterval: 3 * time.Second,
+ ExpireToDeleteDuration: 1 * time.Hour,
+ Repair: RepairConfig{
+ Enabled: true,
+ Location: snapshotDir,
+ BuildTreeCron: "@every 10m",
+ QuickBuildTreeTime: time.Second * 10,
+ TreeSlotCount: 32,
+ },
+ }, observability.BypassRegistry, fs.NewLocalFileSystem())
+ if err != nil {
+ t.Fatal(err)
+ }
+ defers = append(defers, func() { _ = dbInstance.Close() })
+
+ db := dbInstance.(*database)
+
+ // Install inspect-able counters so the test can assert the timeout
+ // branch incremented the metric — not merely that "some error"
occurred.
+ timeoutCounter := newInspectCounter()
+ failedCounter := newInspectCounter()
+ successCounter := newInspectCounter()
+ db.repairScheduler.metrics.totalRepairPerPropertyTimeout =
timeoutCounter
+ db.repairScheduler.metrics.totalRepairFailedCount = failedCounter
+ db.repairScheduler.metrics.totalRepairSuccessCount = successCounter
+
+ property := generateProperty("test-id-timeout-wiring",
time.Now().UnixNano(), 0)
+ if updateErr := db.Update(context.Background(), 0,
GetPropertyID(property), property); updateErr != nil {
+ t.Fatal(updateErr)
+ }
+
+ syncShard, loadErr := db.loadShard(context.Background(),
testPropertyGroup, common.ShardID(0))
+ if loadErr != nil {
+ t.Fatal(loadErr)
+ }
+
+ // Sanity probe: a fresh ctx with the pathological timeout must already
be Done.
+ probeCtx, probeCancel := context.WithTimeout(context.Background(),
repairPerPropertyCtxBudget)
+ probeCancel()
+ if !errors.Is(probeCtx.Err(), context.DeadlineExceeded) {
+ t.Fatalf("expected probe ctx to be DeadlineExceeded, got %v",
probeCtx.Err())
+ }
+
+ server := newRepairGossipServer(db.repairScheduler)
+ sync := &propertyv1.PropertySync{
+ Id: GetPropertyID(property),
+ Property: property,
+ DeleteTime: 0,
+ }
+
+ // typed-nil stream is safe: the err path in processPropertySync returns
+ // before touching it. If a future refactor reorders the code so the
+ // stream is used before the err check, this test would panic — that
+ // panic is the desired alarm.
+ var nilStream grpc.BidiStreamingServer[propertyv1.RepairRequest,
propertyv1.RepairResponse]
+ result := server.processPropertySync(context.Background(), syncShard,
sync, nilStream, testPropertyGroup)
+ if result {
+ t.Fatalf("expected processPropertySync to return false when
per-property search timeout fires, got true")
+ }
+
+ // The timeout-specific branch must have incremented the per-property
+ // timeout counter. Without this assertion the test would pass for any
+ // error — including non-timeout regressions like an index corruption —
+ // and the per-property timeout wiring would still appear "green".
+ shardLabel := fmt.Sprintf("%d", syncShard.id)
+ if got := timeoutCounter.get(testPropertyGroup, shardLabel); got != 1 {
+ t.Fatalf("expected totalRepairPerPropertyTimeout(%s,%s) == 1,
got %v",
+ testPropertyGroup, shardLabel, got)
+ }
+ if got := failedCounter.get(testPropertyGroup, shardLabel); got != 1 {
+ t.Fatalf("expected totalRepairFailedCount(%s,%s) == 1, got %v",
+ testPropertyGroup, shardLabel, got)
+ }
+ // Protective 0 check: success must not move on a forced-timeout call.
+ // If a future refactor adds a retry-on-timeout path this assertion
needs
+ // to relax, but the timeout/failed counters above must still hold.
+ if got := successCounter.get(testPropertyGroup, shardLabel); got != 0 {
+ t.Fatalf("expected totalRepairSuccessCount(%s,%s) == 0, got %v",
+ testPropertyGroup, shardLabel, got)
+ }
+}
+
+// openTestRepairBase boots an in-process database and returns a fresh
+// repairGossipBase whose metrics are replaced by inspect counters for the
+// caller to assert on. Used by executeRepairWithBudget unit tests so they
+// can exercise client+server-shared timeout/latency semantics in one place.
+func openTestRepairBase(t *testing.T) (
+ base *repairGossipBase,
+ timeoutCounter *inspectCounter,
+ successLatency *inspectCounter,
+ cleanup func(),
+) {
+ t.Helper()
+ var defers []func()
+ cleanup = func() {
+ for _, f := range defers {
+ f()
+ }
+ }
+
+ dataDir, dataDeferFunc, err := test.NewSpace()
+ if err != nil {
+ cleanup()
+ t.Fatal(err)
+ }
+ defers = append(defers, dataDeferFunc)
+ snapshotDir, snapshotDeferFunc, err := test.NewSpace()
+ if err != nil {
+ cleanup()
+ t.Fatal(err)
+ }
+ defers = append(defers, snapshotDeferFunc)
+
+ dbInstance, err := OpenDB(context.Background(), Config{
+ Location: dataDir,
+ MetricsScopeName: "property_test_helper",
+ FlushInterval: 3 * time.Second,
+ ExpireToDeleteDuration: 1 * time.Hour,
+ Repair: RepairConfig{
+ Enabled: true,
+ Location: snapshotDir,
+ BuildTreeCron: "@every 10m",
+ QuickBuildTreeTime: time.Second * 10,
+ TreeSlotCount: 32,
+ },
+ }, observability.BypassRegistry, fs.NewLocalFileSystem())
+ if err != nil {
+ cleanup()
+ t.Fatal(err)
+ }
+ defers = append(defers, func() { _ = dbInstance.Close() })
+
+ db := dbInstance.(*database)
+ timeoutCounter = newInspectCounter()
+ successLatency = newInspectCounter()
+ db.repairScheduler.metrics.totalRepairPerPropertyTimeout =
timeoutCounter
+ db.repairScheduler.metrics.repairSuccessLatency = successLatency
+ base = &repairGossipBase{scheduler: db.repairScheduler}
+ return base, timeoutCounter, successLatency, cleanup
+}
+
+// TestExecuteRepairWithBudget covers the per-property repair helper that both
+// client (repairGossipClient.Rev) and server (processPropertySync) call
+// into. Each subtest pins one observability invariant; deleting the matching
+// branch in executeRepairWithBudget makes the corresponding assertion fail.
+func TestExecuteRepairWithBudget(t *testing.T) {
+ shardLabel := "0"
+
+ t.Run("child budget fires under healthy parent ctx → timeout counter
+1", func(t *testing.T) {
+ prev := repairPerPropertyCtxBudget
+ repairPerPropertyCtxBudget = -time.Hour
+ t.Cleanup(func() { repairPerPropertyCtxBudget = prev })
+
+ base, timeoutCounter, successLatency, cleanup :=
openTestRepairBase(t)
+ defer cleanup()
+
+ syncShard, err :=
base.scheduler.db.loadShard(context.Background(), testPropertyGroup,
common.ShardID(0))
+ if err != nil {
+ t.Fatal(err)
+ }
+ property := generateProperty("test-id-budget",
time.Now().UnixNano(), 0)
+
+ _, _, repairErr :=
base.executeRepairWithBudget(context.Background(), syncShard,
+ GetPropertyID(property), property, 0, testPropertyGroup)
+ if repairErr == nil {
+ t.Fatalf("expected err under negative budget, got nil")
+ }
+ if !errors.Is(repairErr, context.DeadlineExceeded) {
+ t.Fatalf("expected wrapped DeadlineExceeded, got %v",
repairErr)
+ }
+ if got := timeoutCounter.get(testPropertyGroup, shardLabel);
got != 1 {
+ t.Fatalf("expected per-property timeout counter == 1,
got %v", got)
+ }
+ if got := successLatency.get(testPropertyGroup, shardLabel);
got != 0 {
+ t.Fatalf("expected success latency not sampled on err,
got %v", got)
+ }
+ })
+
+ t.Run("parent ctx already done → timeout counter not incremented
(parent-not-child)", func(t *testing.T) {
+ base, timeoutCounter, _, cleanup := openTestRepairBase(t)
+ defer cleanup()
+
+ syncShard, err :=
base.scheduler.db.loadShard(context.Background(), testPropertyGroup,
common.ShardID(0))
+ if err != nil {
+ t.Fatal(err)
+ }
+ property := generateProperty("test-id-parent",
time.Now().UnixNano(), 0)
+
+ // Parent ctx already expired: DeadlineExceeded reaches the
helper
+ // but it is NOT from the per-property budget — must not be
counted.
+ parentCtx, parentCancel :=
context.WithDeadline(context.Background(), time.Now().Add(-time.Hour))
+ defer parentCancel()
+
+ _, _, repairErr := base.executeRepairWithBudget(parentCtx,
syncShard,
+ GetPropertyID(property), property, 0, testPropertyGroup)
+ if repairErr == nil {
+ t.Fatalf("expected err under already-expired parent
ctx, got nil")
+ }
+ if got := timeoutCounter.get(testPropertyGroup, shardLabel);
got != 0 {
+ t.Fatalf("parent-ctx expiry must not count as
per-property timeout, got %v", got)
+ }
+ })
+
+ t.Run("success path with updated=true → latency sampled once", func(t
*testing.T) {
+ base, timeoutCounter, successLatency, cleanup :=
openTestRepairBase(t)
+ defer cleanup()
+
+ syncShard, err :=
base.scheduler.db.loadShard(context.Background(), testPropertyGroup,
common.ShardID(0))
+ if err != nil {
+ t.Fatal(err)
+ }
+ // Empty db → shard.repair takes the "no older properties"
branch,
+ // updateDocuments runs, and returns updated == true.
+ property := generateProperty("test-id-success",
time.Now().UnixNano(), 0)
+
+ updated, _, repairErr :=
base.executeRepairWithBudget(context.Background(), syncShard,
+ GetPropertyID(property), property, 0, testPropertyGroup)
+ if repairErr != nil {
+ t.Fatalf("expected nil err on healthy repair, got %v",
repairErr)
+ }
+ if !updated {
+ t.Fatalf("expected updated == true on empty-db first
repair, got false")
+ }
+ if got := successLatency.get(testPropertyGroup, shardLabel);
got != 1 {
+ t.Fatalf("expected success latency sample count == 1,
got %v", got)
+ }
+ if got := timeoutCounter.get(testPropertyGroup, shardLabel);
got != 0 {
+ t.Fatalf("timeout counter must not move on success, got
%v", got)
+ }
+ })
+
+ t.Run("no-op path with updated=false → latency still sampled (every err
== nil counts)", func(t *testing.T) {
+ base, _, successLatency, cleanup := openTestRepairBase(t)
+ defer cleanup()
+
+ now := time.Now().UnixNano()
+ // Seed a newer revision so the repair finds a newer-or-equal
"self"
+ // already present and returns updated == false.
+ existing := generateProperty("test-id-noop", now, 0)
+ if err := base.scheduler.db.Update(context.Background(), 0,
GetPropertyID(existing), existing); err != nil {
+ t.Fatal(err)
+ }
+ syncShard, err :=
base.scheduler.db.loadShard(context.Background(), testPropertyGroup,
common.ShardID(0))
+ if err != nil {
+ t.Fatal(err)
+ }
+ // Re-repair with an older revision: helper returns nil err but
+ // updated == false. The success-latency histogram still samples
+ // because a no-op decision is still a successful sync cycle
whose
+ // ctx-aware cost operators care about.
+ olderOrEqual := generateProperty("test-id-noop", now-1, 0)
+ updated, _, repairErr :=
base.executeRepairWithBudget(context.Background(), syncShard,
+ GetPropertyID(olderOrEqual), olderOrEqual, 0,
testPropertyGroup)
+ if repairErr != nil {
+ t.Fatalf("expected nil err on no-op repair, got %v",
repairErr)
+ }
+ if updated {
+ t.Fatalf("expected updated == false on older-or-equal
repair, got true")
+ }
+ if got := successLatency.get(testPropertyGroup, shardLabel);
got != 1 {
+ t.Fatalf("success latency must be sampled once even
when updated == false (err == nil semantics), got %v", got)
+ }
+ })
+}
diff --git a/banyand/property/db/shard.go b/banyand/property/db/shard.go
index a0482c8f7..8962c2b0f 100644
--- a/banyand/property/db/shard.go
+++ b/banyand/property/db/shard.go
@@ -357,6 +357,37 @@ func (s *shard) search(ctx context.Context, q index.Query,
orderBy *propertyv1.Q
}
func (s *shard) repair(ctx context.Context, id []byte, property
*propertyv1.Property, deleteTime int64) (updated bool, selfNewer
*queryProperty, err error) {
+ start := time.Now()
+ var (
+ search1Elapsed time.Duration
+ deletePhaseElapsed time.Duration
+ updateElapsed time.Duration
+ olderCount int
+ deleteCount int
+ )
+ defer func() {
+ elapsed := time.Since(start)
+ switch {
+ case err != nil:
+ s.l.Warn().Int64("elapsed_ms",
elapsed.Milliseconds()).Str("group", property.Metadata.Group).
+ Str("name", property.Metadata.Name).Str("id",
property.Id).Err(err).
+ Int64("search1_ms",
search1Elapsed.Milliseconds()).
+ Int64("delete_phase_ms",
deletePhaseElapsed.Milliseconds()).
+ Int64("update_ms",
updateElapsed.Milliseconds()).
+ Int("older_count", olderCount).
+ Int("delete_count", deleteCount).
+ Msg("property repair failed")
+ case elapsed >= 5*time.Second:
+ s.l.Warn().Int64("elapsed_ms",
elapsed.Milliseconds()).Str("group", property.Metadata.Group).
+ Str("name", property.Metadata.Name).Str("id",
property.Id).
+ Int64("search1_ms",
search1Elapsed.Milliseconds()).
+ Int64("delete_phase_ms",
deletePhaseElapsed.Milliseconds()).
+ Int64("update_ms",
updateElapsed.Milliseconds()).
+ Int("older_count", olderCount).
+ Int("delete_count", deleteCount).
+ Msg("slow property repair")
+ }
+ }()
iq, err := inverted.BuildPropertyQuery(&propertyv1.QueryRequest{
Groups: []string{property.Metadata.Group},
Name: property.Metadata.Name,
@@ -365,10 +396,13 @@ func (s *shard) repair(ctx context.Context, id []byte,
property *propertyv1.Prop
if err != nil {
return false, nil, fmt.Errorf("build property query failure:
%w", err)
}
+ search1Start := time.Now()
olderProperties, err := s.search(ctx, iq, nil, 100)
+ search1Elapsed = time.Since(search1Start)
if err != nil {
return false, nil, fmt.Errorf("query older properties failed:
%w", err)
}
+ olderCount = len(olderProperties)
sort.Sort(queryPropertySlice(olderProperties))
// if there no older properties, we can insert the latest document.
if len(olderProperties) == 0 {
@@ -377,7 +411,9 @@ func (s *shard) repair(ctx context.Context, id []byte,
property *propertyv1.Prop
if err != nil {
return false, nil, fmt.Errorf("build update document
failed: %w", err)
}
+ updateStart := time.Now()
err = s.updateDocuments(index.Documents{*doc})
+ updateElapsed = time.Since(updateStart)
if err != nil {
return false, nil, fmt.Errorf("update document failed:
%w", err)
}
@@ -393,10 +429,13 @@ func (s *shard) repair(ctx context.Context, id []byte,
property *propertyv1.Prop
}
docIDList := s.buildNotDeletedDocIDList(olderProperties)
+ deletePhaseStart := time.Now()
deletedDocuments, err := s.buildDeleteFromTimeDocuments(ctx, docIDList,
time.Now().UnixNano())
+ deletePhaseElapsed = time.Since(deletePhaseStart)
if err != nil {
return false, nil, fmt.Errorf("build delete older documents
failed: %w", err)
}
+ deleteCount = len(deletedDocuments)
// update the property to mark it as delete
updateDoc, err := s.buildUpdateDocument(id, property, deleteTime)
if err != nil {
@@ -405,7 +444,9 @@ func (s *shard) repair(ctx context.Context, id []byte,
property *propertyv1.Prop
result := make([]index.Document, 0, len(deletedDocuments)+1)
result = append(result, deletedDocuments...)
result = append(result, *updateDoc)
+ updateStart := time.Now()
err = s.updateDocuments(result)
+ updateElapsed = time.Since(updateStart)
if err != nil {
return false, nil, fmt.Errorf("update documents failed: %w",
err)
}
diff --git a/banyand/property/gossip/server.go
b/banyand/property/gossip/server.go
index cff25034f..1a6b8c2a8 100644
--- a/banyand/property/gossip/server.go
+++ b/banyand/property/gossip/server.go
@@ -86,7 +86,7 @@ func (s *service) getServiceRegisters() []func(server
*grpc.Server) {
type groupWithShardPropagation struct {
latestTime time.Time
- channel chan *handlingRequest
+ pending *handlingRequest
originalNodeID string
}
@@ -117,16 +117,24 @@ func (q *protocolHandler) processPropagation() {
for {
select {
case <-q.groupNotify:
- request := q.findUnProcessRequest()
- if request == nil {
- continue
- }
- timeoutCtx, cancelFunc :=
context.WithTimeout(q.s.closer.Ctx(), perNodeSyncTimeout)
- err := q.handle(timeoutCtx, request)
- cancelFunc()
- if err != nil {
- q.s.log.Warn().Err(err).Stringer("request",
request.PropagationRequest).
- Msgf("handle propagation request
failure")
+ // Drain every pending per wake-up; re-check
CloseNotify between handles.
+ for {
+ select {
+ case <-q.s.closer.CloseNotify():
+ return
+ default:
+ }
+ request := q.findUnProcessRequest()
+ if request == nil {
+ break
+ }
+ timeoutCtx, cancelFunc :=
context.WithTimeout(q.s.closer.Ctx(), perNodeSyncTimeout)
+ err := q.handle(timeoutCtx, request)
+ cancelFunc()
+ if err != nil {
+
q.s.log.Warn().Err(err).Stringer("request", request.PropagationRequest).
+ Msgf("handle propagation
request failure")
+ }
}
case <-q.s.closer.CloseNotify():
return
@@ -138,11 +146,10 @@ func (q *protocolHandler) findUnProcessRequest()
*handlingRequest {
q.mu.Lock()
defer q.mu.Unlock()
for _, g := range q.groupWithShards {
- select {
- case d := <-g.channel:
- return d
- default:
- continue
+ if g.pending != nil {
+ req := g.pending
+ g.pending = nil
+ return req
}
}
return nil
@@ -302,6 +309,8 @@ func (q *protocolHandler) contextIsDone(ctx
context.Context) bool {
}
}
+// addToProcess enqueues the request with latest-wins coalesce on pending;
+// returns false when a different originator arrives within TTL.
func (q *protocolHandler) addToProcess(request *propertyv1.PropagationRequest,
tracer Trace, span Span) bool {
q.mu.Lock()
defer q.mu.Unlock()
@@ -315,11 +324,10 @@ func (q *protocolHandler) addToProcess(request
*propertyv1.PropagationRequest, t
}
if !exist {
groupShard = &groupWithShardPropagation{
- channel: make(chan *handlingRequest, 1),
+ pending: handlingRequestData,
originalNodeID: request.Context.OriginNode,
latestTime: time.Now(),
}
- groupShard.channel <- handlingRequestData
q.groupWithShards[shardKey] = groupShard
q.notifyNewRequest()
return true
@@ -328,23 +336,31 @@ func (q *protocolHandler) addToProcess(request
*propertyv1.PropagationRequest, t
// if the latest round is out of ttl, then needs to change to current
node to executing
if time.Since(groupShard.latestTime) > q.s.scheduleInterval/2 {
groupShard.originalNodeID = request.Context.OriginNode
- select {
- case groupShard.channel <- handlingRequestData:
- q.notifyNewRequest()
- default:
- q.s.log.Error().Msgf("ready to added propagation into
group shard %s(%d) in a new round, but it's full", request.Group,
request.ShardId)
+ if groupShard.pending != nil {
+ q.s.serverMetrics.totalCoalesced.Inc(1, request.Group)
+ q.s.log.Info().
+ Str("group", request.Group).
+ Uint32("shardID", request.ShardId).
+ Str("originNode", request.Context.OriginNode).
+ Msg("propagation request coalesced into pending
(TTL takeover)")
}
+ groupShard.pending = handlingRequestData
+ q.notifyNewRequest()
return true
}
// if the original node ID are a same node, means which from the same
round
if groupShard.originalNodeID == request.Context.OriginNode {
- select {
- case groupShard.channel <- handlingRequestData:
- q.notifyNewRequest()
- default:
- q.s.log.Error().Msgf("ready to added propagation into
group shard %s(%d) in a same round, but it's full", request.Group,
request.ShardId)
+ if groupShard.pending != nil {
+ q.s.serverMetrics.totalCoalesced.Inc(1, request.Group)
+ q.s.log.Info().
+ Str("group", request.Group).
+ Uint32("shardID", request.ShardId).
+ Str("originNode", request.Context.OriginNode).
+ Msg("propagation request coalesced into pending
(same-round latest-wins)")
}
+ groupShard.pending = handlingRequestData
+ q.notifyNewRequest()
return true
}
@@ -419,6 +435,7 @@ type serverMetrics struct {
totalReceived meter.Counter
totalAddProcessed meter.Counter
totalSkipProcess meter.Counter
+ totalCoalesced meter.Counter
totalStarted meter.Counter
totalFinished meter.Counter
@@ -439,6 +456,7 @@ func newServerMetrics(factory observability.Factory)
*serverMetrics {
totalReceived: factory.NewCounter("total_received",
"group"),
totalAddProcessed: factory.NewCounter("total_add_processed",
"group"),
totalSkipProcess: factory.NewCounter("total_skip_process",
"group"),
+ totalCoalesced: factory.NewCounter("total_coalesced",
"group"),
totalStarted: factory.NewCounter("total_started", "group"),
totalFinished: factory.NewCounter("total_finished", "group"),
diff --git a/banyand/property/gossip/service_test.go
b/banyand/property/gossip/service_test.go
index dc747e9a8..cfcacebeb 100644
--- a/banyand/property/gossip/service_test.go
+++ b/banyand/property/gossip/service_test.go
@@ -21,6 +21,7 @@ import (
"context"
"fmt"
"net"
+ "strings"
"sync"
"testing"
"time"
@@ -142,6 +143,268 @@ var _ = ginkgo.Describe("Propagation Messenger", func() {
nodeListenFromVerify(node2, []string{node1.nodeID})
nodeListenFromVerify(node3, []string{node1.nodeID})
})
+
+ // Latest scheduled round must reach Rev when the worker is busy.
+ ginkgo.It("does not silently drop scheduled rounds when handler is
slow", func() {
+ nodes = startNodes(3)
+ svc := nodes[0].messenger.(*service)
+
+ coalesced := newRecordingCounter()
+ svc.serverMetrics.totalCoalesced = coalesced
+
+ blocking := newBlockingListener()
+ svc.listenersLock.Lock()
+ svc.listeners = []MessageListener{blocking}
+ svc.listenersLock.Unlock()
+
+ nodeList := []string{nodes[0].nodeID, nodes[1].nodeID,
nodes[2].nodeID}
+ const totalRounds = 5
+ buildRequest := func(roundIndex int32)
*propertyv1.PropagationRequest {
+ return &propertyv1.PropagationRequest{
+ Context: &propertyv1.PropagationContext{
+ Nodes: nodeList,
+ OriginNode:
nodes[0].nodeID,
+ MaxPropagationCount:
int32(len(nodeList)*2 - 3),
+ CurrentPropagationCount: roundIndex,
+ },
+ Group: mockGroup,
+ ShardId: 0,
+ }
+ }
+
+ _, sendErr :=
svc.protocolHandler.Propagation(context.Background(), buildRequest(0))
+ gomega.Expect(sendErr).NotTo(gomega.HaveOccurred())
+ gomega.Eventually(func() bool {
+ select {
+ case <-blocking.entered:
+ return true
+ default:
+ return false
+ }
+ }, flags.EventuallyTimeout, "10ms").Should(gomega.BeTrue(),
+ "worker should have drained the first request and
entered listener.Rev")
+
+ _, sendErr =
svc.protocolHandler.Propagation(context.Background(), buildRequest(1))
+ gomega.Expect(sendErr).NotTo(gomega.HaveOccurred())
+ for i := int32(2); i < int32(totalRounds); i++ {
+ _, sendErr =
svc.protocolHandler.Propagation(context.Background(), buildRequest(i))
+ gomega.Expect(sendErr).NotTo(gomega.HaveOccurred())
+ }
+ gomega.Expect(coalesced.get(mockGroup)).To(gomega.Equal(3.0),
+ "3 same-round overwrites of a non-nil pending must each
increment totalCoalesced")
+
+ close(blocking.release)
+ gomega.Eventually(func() int32 {
+ return blocking.maxObservedCount()
+ }, flags.EventuallyTimeout,
"100ms").Should(gomega.Equal(int32(4)),
+ "latest scheduled round must reach Rev under
latest-wins semantics")
+ gomega.Expect(blocking.count()).To(gomega.BeNumerically(">=",
2),
+ "at least the in-flight round and the coalesced latest
round should reach Rev")
+ })
+
+ // TTL takeover wires the new originator into pending when it is empty.
+ ginkgo.It("TTL-expired branch takes over to new originator with empty
pending", func() {
+ nodes = startNodes(2)
+ svc := nodes[0].messenger.(*service)
+ svc.scheduleInterval = 50 * time.Millisecond
+
+ blocking := newBlockingListener()
+ svc.listenersLock.Lock()
+ svc.listeners = []MessageListener{blocking}
+ svc.listenersLock.Unlock()
+
+ nodeList := []string{nodes[0].nodeID, nodes[1].nodeID}
+ buildReq := func(origin string, idx int32)
*propertyv1.PropagationRequest {
+ return &propertyv1.PropagationRequest{
+ Context: &propertyv1.PropagationContext{
+ Nodes: nodeList,
+ OriginNode: origin,
+ MaxPropagationCount: 1,
+ CurrentPropagationCount: idx,
+ },
+ Group: mockGroup,
+ ShardId: 0,
+ }
+ }
+
+ _, err := svc.protocolHandler.Propagation(context.Background(),
buildReq(nodes[0].nodeID, 0))
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ gomega.Eventually(func() bool {
+ select {
+ case <-blocking.entered:
+ return true
+ default:
+ return false
+ }
+ }, flags.EventuallyTimeout, "10ms").Should(gomega.BeTrue())
+
+ time.Sleep(250 * time.Millisecond)
+
+ _, err = svc.protocolHandler.Propagation(context.Background(),
buildReq(nodes[1].nodeID, 99))
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ close(blocking.release)
+ gomega.Eventually(func() int32 {
+ return blocking.maxObservedCount()
+ }, flags.EventuallyTimeout,
"100ms").Should(gomega.Equal(int32(99)),
+ "round-B must reach Rev — confirms the TTL-expired
branch ran and wired pending correctly")
+ })
+
+ // TTL takeover overwrites a non-nil pending and bumps totalCoalesced.
+ ginkgo.It("TTL-expired branch coalesces when pending is non-nil",
func() {
+ nodes = startNodes(2)
+ svc := nodes[0].messenger.(*service)
+ svc.scheduleInterval = 50 * time.Millisecond
+ coalesced := newRecordingCounter()
+ svc.serverMetrics.totalCoalesced = coalesced
+
+ blocking := newBlockingListener()
+ svc.listenersLock.Lock()
+ svc.listeners = []MessageListener{blocking}
+ svc.listenersLock.Unlock()
+
+ nodeList := []string{nodes[0].nodeID, nodes[1].nodeID}
+ buildReq := func(origin string, idx int32)
*propertyv1.PropagationRequest {
+ return &propertyv1.PropagationRequest{
+ Context: &propertyv1.PropagationContext{
+ Nodes: nodeList,
+ OriginNode: origin,
+ MaxPropagationCount: 1,
+ CurrentPropagationCount: idx,
+ },
+ Group: mockGroup,
+ ShardId: 0,
+ }
+ }
+
+ _, err := svc.protocolHandler.Propagation(context.Background(),
buildReq(nodes[0].nodeID, 0))
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ gomega.Eventually(func() bool {
+ select {
+ case <-blocking.entered:
+ return true
+ default:
+ return false
+ }
+ }, flags.EventuallyTimeout, "10ms").Should(gomega.BeTrue())
+
+ _, err = svc.protocolHandler.Propagation(context.Background(),
buildReq(nodes[0].nodeID, 1))
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ gomega.Expect(coalesced.get(mockGroup)).To(gomega.Equal(0.0),
+ "first same-origin round writes pending without
coalescing")
+
+ time.Sleep(250 * time.Millisecond)
+
+ _, err = svc.protocolHandler.Propagation(context.Background(),
buildReq(nodes[1].nodeID, 99))
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ gomega.Expect(coalesced.get(mockGroup)).To(gomega.Equal(1.0),
+ "TTL-expired branch must increment totalCoalesced when
overwriting a non-nil pending")
+
+ close(blocking.release)
+ gomega.Eventually(func() int32 {
+ return blocking.maxObservedCount()
+ }, flags.EventuallyTimeout,
"100ms").Should(gomega.Equal(int32(99)),
+ "round-B (the latest take-over) must reach Rev")
+ })
+
+ // drain-all flushes every queued pending even when groupNotify drops
signals.
+ ginkgo.It("does not strand pending entries when groupNotify is
saturated", func() {
+ nodes = startNodes(2)
+ svc := nodes[0].messenger.(*service)
+
+ blocking := newBlockingListener()
+ svc.listenersLock.Lock()
+ svc.listeners = []MessageListener{blocking}
+ svc.listenersLock.Unlock()
+
+ nodeList := []string{nodes[0].nodeID, nodes[1].nodeID}
+ buildReq := func(shardID uint32, idx int32)
*propertyv1.PropagationRequest {
+ return &propertyv1.PropagationRequest{
+ Context: &propertyv1.PropagationContext{
+ Nodes: nodeList,
+ OriginNode:
nodes[0].nodeID,
+ MaxPropagationCount: 1,
+ CurrentPropagationCount: idx,
+ },
+ Group: mockGroup,
+ ShardId: shardID,
+ }
+ }
+
+ _, err := svc.protocolHandler.Propagation(context.Background(),
buildReq(0, 0))
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ gomega.Eventually(func() bool {
+ select {
+ case <-blocking.entered:
+ return true
+ default:
+ return false
+ }
+ }, flags.EventuallyTimeout, "10ms").Should(gomega.BeTrue(),
+ "worker should drain the initial request and enter
listener.Rev")
+
+ const extraShards uint32 = 12
+ for i := uint32(1); i <= extraShards; i++ {
+ _, err =
svc.protocolHandler.Propagation(context.Background(), buildReq(i, int32(i)))
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ }
+
+ close(blocking.release)
+ gomega.Eventually(func() int {
+ return blocking.count()
+ }, flags.EventuallyTimeout,
"100ms").Should(gomega.Equal(int(extraShards)+1),
+ "drain-all on each wake-up must flush every pending
shard even when notifyNewRequest dropped some signals")
+ })
+
+ // CloseNotify must abort the drain loop between handles.
+ ginkgo.It("worker honors CloseNotify in drain loop even with backlog",
func() {
+ nodes = startNodes(2)
+ // Slow listener so each handle takes a measurable amount of
time.
+ nodes[0].listener.delay = 200 * time.Millisecond
+
+ svc := nodes[0].messenger.(*service)
+ listener := nodes[0].listener
+ nodeList := []string{nodes[0].nodeID, nodes[1].nodeID}
+ const backlogShards uint32 = 10
+
+ for i := uint32(0); i < backlogShards; i++ {
+ req := &propertyv1.PropagationRequest{
+ Context: &propertyv1.PropagationContext{
+ Nodes: nodeList,
+ OriginNode: nodes[0].nodeID,
+ MaxPropagationCount: 1,
+ },
+ Group: mockGroup,
+ ShardId: i,
+ }
+ _, err :=
svc.protocolHandler.Propagation(context.Background(), req)
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ }
+
+ // Give the worker a moment to start the first handle so the
listener
+ // delay is in flight when we trigger close.
+ time.Sleep(250 * time.Millisecond)
+
+ listener.mu.RLock()
+ countAtClose := len(listener.messages)
+ listener.mu.RUnlock()
+
+ // Trigger close. Without the close check in the drain loop,
the worker
+ // would keep processing every queued shard before exiting.
+ svc.GracefulStop()
+ nodes[0] = nil // suite-level AfterEach must not double-stop
+
+ // Wait long enough that an unfixed worker would have drained
the full
+ // backlog: backlogShards * 200ms = 2s. Use 2.5s safety margin.
+ time.Sleep(2500 * time.Millisecond)
+
+ listener.mu.RLock()
+ extraHandles := len(listener.messages) - countAtClose
+ listener.mu.RUnlock()
+
+ gomega.Expect(extraHandles).To(gomega.BeNumerically("<=", 2),
+ "worker should exit within ~one in-flight handle after
CloseNotify; saw %d extra handles after close (full backlog would be ~%d)",
+ extraHandles, int(backlogShards))
+ })
})
func nodeVerify(n *nodeContext, targets []string, messagesCount int) {
@@ -272,3 +535,84 @@ func (m *mockListener) Rev(_ context.Context, _ Trace,
nextNode *grpc.ClientConn
m.fromNodes = append(m.fromNodes, req.Context.OriginNode)
return nil
}
+
+// blockingListener.Rev blocks on release; propCounts records which rounds
reached Rev.
+type blockingListener struct {
+ release chan struct{}
+ entered chan struct{}
+ propCounts []int32
+ enteredOnce sync.Once
+ mu sync.RWMutex
+}
+
+func newBlockingListener() *blockingListener {
+ return &blockingListener{
+ release: make(chan struct{}),
+ entered: make(chan struct{}),
+ }
+}
+
+func (b *blockingListener) Rev(_ context.Context, _ Trace, _ *grpc.ClientConn,
req *propertyv1.PropagationRequest) error {
+ b.enteredOnce.Do(func() { close(b.entered) })
+ <-b.release
+ b.mu.Lock()
+ b.propCounts = append(b.propCounts, req.Context.CurrentPropagationCount)
+ b.mu.Unlock()
+ return nil
+}
+
+func (b *blockingListener) count() int {
+ b.mu.RLock()
+ defer b.mu.RUnlock()
+ return len(b.propCounts)
+}
+
+func (b *blockingListener) maxObservedCount() int32 {
+ b.mu.RLock()
+ defer b.mu.RUnlock()
+ if len(b.propCounts) == 0 {
+ return 0
+ }
+ maxCount := b.propCounts[0]
+ for _, c := range b.propCounts[1:] {
+ if c > maxCount {
+ maxCount = c
+ }
+ }
+ return maxCount
+}
+
+// recordingCounter exposes Inc deltas — the bypass registry cannot be
introspected.
+type recordingCounter struct {
+ totals map[string]float64
+ mu sync.RWMutex
+}
+
+func newRecordingCounter() *recordingCounter {
+ return &recordingCounter{totals: make(map[string]float64)}
+}
+
+func (c *recordingCounter) Inc(delta float64, labelValues ...string) {
+ key := strings.Join(labelValues, "\x00")
+ c.mu.Lock()
+ c.totals[key] += delta
+ c.mu.Unlock()
+}
+
+func (c *recordingCounter) Delete(labelValues ...string) bool {
+ key := strings.Join(labelValues, "\x00")
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if _, ok := c.totals[key]; !ok {
+ return false
+ }
+ delete(c.totals, key)
+ return true
+}
+
+func (c *recordingCounter) get(labelValues ...string) float64 {
+ key := strings.Join(labelValues, "\x00")
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ return c.totals[key]
+}
diff --git a/test/property_repair/full_data/integrated_test.go
b/test/property_repair/full_data/integrated_test.go
index 765180c77..f07c6cfa2 100644
--- a/test/property_repair/full_data/integrated_test.go
+++ b/test/property_repair/full_data/integrated_test.go
@@ -168,8 +168,11 @@ var _ = ginkgo.Describe("Property Repair Full Data Test",
ginkgo.Ordered, func()
gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(),
fmt.Sprintf("Node %s should be healthy
before verification: %s",
metrics.NodeName,
metrics.ErrorMessage))
- fmt.Printf("- %s: total_propagation_count=%d,
repair_success_count=%d\n",
- metrics.NodeName,
metrics.TotalPropagationCount, metrics.RepairSuccessCount)
+ fmt.Printf("- %s: total_propagation_count=%d,
repair_success_count=%d, repair_failure_count=%d, "+
+ "per_property_timeout=%d,
total_coalesced=%d, success_latency_count=%d, success_latency_sum_sec=%.4f\n",
+ metrics.NodeName,
metrics.TotalPropagationCount, metrics.RepairSuccessCount,
+ metrics.RepairFailureCount,
metrics.RepairPerPropertyTimeoutCount, metrics.TotalCoalesced,
+ metrics.RepairSuccessLatencyCount,
metrics.RepairSuccessLatencySumSeconds)
}
fmt.Println("\n=== Triggering property repair by
waiting for scheduled repair cycle ===")
diff --git a/test/property_repair/half_data/integrated_test.go
b/test/property_repair/half_data/integrated_test.go
index bd40fa682..ff66a1937 100644
--- a/test/property_repair/half_data/integrated_test.go
+++ b/test/property_repair/half_data/integrated_test.go
@@ -195,8 +195,11 @@ var _ = ginkgo.Describe("Property Repair Half Data Test",
ginkgo.Ordered, func()
gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(),
fmt.Sprintf("Node %s should be healthy
before verification: %s",
metrics.NodeName,
metrics.ErrorMessage))
- fmt.Printf("- %s: total_propagation_count=%d,
repair_success_count=%d\n",
- metrics.NodeName,
metrics.TotalPropagationCount, metrics.RepairSuccessCount)
+ fmt.Printf("- %s: total_propagation_count=%d,
repair_success_count=%d, repair_failure_count=%d, "+
+ "per_property_timeout=%d,
total_coalesced=%d, success_latency_count=%d, success_latency_sum_sec=%.4f\n",
+ metrics.NodeName,
metrics.TotalPropagationCount, metrics.RepairSuccessCount,
+ metrics.RepairFailureCount,
metrics.RepairPerPropertyTimeoutCount, metrics.TotalCoalesced,
+ metrics.RepairSuccessLatencyCount,
metrics.RepairSuccessLatencySumSeconds)
}
fmt.Println("\n=== Triggering property repair by
waiting for scheduled repair cycle ===")
diff --git a/test/property_repair/same_data/integrated_test.go
b/test/property_repair/same_data/integrated_test.go
index 290aaf310..f7f8b9ed9 100644
--- a/test/property_repair/same_data/integrated_test.go
+++ b/test/property_repair/same_data/integrated_test.go
@@ -153,8 +153,11 @@ var _ = ginkgo.Describe("Property Repair Same Data Test",
ginkgo.Ordered, func()
gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(),
fmt.Sprintf("Node %s should be healthy
before verification: %s",
metrics.NodeName,
metrics.ErrorMessage))
- fmt.Printf("- %s: total_propagation_count=%d,
repair_success_count=%d\n",
- metrics.NodeName,
metrics.TotalPropagationCount, metrics.RepairSuccessCount)
+ fmt.Printf("- %s: total_propagation_count=%d,
repair_success_count=%d, repair_failure_count=%d, "+
+ "per_property_timeout=%d,
total_coalesced=%d, success_latency_count=%d, success_latency_sum_sec=%.4f\n",
+ metrics.NodeName,
metrics.TotalPropagationCount, metrics.RepairSuccessCount,
+ metrics.RepairFailureCount,
metrics.RepairPerPropertyTimeoutCount, metrics.TotalCoalesced,
+ metrics.RepairSuccessLatencyCount,
metrics.RepairSuccessLatencySumSeconds)
}
fmt.Println("\n=== Triggering property repair by
waiting for scheduled repair cycle ===")
diff --git a/test/property_repair/shared_utils.go
b/test/property_repair/shared_utils.go
index ee926d305..2d3fb3591 100644
--- a/test/property_repair/shared_utils.go
+++ b/test/property_repair/shared_utils.go
@@ -53,6 +53,11 @@ const (
PropertyName = "perf-test-property"
)
+// promFloatPattern matches a Prometheus exposition value: scientific
+// notation (e.g. 1.23e+06), plain decimal, [+-]Inf or NaN. The previous
+// `\d+(?:\.\d+)?` form silently missed large counters and histogram sums.
+const promFloatPattern = `(-?(?:\d+(?:\.\d+)?(?:[eE][+-]?\d+)?|[+-]?Inf|NaN))`
+
// PrometheusEndpoints defines the prometheus endpoints for data nodes.
var PrometheusEndpoints = []string{
"http://localhost:2122/metrics", // data-node-1
@@ -62,12 +67,22 @@ var PrometheusEndpoints = []string{
// NodeMetrics represents the metrics for a data node.
type NodeMetrics struct {
- LastScrapeTime time.Time
- NodeName string
- ErrorMessage string
- TotalPropagationCount int64
- RepairSuccessCount int64
- IsHealthy bool
+ LastScrapeTime time.Time
+ NodeName string
+ ErrorMessage string
+ TotalPropagationCount int64
+ RepairSuccessCount int64
+ RepairFailureCount int64
+ RepairPerPropertyTimeoutCount int64
+ TotalCoalesced int64
+ // RepairSuccessLatencyCount / SumSeconds come from the success-path
+ // histogram: every err == nil call is sampled (including no-op cases
+ // where the remote was already newer). Failures and per-property
+ // timeouts are NOT sampled, so a round where every repair times out
+ // shows count=0 here.
+ RepairSuccessLatencyCount int64
+ RepairSuccessLatencySumSeconds float64
+ IsHealthy bool
}
// GenerateLargeData creates a string of specified size filled with random
characters.
@@ -287,9 +302,19 @@ func GetNodeMetrics(endpoint string, nodeIndex int)
*NodeMetrics {
content := string(body)
totalPropagationCount := parseTotalPropagationCount(content)
repairSuccessCount := parseRepairSuccessCount(content)
+ repairFailureCount := parseRepairFailureCount(content)
+ repairPerPropertyTimeoutCount :=
parseRepairPerPropertyTimeoutCount(content)
+ totalCoalesced := parseTotalCoalesced(content)
+ repairSuccessLatencyCount := parseRepairSuccessLatencyCount(content)
+ repairSuccessLatencySumSeconds := parseRepairSuccessLatencySum(content)
metrics.TotalPropagationCount = totalPropagationCount
metrics.RepairSuccessCount = repairSuccessCount
+ metrics.RepairFailureCount = repairFailureCount
+ metrics.RepairPerPropertyTimeoutCount = repairPerPropertyTimeoutCount
+ metrics.TotalCoalesced = totalCoalesced
+ metrics.RepairSuccessLatencyCount = repairSuccessLatencyCount
+ metrics.RepairSuccessLatencySumSeconds = repairSuccessLatencySumSeconds
metrics.IsHealthy = true
return metrics
}
@@ -297,7 +322,7 @@ func GetNodeMetrics(endpoint string, nodeIndex int)
*NodeMetrics {
// parseTotalPropagationCount parses the total_propagation_count from
prometheus metrics text.
func parseTotalPropagationCount(content string) int64 {
// Look for metric lines like:
banyandb_property_repair_gossip_server_total_propagation_count{group="perf-test-group",original_node="data-node-1:17912"}
3
- re :=
regexp.MustCompile(`banyandb_property_repair_gossip_server_total_propagation_count\{[^}]+\}\s+(\d+(?:\.\d+)?)`)
+ re :=
regexp.MustCompile(`banyandb_property_repair_gossip_server_total_propagation_count\{[^}]+\}\s+`
+ promFloatPattern)
matches := re.FindAllStringSubmatch(content, -1)
var totalCount int64
@@ -317,7 +342,47 @@ func parseTotalPropagationCount(content string) int64 {
// parseRepairSuccessCount parses the repair_success_count from prometheus
metrics text.
func parseRepairSuccessCount(content string) int64 {
// Look for metric lines like:
banyandb_property_scheduler_property_repair_success_count{group="perf-test-group",shard="0"}
100
- re :=
regexp.MustCompile(`banyandb_property_scheduler_property_repair_success_count\{[^}]+\}\s+(\d+(?:\.\d+)?)`)
+ re :=
regexp.MustCompile(`banyandb_property_scheduler_property_repair_success_count\{[^}]+\}\s+`
+ promFloatPattern)
+ matches := re.FindAllStringSubmatch(content, -1)
+
+ var totalCount int64
+ for _, match := range matches {
+ if len(match) >= 2 {
+ value, err := strconv.ParseFloat(match[1], 64)
+ if err != nil {
+ continue
+ }
+ totalCount += int64(value)
+ }
+ }
+
+ return totalCount
+}
+
+// parseRepairFailureCount parses the repair_failure_count from prometheus
metrics text.
+func parseRepairFailureCount(content string) int64 {
+ // Look for metric lines like:
banyandb_property_scheduler_property_repair_failure_count{group="perf-test-group",shard="0"}
5
+ re :=
regexp.MustCompile(`banyandb_property_scheduler_property_repair_failure_count\{[^}]+\}\s+`
+ promFloatPattern)
+ matches := re.FindAllStringSubmatch(content, -1)
+
+ var totalCount int64
+ for _, match := range matches {
+ if len(match) >= 2 {
+ value, err := strconv.ParseFloat(match[1], 64)
+ if err != nil {
+ continue
+ }
+ totalCount += int64(value)
+ }
+ }
+
+ return totalCount
+}
+
+// parseRepairPerPropertyTimeoutCount parses the
property_repair_per_property_timeout from prometheus metrics text.
+func parseRepairPerPropertyTimeoutCount(content string) int64 {
+ // Look for metric lines like:
banyandb_property_scheduler_property_repair_per_property_timeout{group="perf-test-group",shard="0"}
3
+ re :=
regexp.MustCompile(`banyandb_property_scheduler_property_repair_per_property_timeout\{[^}]+\}\s+`
+ promFloatPattern)
matches := re.FindAllStringSubmatch(content, -1)
var totalCount int64
@@ -334,6 +399,66 @@ func parseRepairSuccessCount(content string) int64 {
return totalCount
}
+// parseTotalCoalesced parses the gossip total_coalesced from prometheus
metrics text.
+func parseTotalCoalesced(content string) int64 {
+ // Look for metric lines like:
banyandb_property_repair_gossip_server_total_coalesced{group="perf-test-group"}
7
+ re :=
regexp.MustCompile(`banyandb_property_repair_gossip_server_total_coalesced\{[^}]+\}\s+`
+ promFloatPattern)
+ matches := re.FindAllStringSubmatch(content, -1)
+
+ var totalCount int64
+ for _, match := range matches {
+ if len(match) >= 2 {
+ value, err := strconv.ParseFloat(match[1], 64)
+ if err != nil {
+ continue
+ }
+ totalCount += int64(value)
+ }
+ }
+
+ return totalCount
+}
+
+// parseRepairSuccessLatencyCount parses the histogram _count of
property_repair_success_latency_seconds.
+func parseRepairSuccessLatencyCount(content string) int64 {
+ // Look for metric lines like:
banyandb_property_scheduler_property_repair_success_latency_seconds_count{group="perf-test-group",shard="0"}
100
+ re :=
regexp.MustCompile(`banyandb_property_scheduler_property_repair_success_latency_seconds_count\{[^}]+\}\s+`
+ promFloatPattern)
+ matches := re.FindAllStringSubmatch(content, -1)
+
+ var totalCount int64
+ for _, match := range matches {
+ if len(match) >= 2 {
+ value, err := strconv.ParseFloat(match[1], 64)
+ if err != nil {
+ continue
+ }
+ totalCount += int64(value)
+ }
+ }
+
+ return totalCount
+}
+
+// parseRepairSuccessLatencySum parses the histogram _sum (seconds) of
property_repair_success_latency_seconds.
+func parseRepairSuccessLatencySum(content string) float64 {
+ // Look for metric lines like:
banyandb_property_scheduler_property_repair_success_latency_seconds_sum{group="perf-test-group",shard="0"}
12.34
+ re :=
regexp.MustCompile(`banyandb_property_scheduler_property_repair_success_latency_seconds_sum\{[^}]+\}\s+`
+ promFloatPattern)
+ matches := re.FindAllStringSubmatch(content, -1)
+
+ var totalSum float64
+ for _, match := range matches {
+ if len(match) >= 2 {
+ value, err := strconv.ParseFloat(match[1], 64)
+ if err != nil {
+ continue
+ }
+ totalSum += value
+ }
+ }
+
+ return totalSum
+}
+
// GetAllNodeMetrics fetches metrics from all data nodes concurrently.
func GetAllNodeMetrics() []*NodeMetrics {
var wg sync.WaitGroup
@@ -381,24 +506,47 @@ func VerifyPropagationCountIncreased(beforeMetrics,
afterMetrics []*NodeMetrics)
// PrintMetricsComparison prints a comparison of metrics before and after.
func PrintMetricsComparison(beforeMetrics, afterMetrics []*NodeMetrics) {
fmt.Println("=== Prometheus Metrics Comparison ===")
- fmt.Printf("%-12s | %-29s | %-29s | %-7s\n", "Node", "Propagation
Count", "Repair Success Count", "Healthy")
- fmt.Printf("%-12s | %-9s %-9s %-9s | %-9s %-9s %-9s | %-7s\n", "",
"Before", "After", "Delta", "Before", "After", "Delta", "")
- fmt.Println(strings.Repeat("-", 85))
+ fmt.Printf("%-12s | %-29s | %-29s | %-29s | %-29s | %-29s | %-12s |
%-7s\n",
+ "Node", "Propagation Count", "Repair Success Count", "Repair
Failure Count",
+ "Per-Property Timeout", "Coalesced (gossip)", "Succ Mean Lat",
"Healthy")
+ fmt.Printf("%-12s | %-9s %-9s %-9s | %-9s %-9s %-9s | %-9s %-9s %-9s |
%-9s %-9s %-9s | %-9s %-9s %-9s | %-12s | %-7s\n",
+ "",
+ "Before", "After", "Delta",
+ "Before", "After", "Delta",
+ "Before", "After", "Delta",
+ "Before", "After", "Delta",
+ "Before", "After", "Delta",
+ "ThisRound(sec)",
+ "")
+ fmt.Println(strings.Repeat("-", 190))
for i, after := range afterMetrics {
if i < len(beforeMetrics) {
before := beforeMetrics[i]
propagationDelta := after.TotalPropagationCount -
before.TotalPropagationCount
repairDelta := after.RepairSuccessCount -
before.RepairSuccessCount
+ failureDelta := after.RepairFailureCount -
before.RepairFailureCount
+ timeoutDelta := after.RepairPerPropertyTimeoutCount -
before.RepairPerPropertyTimeoutCount
+ coalescedDelta := after.TotalCoalesced -
before.TotalCoalesced
+ meanLatency := 0.0
+ countDelta := after.RepairSuccessLatencyCount -
before.RepairSuccessLatencyCount
+ sumDelta := after.RepairSuccessLatencySumSeconds -
before.RepairSuccessLatencySumSeconds
+ if countDelta > 0 {
+ meanLatency = sumDelta / float64(countDelta)
+ }
healthStatus := "✓"
if !after.IsHealthy {
healthStatus = "✗"
}
- fmt.Printf("%-12s | %-9d %-9d %-9d | %-9d %-9d %-9d |
%-7s\n",
+ fmt.Printf("%-12s | %-9d %-9d %-9d | %-9d %-9d %-9d |
%-9d %-9d %-9d | %-9d %-9d %-9d | %-9d %-9d %-9d | %-12.4f | %-7s\n",
after.NodeName,
before.TotalPropagationCount,
after.TotalPropagationCount, propagationDelta,
before.RepairSuccessCount,
after.RepairSuccessCount, repairDelta,
+ before.RepairFailureCount,
after.RepairFailureCount, failureDelta,
+ before.RepairPerPropertyTimeoutCount,
after.RepairPerPropertyTimeoutCount, timeoutDelta,
+ before.TotalCoalesced, after.TotalCoalesced,
coalescedDelta,
+ meanLatency,
healthStatus)
}
}