This is an automated email from the ASF dual-hosted git repository. ButterBright pushed a commit to branch v0.10.x in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit e9282923d77b1000a09bc84cd0a849cacd699451 Author: mrproliu <[email protected]> AuthorDate: Wed May 13 20:32:24 2026 +0800 fix(property): prevent gossip propagation from getting stuck after replica scale-up (#1127) * fix(property): prevent gossip propagation from getting stuck after replica scale-up * fix CI and comments --------- Co-authored-by: Gao Hongtao <[email protected]> --- banyand/property/db/repair.go | 19 +- banyand/property/db/repair_gossip.go | 58 +++- banyand/property/db/repair_gossip_test.go | 338 +++++++++++++++++++++ banyand/property/db/shard.go | 41 +++ banyand/property/gossip/server.go | 74 +++-- banyand/property/gossip/service_test.go | 344 ++++++++++++++++++++++ test/property_repair/full_data/integrated_test.go | 7 +- test/property_repair/half_data/integrated_test.go | 7 +- test/property_repair/same_data/integrated_test.go | 7 +- test/property_repair/shared_utils.go | 172 ++++++++++- 10 files changed, 1014 insertions(+), 53 deletions(-) diff --git a/banyand/property/db/repair.go b/banyand/property/db/repair.go index a5b967908..0829421f4 100644 --- a/banyand/property/db/repair.go +++ b/banyand/property/db/repair.go @@ -1219,8 +1219,16 @@ type repairSchedulerMetrics struct { totalRepairBuildTreeLatency meter.Counter totalRepairBuildTreeConflicts meter.Counter - totalRepairSuccessCount meter.Counter - totalRepairFailedCount meter.Counter + totalRepairSuccessCount meter.Counter + totalRepairFailedCount meter.Counter + totalRepairPerPropertyTimeout meter.Counter + // repairSuccessLatency samples one full sync-attempt cycle whenever the + // per-property repair returned without error (success path AND no-op + // path where the remote was already newer). Failures and per-property + // timeouts are NOT sampled — operators relying on this metric for + // "is the system slow?" must read totalRepairPerPropertyTimeout in + // parallel. + repairSuccessLatency meter.Histogram } func newRepairSchedulerMetrics(omr observability.Factory) *repairSchedulerMetrics { @@ -1231,7 +1239,10 @@ func newRepairSchedulerMetrics(omr observability.Factory) *repairSchedulerMetric totalRepairBuildTreeLatency: omr.NewCounter("repair_build_tree_latency"), totalRepairBuildTreeConflicts: omr.NewCounter("repair_build_tree_conflicts"), - totalRepairSuccessCount: omr.NewCounter("property_repair_success_count", "group", "shard"), - totalRepairFailedCount: omr.NewCounter("property_repair_failure_count", "group", "shard"), + totalRepairSuccessCount: omr.NewCounter("property_repair_success_count", "group", "shard"), + totalRepairFailedCount: omr.NewCounter("property_repair_failure_count", "group", "shard"), + totalRepairPerPropertyTimeout: omr.NewCounter("property_repair_per_property_timeout", "group", "shard"), + repairSuccessLatency: omr.NewHistogram("property_repair_success_latency_seconds", + meter.Buckets{0.005, 0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10, 30}, "group", "shard"), } } diff --git a/banyand/property/db/repair_gossip.go b/banyand/property/db/repair_gossip.go index 914ad08af..c9f258324 100644 --- a/banyand/property/db/repair_gossip.go +++ b/banyand/property/db/repair_gossip.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "strings" + "time" "github.com/pkg/errors" grpclib "google.golang.org/grpc" @@ -36,12 +37,52 @@ import ( var ( gossipMerkleTreeReadPageSize int64 = 10 gossipShardQueryDatabaseSize = 100 + + // repairPerPropertyCtxBudget bounds every ctx-aware call inside one + // gossip-triggered per-property repair: shard.repair -> s.search, + // shard.repair -> buildDeleteFromTimeDocuments -> s.search, and any + // nested ctx-aware lookup. updateDocuments takes no ctx and is not + // bounded. + repairPerPropertyCtxBudget = 10 * time.Second ) type repairGossipBase struct { scheduler *repairScheduler } +// executeRepairWithBudget runs syncShard.repair under repairPerPropertyCtxBudget; +// shared by client and server paths. repairSuccessLatency samples every err == nil +// (incl. no-op). totalRepairPerPropertyTimeout fires only when DeadlineExceeded +// came from the child budget (parent ctx still active). +func (b *repairGossipBase) executeRepairWithBudget( + ctx context.Context, + syncShard *shard, + id []byte, + property *propertyv1.Property, + deleteTime int64, + group string, +) (bool, *queryProperty, error) { + repairCtx, repairCancel := context.WithTimeout(ctx, repairPerPropertyCtxBudget) + repairStart := time.Now() + updated, newer, err := syncShard.repair(repairCtx, id, property, deleteTime) + repairCancel() + shardLabel := fmt.Sprintf("%d", syncShard.id) + if err == nil { + b.scheduler.metrics.repairSuccessLatency.Observe(time.Since(repairStart).Seconds(), group, shardLabel) + } + // Only count per-property timeout when the child budget fired while the + // parent ctx was still active. If the parent was already done, the + // DeadlineExceeded came from outside this helper and would be a misleading + // signal for "single property too slow". + if err != nil && errors.Is(err, context.DeadlineExceeded) && ctx.Err() == nil { + b.scheduler.metrics.totalRepairPerPropertyTimeout.Inc(1, group, shardLabel) + b.scheduler.l.Warn().Str("group", group).Uint32("shardID", uint32(syncShard.id)). + Str("propertyID", string(id)).Dur("budget", repairPerPropertyCtxBudget). + Msg("per-property repair hit timeout — skipping single property") + } + return updated, newer, err +} + func (b *repairGossipBase) getTreeReader(ctx context.Context, group string, shardID uint32) (repairTreeReader, bool, error) { s, err := b.scheduler.db.loadShard(ctx, group, common.ShardID(shardID)) if err != nil { @@ -265,6 +306,8 @@ func (r *repairGossipClient) Rev(ctx context.Context, tracer gossip.Trace, nextN leafReader := newRepairBufferLeafReader(reader) var currentComparingClientNode *repairTreeNode var notProcessingClientNode *repairTreeNode + processed := 0 + loopStart := time.Now() for { recvResp, err := stream.Recv() if err != nil { @@ -299,6 +342,13 @@ func (r *repairGossipClient) Rev(ctx context.Context, tracer gossip.Trace, nextN } differSpan.End() case *propertyv1.RepairResponse_PropertySync: + processed++ + if processed%100 == 0 { + r.scheduler.l.Info().Int("processed", processed). + Dur("elapsed", time.Since(loopStart)). + Str("group", request.Group).Uint32("shardID", request.ShardId). + Msg("repair progress") + } r.scheduler.l.Debug().Msgf("received repair response from server") // step 3: keep receiving messages from the server // if the server still sending different nodes, we should keep reading them @@ -307,13 +357,15 @@ func (r *repairGossipClient) Rev(ctx context.Context, tracer gossip.Trace, nextN syncSpan := tracer.CreateSpan(startSyncSpan, "repair property") syncSpan.Tag(gossip.TraceTagOperateType, gossip.TraceTagOperateRepairProperty) syncSpan.Tag(gossip.TraceTagPropertyID, string(sync.Property.Id)) - updated, newer, err := syncShard.repair(ctx, sync.Property.Id, sync.Property.Property, sync.Property.DeleteTime) + updated, newer, err := r.executeRepairWithBudget(ctx, syncShard, sync.Property.Id, sync.Property.Property, sync.Property.DeleteTime, request.Group) syncSpan.Tag("updated", fmt.Sprintf("%t", updated)) syncSpan.Tag("has_newer", fmt.Sprintf("%t", newer != nil)) if err != nil { syncSpan.Error(err.Error()) r.scheduler.l.Warn().Err(err).Msgf("failed to repair property %s", sync.Property.Id) r.scheduler.metrics.totalRepairFailedCount.Inc(1, request.Group, fmt.Sprintf("%d", request.ShardId)) + syncSpan.End() + continue } if updated { r.scheduler.l.Debug().Msgf("successfully repaired property %s on client side", sync.Property.Id) @@ -350,8 +402,8 @@ func (r *repairGossipClient) Rev(ctx context.Context, tracer gossip.Trace, nextN syncSpan.Error(err.Error()) r.scheduler.l.Warn().Err(err).Msgf("failed to send newer property sync response to server, entity: %s", newer.id) } - syncSpan.End() } + syncSpan.End() default: r.scheduler.l.Warn().Msgf("unexpected response type: %T, expected DifferTreeSummary or PropertySync", resp) } @@ -741,7 +793,7 @@ func (r *repairGossipServer) processPropertySync( s grpclib.BidiStreamingServer[propertyv1.RepairRequest, propertyv1.RepairResponse], group string, ) bool { - updated, newer, err := syncShard.repair(ctx, sync.Id, sync.Property, sync.DeleteTime) + updated, newer, err := r.executeRepairWithBudget(ctx, syncShard, sync.Id, sync.Property, sync.DeleteTime, group) if err != nil { r.scheduler.l.Warn().Err(err).Msgf("failed to repair property %s from server side", sync.Id) r.scheduler.metrics.totalRepairFailedCount.Inc(1, group, fmt.Sprintf("%d", syncShard.id)) diff --git a/banyand/property/db/repair_gossip_test.go b/banyand/property/db/repair_gossip_test.go index 20f428f9f..536f57920 100644 --- a/banyand/property/db/repair_gossip_test.go +++ b/banyand/property/db/repair_gossip_test.go @@ -19,6 +19,7 @@ package db import ( "context" + "errors" "fmt" "net" "path" @@ -580,6 +581,49 @@ type nodeContext struct { stopMutex sync.RWMutex } +// inspectCounter exposes Inc deltas — BypassRegistry would swallow them. +type inspectCounter struct { + totals map[string]float64 + mu sync.RWMutex +} + +func newInspectCounter() *inspectCounter { + return &inspectCounter{totals: make(map[string]float64)} +} + +// Inc satisfies meter.Counter. +func (c *inspectCounter) Inc(delta float64, labelValues ...string) { + key := fmt.Sprintf("%v", labelValues) + c.mu.Lock() + c.totals[key] += delta + c.mu.Unlock() +} + +// Delete satisfies meter.Counter / meter.Histogram (both embed Instrument). +func (c *inspectCounter) Delete(labelValues ...string) bool { + key := fmt.Sprintf("%v", labelValues) + c.mu.Lock() + defer c.mu.Unlock() + if _, ok := c.totals[key]; !ok { + return false + } + delete(c.totals, key) + return true +} + +// Observe satisfies meter.Histogram. We treat the call count itself as the +// observable signal — callers assert get(labels) == expected sample count. +func (c *inspectCounter) Observe(_ float64, labelValues ...string) { + c.Inc(1, labelValues...) +} + +func (c *inspectCounter) get(labelValues ...string) float64 { + key := fmt.Sprintf("%v", labelValues) + c.mu.RLock() + defer c.mu.RUnlock() + return c.totals[key] +} + func (n *nodeContext) appendStop(f func()) { n.stopMutex.Lock() defer n.stopMutex.Unlock() @@ -603,3 +647,297 @@ func nodeContextToParentSlice(ncs []*nodeContext) []node { } return nodes } + +// Shrinking the var must force timeout; counter assertions fail if the wrap is dropped. +// Only the server-side path (processPropertySync) is exercised here; the client-side +// wrap inside processDifferTreeSummary is currently covered by code review only. +func TestProcessPropertySyncRespectsPerPropertySearchTimeout(t *testing.T) { + prev := repairPerPropertyCtxBudget + repairPerPropertyCtxBudget = -time.Hour + t.Cleanup(func() { repairPerPropertyCtxBudget = prev }) + + var defers []func() + defer func() { + for _, f := range defers { + f() + } + }() + + dataDir, dataDeferFunc, err := test.NewSpace() + if err != nil { + t.Fatal(err) + } + defers = append(defers, dataDeferFunc) + snapshotDir, snapshotDeferFunc, err := test.NewSpace() + if err != nil { + t.Fatal(err) + } + defers = append(defers, snapshotDeferFunc) + + dbInstance, err := OpenDB(context.Background(), Config{ + Location: dataDir, + MetricsScopeName: "property_test_timeout_wiring", + FlushInterval: 3 * time.Second, + ExpireToDeleteDuration: 1 * time.Hour, + Repair: RepairConfig{ + Enabled: true, + Location: snapshotDir, + BuildTreeCron: "@every 10m", + QuickBuildTreeTime: time.Second * 10, + TreeSlotCount: 32, + }, + }, observability.BypassRegistry, fs.NewLocalFileSystem()) + if err != nil { + t.Fatal(err) + } + defers = append(defers, func() { _ = dbInstance.Close() }) + + db := dbInstance.(*database) + + // Install inspect-able counters so the test can assert the timeout + // branch incremented the metric — not merely that "some error" occurred. + timeoutCounter := newInspectCounter() + failedCounter := newInspectCounter() + successCounter := newInspectCounter() + db.repairScheduler.metrics.totalRepairPerPropertyTimeout = timeoutCounter + db.repairScheduler.metrics.totalRepairFailedCount = failedCounter + db.repairScheduler.metrics.totalRepairSuccessCount = successCounter + + property := generateProperty("test-id-timeout-wiring", time.Now().UnixNano(), 0) + if updateErr := db.Update(context.Background(), 0, GetPropertyID(property), property); updateErr != nil { + t.Fatal(updateErr) + } + + syncShard, loadErr := db.loadShard(context.Background(), testPropertyGroup, common.ShardID(0)) + if loadErr != nil { + t.Fatal(loadErr) + } + + // Sanity probe: a fresh ctx with the pathological timeout must already be Done. + probeCtx, probeCancel := context.WithTimeout(context.Background(), repairPerPropertyCtxBudget) + probeCancel() + if !errors.Is(probeCtx.Err(), context.DeadlineExceeded) { + t.Fatalf("expected probe ctx to be DeadlineExceeded, got %v", probeCtx.Err()) + } + + server := newRepairGossipServer(db.repairScheduler) + sync := &propertyv1.PropertySync{ + Id: GetPropertyID(property), + Property: property, + DeleteTime: 0, + } + + // typed-nil stream is safe: the err path in processPropertySync returns + // before touching it. If a future refactor reorders the code so the + // stream is used before the err check, this test would panic — that + // panic is the desired alarm. + var nilStream grpc.BidiStreamingServer[propertyv1.RepairRequest, propertyv1.RepairResponse] + result := server.processPropertySync(context.Background(), syncShard, sync, nilStream, testPropertyGroup) + if result { + t.Fatalf("expected processPropertySync to return false when per-property search timeout fires, got true") + } + + // The timeout-specific branch must have incremented the per-property + // timeout counter. Without this assertion the test would pass for any + // error — including non-timeout regressions like an index corruption — + // and the Task 4 wiring would still appear "green". + shardLabel := fmt.Sprintf("%d", syncShard.id) + if got := timeoutCounter.get(testPropertyGroup, shardLabel); got != 1 { + t.Fatalf("expected totalRepairPerPropertyTimeout(%s,%s) == 1, got %v", + testPropertyGroup, shardLabel, got) + } + if got := failedCounter.get(testPropertyGroup, shardLabel); got != 1 { + t.Fatalf("expected totalRepairFailedCount(%s,%s) == 1, got %v", + testPropertyGroup, shardLabel, got) + } + // Protective 0 check: success must not move on a forced-timeout call. + // If a future refactor adds a retry-on-timeout path this assertion needs + // to relax, but the timeout/failed counters above must still hold. + if got := successCounter.get(testPropertyGroup, shardLabel); got != 0 { + t.Fatalf("expected totalRepairSuccessCount(%s,%s) == 0, got %v", + testPropertyGroup, shardLabel, got) + } +} + +// openTestRepairBase boots an in-process database and returns a fresh +// repairGossipBase whose metrics are replaced by inspect counters for the +// caller to assert on. Used by executeRepairWithBudget unit tests so they +// can exercise client+server-shared timeout/latency semantics in one place. +func openTestRepairBase(t *testing.T) ( + base *repairGossipBase, + timeoutCounter *inspectCounter, + successLatency *inspectCounter, + cleanup func(), +) { + t.Helper() + var defers []func() + cleanup = func() { + for _, f := range defers { + f() + } + } + + dataDir, dataDeferFunc, err := test.NewSpace() + if err != nil { + cleanup() + t.Fatal(err) + } + defers = append(defers, dataDeferFunc) + snapshotDir, snapshotDeferFunc, err := test.NewSpace() + if err != nil { + cleanup() + t.Fatal(err) + } + defers = append(defers, snapshotDeferFunc) + + dbInstance, err := OpenDB(context.Background(), Config{ + Location: dataDir, + MetricsScopeName: "property_test_helper", + FlushInterval: 3 * time.Second, + ExpireToDeleteDuration: 1 * time.Hour, + Repair: RepairConfig{ + Enabled: true, + Location: snapshotDir, + BuildTreeCron: "@every 10m", + QuickBuildTreeTime: time.Second * 10, + TreeSlotCount: 32, + }, + }, observability.BypassRegistry, fs.NewLocalFileSystem()) + if err != nil { + cleanup() + t.Fatal(err) + } + defers = append(defers, func() { _ = dbInstance.Close() }) + + db := dbInstance.(*database) + timeoutCounter = newInspectCounter() + successLatency = newInspectCounter() + db.repairScheduler.metrics.totalRepairPerPropertyTimeout = timeoutCounter + db.repairScheduler.metrics.repairSuccessLatency = successLatency + base = &repairGossipBase{scheduler: db.repairScheduler} + return base, timeoutCounter, successLatency, cleanup +} + +// TestExecuteRepairWithBudget covers the per-property repair helper that both +// client (processDifferTreeSummary) and server (processPropertySync) call +// into. Each subtest pins one observability invariant; deleting the matching +// branch in executeRepairWithBudget makes the corresponding assertion fail. +func TestExecuteRepairWithBudget(t *testing.T) { + shardLabel := "0" + + t.Run("child budget fires under healthy parent ctx → timeout counter +1", func(t *testing.T) { + prev := repairPerPropertyCtxBudget + repairPerPropertyCtxBudget = -time.Hour + t.Cleanup(func() { repairPerPropertyCtxBudget = prev }) + + base, timeoutCounter, successLatency, cleanup := openTestRepairBase(t) + defer cleanup() + + syncShard, err := base.scheduler.db.loadShard(context.Background(), testPropertyGroup, common.ShardID(0)) + if err != nil { + t.Fatal(err) + } + property := generateProperty("test-id-budget", time.Now().UnixNano(), 0) + + _, _, repairErr := base.executeRepairWithBudget(context.Background(), syncShard, + GetPropertyID(property), property, 0, testPropertyGroup) + if repairErr == nil { + t.Fatalf("expected err under negative budget, got nil") + } + if !errors.Is(repairErr, context.DeadlineExceeded) { + t.Fatalf("expected wrapped DeadlineExceeded, got %v", repairErr) + } + if got := timeoutCounter.get(testPropertyGroup, shardLabel); got != 1 { + t.Fatalf("expected per-property timeout counter == 1, got %v", got) + } + if got := successLatency.get(testPropertyGroup, shardLabel); got != 0 { + t.Fatalf("expected success latency not sampled on err, got %v", got) + } + }) + + t.Run("parent ctx already done → timeout counter not incremented (parent-not-child)", func(t *testing.T) { + base, timeoutCounter, _, cleanup := openTestRepairBase(t) + defer cleanup() + + syncShard, err := base.scheduler.db.loadShard(context.Background(), testPropertyGroup, common.ShardID(0)) + if err != nil { + t.Fatal(err) + } + property := generateProperty("test-id-parent", time.Now().UnixNano(), 0) + + // Parent ctx already expired: DeadlineExceeded reaches the helper + // but it is NOT from the per-property budget — must not be counted. + parentCtx, parentCancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Hour)) + defer parentCancel() + + _, _, repairErr := base.executeRepairWithBudget(parentCtx, syncShard, + GetPropertyID(property), property, 0, testPropertyGroup) + if repairErr == nil { + t.Fatalf("expected err under already-expired parent ctx, got nil") + } + if got := timeoutCounter.get(testPropertyGroup, shardLabel); got != 0 { + t.Fatalf("parent-ctx expiry must not count as per-property timeout, got %v", got) + } + }) + + t.Run("success path with updated=true → latency sampled once", func(t *testing.T) { + base, timeoutCounter, successLatency, cleanup := openTestRepairBase(t) + defer cleanup() + + syncShard, err := base.scheduler.db.loadShard(context.Background(), testPropertyGroup, common.ShardID(0)) + if err != nil { + t.Fatal(err) + } + // Empty db → shard.repair takes the "no older properties" branch, + // updateDocuments runs, and returns updated == true. + property := generateProperty("test-id-success", time.Now().UnixNano(), 0) + + updated, _, repairErr := base.executeRepairWithBudget(context.Background(), syncShard, + GetPropertyID(property), property, 0, testPropertyGroup) + if repairErr != nil { + t.Fatalf("expected nil err on healthy repair, got %v", repairErr) + } + if !updated { + t.Fatalf("expected updated == true on empty-db first repair, got false") + } + if got := successLatency.get(testPropertyGroup, shardLabel); got != 1 { + t.Fatalf("expected success latency sample count == 1, got %v", got) + } + if got := timeoutCounter.get(testPropertyGroup, shardLabel); got != 0 { + t.Fatalf("timeout counter must not move on success, got %v", got) + } + }) + + t.Run("no-op path with updated=false → latency still sampled (every err == nil counts)", func(t *testing.T) { + base, _, successLatency, cleanup := openTestRepairBase(t) + defer cleanup() + + now := time.Now().UnixNano() + // Seed a newer revision so the repair finds an older-or-equal "self" + // already present and returns updated == false. + existing := generateProperty("test-id-noop", now, 0) + if err := base.scheduler.db.Update(context.Background(), 0, GetPropertyID(existing), existing); err != nil { + t.Fatal(err) + } + syncShard, err := base.scheduler.db.loadShard(context.Background(), testPropertyGroup, common.ShardID(0)) + if err != nil { + t.Fatal(err) + } + // Re-repair with an older revision: helper returns nil err but + // updated == false. The success-latency histogram still samples + // because a no-op decision is still a successful sync cycle whose + // ctx-aware cost operators care about. + olderOrEqual := generateProperty("test-id-noop", now-1, 0) + updated, _, repairErr := base.executeRepairWithBudget(context.Background(), syncShard, + GetPropertyID(olderOrEqual), olderOrEqual, 0, testPropertyGroup) + if repairErr != nil { + t.Fatalf("expected nil err on no-op repair, got %v", repairErr) + } + if updated { + t.Fatalf("expected updated == false on older-or-equal repair, got true") + } + if got := successLatency.get(testPropertyGroup, shardLabel); got != 1 { + t.Fatalf("success latency must be sampled once even when updated == false (err == nil semantics), got %v", got) + } + }) +} diff --git a/banyand/property/db/shard.go b/banyand/property/db/shard.go index 6e8410eac..3d8915d9a 100644 --- a/banyand/property/db/shard.go +++ b/banyand/property/db/shard.go @@ -357,6 +357,37 @@ func (s *shard) search(ctx context.Context, q index.Query, orderBy *propertyv1.Q } func (s *shard) repair(ctx context.Context, id []byte, property *propertyv1.Property, deleteTime int64) (updated bool, selfNewer *queryProperty, err error) { + start := time.Now() + var ( + search1Elapsed time.Duration + deletePhaseElapsed time.Duration + updateElapsed time.Duration + olderCount int + deleteCount int + ) + defer func() { + elapsed := time.Since(start) + switch { + case err != nil: + s.l.Warn().Int64("elapsed_ms", elapsed.Milliseconds()).Str("group", property.Metadata.Group). + Str("name", property.Metadata.Name).Str("id", property.Id).Err(err). + Int64("search1_ms", search1Elapsed.Milliseconds()). + Int64("delete_phase_ms", deletePhaseElapsed.Milliseconds()). + Int64("update_ms", updateElapsed.Milliseconds()). + Int("older_count", olderCount). + Int("delete_count", deleteCount). + Msg("property repair failed") + case elapsed >= 5*time.Second: + s.l.Warn().Int64("elapsed_ms", elapsed.Milliseconds()).Str("group", property.Metadata.Group). + Str("name", property.Metadata.Name).Str("id", property.Id). + Int64("search1_ms", search1Elapsed.Milliseconds()). + Int64("delete_phase_ms", deletePhaseElapsed.Milliseconds()). + Int64("update_ms", updateElapsed.Milliseconds()). + Int("older_count", olderCount). + Int("delete_count", deleteCount). + Msg("slow property repair") + } + }() iq, err := inverted.BuildPropertyQuery(&propertyv1.QueryRequest{ Groups: []string{property.Metadata.Group}, Name: property.Metadata.Name, @@ -365,10 +396,13 @@ func (s *shard) repair(ctx context.Context, id []byte, property *propertyv1.Prop if err != nil { return false, nil, fmt.Errorf("build property query failure: %w", err) } + search1Start := time.Now() olderProperties, err := s.search(ctx, iq, nil, 100) + search1Elapsed = time.Since(search1Start) if err != nil { return false, nil, fmt.Errorf("query older properties failed: %w", err) } + olderCount = len(olderProperties) sort.Sort(queryPropertySlice(olderProperties)) // if there no older properties, we can insert the latest document. if len(olderProperties) == 0 { @@ -377,7 +411,9 @@ func (s *shard) repair(ctx context.Context, id []byte, property *propertyv1.Prop if err != nil { return false, nil, fmt.Errorf("build update document failed: %w", err) } + updateStart := time.Now() err = s.updateDocuments(index.Documents{*doc}) + updateElapsed = time.Since(updateStart) if err != nil { return false, nil, fmt.Errorf("update document failed: %w", err) } @@ -393,10 +429,13 @@ func (s *shard) repair(ctx context.Context, id []byte, property *propertyv1.Prop } docIDList := s.buildNotDeletedDocIDList(olderProperties) + deletePhaseStart := time.Now() deletedDocuments, err := s.buildDeleteFromTimeDocuments(ctx, docIDList, time.Now().UnixNano()) + deletePhaseElapsed = time.Since(deletePhaseStart) if err != nil { return false, nil, fmt.Errorf("build delete older documents failed: %w", err) } + deleteCount = len(deletedDocuments) // update the property to mark it as delete updateDoc, err := s.buildUpdateDocument(id, property, deleteTime) if err != nil { @@ -405,7 +444,9 @@ func (s *shard) repair(ctx context.Context, id []byte, property *propertyv1.Prop result := make([]index.Document, 0, len(deletedDocuments)+1) result = append(result, deletedDocuments...) result = append(result, *updateDoc) + updateStart := time.Now() err = s.updateDocuments(result) + updateElapsed = time.Since(updateStart) if err != nil { return false, nil, fmt.Errorf("update documents failed: %w", err) } diff --git a/banyand/property/gossip/server.go b/banyand/property/gossip/server.go index cff25034f..1a6b8c2a8 100644 --- a/banyand/property/gossip/server.go +++ b/banyand/property/gossip/server.go @@ -86,7 +86,7 @@ func (s *service) getServiceRegisters() []func(server *grpc.Server) { type groupWithShardPropagation struct { latestTime time.Time - channel chan *handlingRequest + pending *handlingRequest originalNodeID string } @@ -117,16 +117,24 @@ func (q *protocolHandler) processPropagation() { for { select { case <-q.groupNotify: - request := q.findUnProcessRequest() - if request == nil { - continue - } - timeoutCtx, cancelFunc := context.WithTimeout(q.s.closer.Ctx(), perNodeSyncTimeout) - err := q.handle(timeoutCtx, request) - cancelFunc() - if err != nil { - q.s.log.Warn().Err(err).Stringer("request", request.PropagationRequest). - Msgf("handle propagation request failure") + // Drain every pending per wake-up; re-check CloseNotify between handles. + for { + select { + case <-q.s.closer.CloseNotify(): + return + default: + } + request := q.findUnProcessRequest() + if request == nil { + break + } + timeoutCtx, cancelFunc := context.WithTimeout(q.s.closer.Ctx(), perNodeSyncTimeout) + err := q.handle(timeoutCtx, request) + cancelFunc() + if err != nil { + q.s.log.Warn().Err(err).Stringer("request", request.PropagationRequest). + Msgf("handle propagation request failure") + } } case <-q.s.closer.CloseNotify(): return @@ -138,11 +146,10 @@ func (q *protocolHandler) findUnProcessRequest() *handlingRequest { q.mu.Lock() defer q.mu.Unlock() for _, g := range q.groupWithShards { - select { - case d := <-g.channel: - return d - default: - continue + if g.pending != nil { + req := g.pending + g.pending = nil + return req } } return nil @@ -302,6 +309,8 @@ func (q *protocolHandler) contextIsDone(ctx context.Context) bool { } } +// addToProcess enqueues the request with latest-wins coalesce on pending; +// returns false when a different originator arrives within TTL. func (q *protocolHandler) addToProcess(request *propertyv1.PropagationRequest, tracer Trace, span Span) bool { q.mu.Lock() defer q.mu.Unlock() @@ -315,11 +324,10 @@ func (q *protocolHandler) addToProcess(request *propertyv1.PropagationRequest, t } if !exist { groupShard = &groupWithShardPropagation{ - channel: make(chan *handlingRequest, 1), + pending: handlingRequestData, originalNodeID: request.Context.OriginNode, latestTime: time.Now(), } - groupShard.channel <- handlingRequestData q.groupWithShards[shardKey] = groupShard q.notifyNewRequest() return true @@ -328,23 +336,31 @@ func (q *protocolHandler) addToProcess(request *propertyv1.PropagationRequest, t // if the latest round is out of ttl, then needs to change to current node to executing if time.Since(groupShard.latestTime) > q.s.scheduleInterval/2 { groupShard.originalNodeID = request.Context.OriginNode - select { - case groupShard.channel <- handlingRequestData: - q.notifyNewRequest() - default: - q.s.log.Error().Msgf("ready to added propagation into group shard %s(%d) in a new round, but it's full", request.Group, request.ShardId) + if groupShard.pending != nil { + q.s.serverMetrics.totalCoalesced.Inc(1, request.Group) + q.s.log.Info(). + Str("group", request.Group). + Uint32("shardID", request.ShardId). + Str("originNode", request.Context.OriginNode). + Msg("propagation request coalesced into pending (TTL takeover)") } + groupShard.pending = handlingRequestData + q.notifyNewRequest() return true } // if the original node ID are a same node, means which from the same round if groupShard.originalNodeID == request.Context.OriginNode { - select { - case groupShard.channel <- handlingRequestData: - q.notifyNewRequest() - default: - q.s.log.Error().Msgf("ready to added propagation into group shard %s(%d) in a same round, but it's full", request.Group, request.ShardId) + if groupShard.pending != nil { + q.s.serverMetrics.totalCoalesced.Inc(1, request.Group) + q.s.log.Info(). + Str("group", request.Group). + Uint32("shardID", request.ShardId). + Str("originNode", request.Context.OriginNode). + Msg("propagation request coalesced into pending (same-round latest-wins)") } + groupShard.pending = handlingRequestData + q.notifyNewRequest() return true } @@ -419,6 +435,7 @@ type serverMetrics struct { totalReceived meter.Counter totalAddProcessed meter.Counter totalSkipProcess meter.Counter + totalCoalesced meter.Counter totalStarted meter.Counter totalFinished meter.Counter @@ -439,6 +456,7 @@ func newServerMetrics(factory observability.Factory) *serverMetrics { totalReceived: factory.NewCounter("total_received", "group"), totalAddProcessed: factory.NewCounter("total_add_processed", "group"), totalSkipProcess: factory.NewCounter("total_skip_process", "group"), + totalCoalesced: factory.NewCounter("total_coalesced", "group"), totalStarted: factory.NewCounter("total_started", "group"), totalFinished: factory.NewCounter("total_finished", "group"), diff --git a/banyand/property/gossip/service_test.go b/banyand/property/gossip/service_test.go index dc747e9a8..cfcacebeb 100644 --- a/banyand/property/gossip/service_test.go +++ b/banyand/property/gossip/service_test.go @@ -21,6 +21,7 @@ import ( "context" "fmt" "net" + "strings" "sync" "testing" "time" @@ -142,6 +143,268 @@ var _ = ginkgo.Describe("Propagation Messenger", func() { nodeListenFromVerify(node2, []string{node1.nodeID}) nodeListenFromVerify(node3, []string{node1.nodeID}) }) + + // Latest scheduled round must reach Rev when the worker is busy. + ginkgo.It("does not silently drop scheduled rounds when handler is slow", func() { + nodes = startNodes(3) + svc := nodes[0].messenger.(*service) + + coalesced := newRecordingCounter() + svc.serverMetrics.totalCoalesced = coalesced + + blocking := newBlockingListener() + svc.listenersLock.Lock() + svc.listeners = []MessageListener{blocking} + svc.listenersLock.Unlock() + + nodeList := []string{nodes[0].nodeID, nodes[1].nodeID, nodes[2].nodeID} + const totalRounds = 5 + buildRequest := func(roundIndex int32) *propertyv1.PropagationRequest { + return &propertyv1.PropagationRequest{ + Context: &propertyv1.PropagationContext{ + Nodes: nodeList, + OriginNode: nodes[0].nodeID, + MaxPropagationCount: int32(len(nodeList)*2 - 3), + CurrentPropagationCount: roundIndex, + }, + Group: mockGroup, + ShardId: 0, + } + } + + _, sendErr := svc.protocolHandler.Propagation(context.Background(), buildRequest(0)) + gomega.Expect(sendErr).NotTo(gomega.HaveOccurred()) + gomega.Eventually(func() bool { + select { + case <-blocking.entered: + return true + default: + return false + } + }, flags.EventuallyTimeout, "10ms").Should(gomega.BeTrue(), + "worker should have drained the first request and entered listener.Rev") + + _, sendErr = svc.protocolHandler.Propagation(context.Background(), buildRequest(1)) + gomega.Expect(sendErr).NotTo(gomega.HaveOccurred()) + for i := int32(2); i < int32(totalRounds); i++ { + _, sendErr = svc.protocolHandler.Propagation(context.Background(), buildRequest(i)) + gomega.Expect(sendErr).NotTo(gomega.HaveOccurred()) + } + gomega.Expect(coalesced.get(mockGroup)).To(gomega.Equal(3.0), + "3 same-round overwrites of a non-nil pending must each increment totalCoalesced") + + close(blocking.release) + gomega.Eventually(func() int32 { + return blocking.maxObservedCount() + }, flags.EventuallyTimeout, "100ms").Should(gomega.Equal(int32(4)), + "latest scheduled round must reach Rev under latest-wins semantics") + gomega.Expect(blocking.count()).To(gomega.BeNumerically(">=", 2), + "at least the in-flight round and the coalesced latest round should reach Rev") + }) + + // TTL takeover wires the new originator into pending when it is empty. + ginkgo.It("TTL-expired branch takes over to new originator with empty pending", func() { + nodes = startNodes(2) + svc := nodes[0].messenger.(*service) + svc.scheduleInterval = 50 * time.Millisecond + + blocking := newBlockingListener() + svc.listenersLock.Lock() + svc.listeners = []MessageListener{blocking} + svc.listenersLock.Unlock() + + nodeList := []string{nodes[0].nodeID, nodes[1].nodeID} + buildReq := func(origin string, idx int32) *propertyv1.PropagationRequest { + return &propertyv1.PropagationRequest{ + Context: &propertyv1.PropagationContext{ + Nodes: nodeList, + OriginNode: origin, + MaxPropagationCount: 1, + CurrentPropagationCount: idx, + }, + Group: mockGroup, + ShardId: 0, + } + } + + _, err := svc.protocolHandler.Propagation(context.Background(), buildReq(nodes[0].nodeID, 0)) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Eventually(func() bool { + select { + case <-blocking.entered: + return true + default: + return false + } + }, flags.EventuallyTimeout, "10ms").Should(gomega.BeTrue()) + + time.Sleep(250 * time.Millisecond) + + _, err = svc.protocolHandler.Propagation(context.Background(), buildReq(nodes[1].nodeID, 99)) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + close(blocking.release) + gomega.Eventually(func() int32 { + return blocking.maxObservedCount() + }, flags.EventuallyTimeout, "100ms").Should(gomega.Equal(int32(99)), + "round-B must reach Rev — confirms the TTL-expired branch ran and wired pending correctly") + }) + + // TTL takeover overwrites a non-nil pending and bumps totalCoalesced. + ginkgo.It("TTL-expired branch coalesces when pending is non-nil", func() { + nodes = startNodes(2) + svc := nodes[0].messenger.(*service) + svc.scheduleInterval = 50 * time.Millisecond + coalesced := newRecordingCounter() + svc.serverMetrics.totalCoalesced = coalesced + + blocking := newBlockingListener() + svc.listenersLock.Lock() + svc.listeners = []MessageListener{blocking} + svc.listenersLock.Unlock() + + nodeList := []string{nodes[0].nodeID, nodes[1].nodeID} + buildReq := func(origin string, idx int32) *propertyv1.PropagationRequest { + return &propertyv1.PropagationRequest{ + Context: &propertyv1.PropagationContext{ + Nodes: nodeList, + OriginNode: origin, + MaxPropagationCount: 1, + CurrentPropagationCount: idx, + }, + Group: mockGroup, + ShardId: 0, + } + } + + _, err := svc.protocolHandler.Propagation(context.Background(), buildReq(nodes[0].nodeID, 0)) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Eventually(func() bool { + select { + case <-blocking.entered: + return true + default: + return false + } + }, flags.EventuallyTimeout, "10ms").Should(gomega.BeTrue()) + + _, err = svc.protocolHandler.Propagation(context.Background(), buildReq(nodes[0].nodeID, 1)) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(coalesced.get(mockGroup)).To(gomega.Equal(0.0), + "first same-origin round writes pending without coalescing") + + time.Sleep(250 * time.Millisecond) + + _, err = svc.protocolHandler.Propagation(context.Background(), buildReq(nodes[1].nodeID, 99)) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(coalesced.get(mockGroup)).To(gomega.Equal(1.0), + "TTL-expired branch must increment totalCoalesced when overwriting a non-nil pending") + + close(blocking.release) + gomega.Eventually(func() int32 { + return blocking.maxObservedCount() + }, flags.EventuallyTimeout, "100ms").Should(gomega.Equal(int32(99)), + "round-B (the latest take-over) must reach Rev") + }) + + // drain-all flushes every queued pending even when groupNotify drops signals. + ginkgo.It("does not strand pending entries when groupNotify is saturated", func() { + nodes = startNodes(2) + svc := nodes[0].messenger.(*service) + + blocking := newBlockingListener() + svc.listenersLock.Lock() + svc.listeners = []MessageListener{blocking} + svc.listenersLock.Unlock() + + nodeList := []string{nodes[0].nodeID, nodes[1].nodeID} + buildReq := func(shardID uint32, idx int32) *propertyv1.PropagationRequest { + return &propertyv1.PropagationRequest{ + Context: &propertyv1.PropagationContext{ + Nodes: nodeList, + OriginNode: nodes[0].nodeID, + MaxPropagationCount: 1, + CurrentPropagationCount: idx, + }, + Group: mockGroup, + ShardId: shardID, + } + } + + _, err := svc.protocolHandler.Propagation(context.Background(), buildReq(0, 0)) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Eventually(func() bool { + select { + case <-blocking.entered: + return true + default: + return false + } + }, flags.EventuallyTimeout, "10ms").Should(gomega.BeTrue(), + "worker should drain the initial request and enter listener.Rev") + + const extraShards uint32 = 12 + for i := uint32(1); i <= extraShards; i++ { + _, err = svc.protocolHandler.Propagation(context.Background(), buildReq(i, int32(i))) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + } + + close(blocking.release) + gomega.Eventually(func() int { + return blocking.count() + }, flags.EventuallyTimeout, "100ms").Should(gomega.Equal(int(extraShards)+1), + "drain-all on each wake-up must flush every pending shard even when notifyNewRequest dropped some signals") + }) + + // CloseNotify must abort the drain loop between handles. + ginkgo.It("worker honors CloseNotify in drain loop even with backlog", func() { + nodes = startNodes(2) + // Slow listener so each handle takes a measurable amount of time. + nodes[0].listener.delay = 200 * time.Millisecond + + svc := nodes[0].messenger.(*service) + listener := nodes[0].listener + nodeList := []string{nodes[0].nodeID, nodes[1].nodeID} + const backlogShards uint32 = 10 + + for i := uint32(0); i < backlogShards; i++ { + req := &propertyv1.PropagationRequest{ + Context: &propertyv1.PropagationContext{ + Nodes: nodeList, + OriginNode: nodes[0].nodeID, + MaxPropagationCount: 1, + }, + Group: mockGroup, + ShardId: i, + } + _, err := svc.protocolHandler.Propagation(context.Background(), req) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + } + + // Give the worker a moment to start the first handle so the listener + // delay is in flight when we trigger close. + time.Sleep(250 * time.Millisecond) + + listener.mu.RLock() + countAtClose := len(listener.messages) + listener.mu.RUnlock() + + // Trigger close. Without the close check in the drain loop, the worker + // would keep processing every queued shard before exiting. + svc.GracefulStop() + nodes[0] = nil // suite-level AfterEach must not double-stop + + // Wait long enough that an unfixed worker would have drained the full + // backlog: backlogShards * 200ms = 2s. Use 2.5s safety margin. + time.Sleep(2500 * time.Millisecond) + + listener.mu.RLock() + extraHandles := len(listener.messages) - countAtClose + listener.mu.RUnlock() + + gomega.Expect(extraHandles).To(gomega.BeNumerically("<=", 2), + "worker should exit within ~one in-flight handle after CloseNotify; saw %d extra handles after close (full backlog would be ~%d)", + extraHandles, int(backlogShards)) + }) }) func nodeVerify(n *nodeContext, targets []string, messagesCount int) { @@ -272,3 +535,84 @@ func (m *mockListener) Rev(_ context.Context, _ Trace, nextNode *grpc.ClientConn m.fromNodes = append(m.fromNodes, req.Context.OriginNode) return nil } + +// blockingListener.Rev blocks on release; propCounts records which rounds reached Rev. +type blockingListener struct { + release chan struct{} + entered chan struct{} + propCounts []int32 + enteredOnce sync.Once + mu sync.RWMutex +} + +func newBlockingListener() *blockingListener { + return &blockingListener{ + release: make(chan struct{}), + entered: make(chan struct{}), + } +} + +func (b *blockingListener) Rev(_ context.Context, _ Trace, _ *grpc.ClientConn, req *propertyv1.PropagationRequest) error { + b.enteredOnce.Do(func() { close(b.entered) }) + <-b.release + b.mu.Lock() + b.propCounts = append(b.propCounts, req.Context.CurrentPropagationCount) + b.mu.Unlock() + return nil +} + +func (b *blockingListener) count() int { + b.mu.RLock() + defer b.mu.RUnlock() + return len(b.propCounts) +} + +func (b *blockingListener) maxObservedCount() int32 { + b.mu.RLock() + defer b.mu.RUnlock() + if len(b.propCounts) == 0 { + return 0 + } + maxCount := b.propCounts[0] + for _, c := range b.propCounts[1:] { + if c > maxCount { + maxCount = c + } + } + return maxCount +} + +// recordingCounter exposes Inc deltas — the bypass registry cannot be introspected. +type recordingCounter struct { + totals map[string]float64 + mu sync.RWMutex +} + +func newRecordingCounter() *recordingCounter { + return &recordingCounter{totals: make(map[string]float64)} +} + +func (c *recordingCounter) Inc(delta float64, labelValues ...string) { + key := strings.Join(labelValues, "\x00") + c.mu.Lock() + c.totals[key] += delta + c.mu.Unlock() +} + +func (c *recordingCounter) Delete(labelValues ...string) bool { + key := strings.Join(labelValues, "\x00") + c.mu.Lock() + defer c.mu.Unlock() + if _, ok := c.totals[key]; !ok { + return false + } + delete(c.totals, key) + return true +} + +func (c *recordingCounter) get(labelValues ...string) float64 { + key := strings.Join(labelValues, "\x00") + c.mu.RLock() + defer c.mu.RUnlock() + return c.totals[key] +} diff --git a/test/property_repair/full_data/integrated_test.go b/test/property_repair/full_data/integrated_test.go index 765180c77..f07c6cfa2 100644 --- a/test/property_repair/full_data/integrated_test.go +++ b/test/property_repair/full_data/integrated_test.go @@ -168,8 +168,11 @@ var _ = ginkgo.Describe("Property Repair Full Data Test", ginkgo.Ordered, func() gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(), fmt.Sprintf("Node %s should be healthy before verification: %s", metrics.NodeName, metrics.ErrorMessage)) - fmt.Printf("- %s: total_propagation_count=%d, repair_success_count=%d\n", - metrics.NodeName, metrics.TotalPropagationCount, metrics.RepairSuccessCount) + fmt.Printf("- %s: total_propagation_count=%d, repair_success_count=%d, repair_failure_count=%d, "+ + "per_property_timeout=%d, total_coalesced=%d, success_latency_count=%d, success_latency_sum_sec=%.4f\n", + metrics.NodeName, metrics.TotalPropagationCount, metrics.RepairSuccessCount, + metrics.RepairFailureCount, metrics.RepairPerPropertyTimeoutCount, metrics.TotalCoalesced, + metrics.RepairSuccessLatencyCount, metrics.RepairSuccessLatencySumSeconds) } fmt.Println("\n=== Triggering property repair by waiting for scheduled repair cycle ===") diff --git a/test/property_repair/half_data/integrated_test.go b/test/property_repair/half_data/integrated_test.go index bd40fa682..ff66a1937 100644 --- a/test/property_repair/half_data/integrated_test.go +++ b/test/property_repair/half_data/integrated_test.go @@ -195,8 +195,11 @@ var _ = ginkgo.Describe("Property Repair Half Data Test", ginkgo.Ordered, func() gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(), fmt.Sprintf("Node %s should be healthy before verification: %s", metrics.NodeName, metrics.ErrorMessage)) - fmt.Printf("- %s: total_propagation_count=%d, repair_success_count=%d\n", - metrics.NodeName, metrics.TotalPropagationCount, metrics.RepairSuccessCount) + fmt.Printf("- %s: total_propagation_count=%d, repair_success_count=%d, repair_failure_count=%d, "+ + "per_property_timeout=%d, total_coalesced=%d, success_latency_count=%d, success_latency_sum_sec=%.4f\n", + metrics.NodeName, metrics.TotalPropagationCount, metrics.RepairSuccessCount, + metrics.RepairFailureCount, metrics.RepairPerPropertyTimeoutCount, metrics.TotalCoalesced, + metrics.RepairSuccessLatencyCount, metrics.RepairSuccessLatencySumSeconds) } fmt.Println("\n=== Triggering property repair by waiting for scheduled repair cycle ===") diff --git a/test/property_repair/same_data/integrated_test.go b/test/property_repair/same_data/integrated_test.go index 290aaf310..f7f8b9ed9 100644 --- a/test/property_repair/same_data/integrated_test.go +++ b/test/property_repair/same_data/integrated_test.go @@ -153,8 +153,11 @@ var _ = ginkgo.Describe("Property Repair Same Data Test", ginkgo.Ordered, func() gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(), fmt.Sprintf("Node %s should be healthy before verification: %s", metrics.NodeName, metrics.ErrorMessage)) - fmt.Printf("- %s: total_propagation_count=%d, repair_success_count=%d\n", - metrics.NodeName, metrics.TotalPropagationCount, metrics.RepairSuccessCount) + fmt.Printf("- %s: total_propagation_count=%d, repair_success_count=%d, repair_failure_count=%d, "+ + "per_property_timeout=%d, total_coalesced=%d, success_latency_count=%d, success_latency_sum_sec=%.4f\n", + metrics.NodeName, metrics.TotalPropagationCount, metrics.RepairSuccessCount, + metrics.RepairFailureCount, metrics.RepairPerPropertyTimeoutCount, metrics.TotalCoalesced, + metrics.RepairSuccessLatencyCount, metrics.RepairSuccessLatencySumSeconds) } fmt.Println("\n=== Triggering property repair by waiting for scheduled repair cycle ===") diff --git a/test/property_repair/shared_utils.go b/test/property_repair/shared_utils.go index ee926d305..2d3fb3591 100644 --- a/test/property_repair/shared_utils.go +++ b/test/property_repair/shared_utils.go @@ -53,6 +53,11 @@ const ( PropertyName = "perf-test-property" ) +// promFloatPattern matches a Prometheus exposition value: scientific +// notation (e.g. 1.23e+06), plain decimal, [+-]Inf or NaN. The previous +// `\d+(?:\.\d+)?` form silently missed large counters and histogram sums. +const promFloatPattern = `(-?(?:\d+(?:\.\d+)?(?:[eE][+-]?\d+)?|[+-]?Inf|NaN))` + // PrometheusEndpoints defines the prometheus endpoints for data nodes. var PrometheusEndpoints = []string{ "http://localhost:2122/metrics", // data-node-1 @@ -62,12 +67,22 @@ var PrometheusEndpoints = []string{ // NodeMetrics represents the metrics for a data node. type NodeMetrics struct { - LastScrapeTime time.Time - NodeName string - ErrorMessage string - TotalPropagationCount int64 - RepairSuccessCount int64 - IsHealthy bool + LastScrapeTime time.Time + NodeName string + ErrorMessage string + TotalPropagationCount int64 + RepairSuccessCount int64 + RepairFailureCount int64 + RepairPerPropertyTimeoutCount int64 + TotalCoalesced int64 + // RepairSuccessLatencyCount / SumSeconds come from the success-path + // histogram: every err == nil call is sampled (including no-op cases + // where the remote was already newer). Failures and per-property + // timeouts are NOT sampled, so a round where every repair times out + // shows count=0 here. + RepairSuccessLatencyCount int64 + RepairSuccessLatencySumSeconds float64 + IsHealthy bool } // GenerateLargeData creates a string of specified size filled with random characters. @@ -287,9 +302,19 @@ func GetNodeMetrics(endpoint string, nodeIndex int) *NodeMetrics { content := string(body) totalPropagationCount := parseTotalPropagationCount(content) repairSuccessCount := parseRepairSuccessCount(content) + repairFailureCount := parseRepairFailureCount(content) + repairPerPropertyTimeoutCount := parseRepairPerPropertyTimeoutCount(content) + totalCoalesced := parseTotalCoalesced(content) + repairSuccessLatencyCount := parseRepairSuccessLatencyCount(content) + repairSuccessLatencySumSeconds := parseRepairSuccessLatencySum(content) metrics.TotalPropagationCount = totalPropagationCount metrics.RepairSuccessCount = repairSuccessCount + metrics.RepairFailureCount = repairFailureCount + metrics.RepairPerPropertyTimeoutCount = repairPerPropertyTimeoutCount + metrics.TotalCoalesced = totalCoalesced + metrics.RepairSuccessLatencyCount = repairSuccessLatencyCount + metrics.RepairSuccessLatencySumSeconds = repairSuccessLatencySumSeconds metrics.IsHealthy = true return metrics } @@ -297,7 +322,7 @@ func GetNodeMetrics(endpoint string, nodeIndex int) *NodeMetrics { // parseTotalPropagationCount parses the total_propagation_count from prometheus metrics text. func parseTotalPropagationCount(content string) int64 { // Look for metric lines like: banyandb_property_repair_gossip_server_total_propagation_count{group="perf-test-group",original_node="data-node-1:17912"} 3 - re := regexp.MustCompile(`banyandb_property_repair_gossip_server_total_propagation_count\{[^}]+\}\s+(\d+(?:\.\d+)?)`) + re := regexp.MustCompile(`banyandb_property_repair_gossip_server_total_propagation_count\{[^}]+\}\s+` + promFloatPattern) matches := re.FindAllStringSubmatch(content, -1) var totalCount int64 @@ -317,7 +342,47 @@ func parseTotalPropagationCount(content string) int64 { // parseRepairSuccessCount parses the repair_success_count from prometheus metrics text. func parseRepairSuccessCount(content string) int64 { // Look for metric lines like: banyandb_property_scheduler_property_repair_success_count{group="perf-test-group",shard="0"} 100 - re := regexp.MustCompile(`banyandb_property_scheduler_property_repair_success_count\{[^}]+\}\s+(\d+(?:\.\d+)?)`) + re := regexp.MustCompile(`banyandb_property_scheduler_property_repair_success_count\{[^}]+\}\s+` + promFloatPattern) + matches := re.FindAllStringSubmatch(content, -1) + + var totalCount int64 + for _, match := range matches { + if len(match) >= 2 { + value, err := strconv.ParseFloat(match[1], 64) + if err != nil { + continue + } + totalCount += int64(value) + } + } + + return totalCount +} + +// parseRepairFailureCount parses the repair_failure_count from prometheus metrics text. +func parseRepairFailureCount(content string) int64 { + // Look for metric lines like: banyandb_property_scheduler_property_repair_failure_count{group="perf-test-group",shard="0"} 5 + re := regexp.MustCompile(`banyandb_property_scheduler_property_repair_failure_count\{[^}]+\}\s+` + promFloatPattern) + matches := re.FindAllStringSubmatch(content, -1) + + var totalCount int64 + for _, match := range matches { + if len(match) >= 2 { + value, err := strconv.ParseFloat(match[1], 64) + if err != nil { + continue + } + totalCount += int64(value) + } + } + + return totalCount +} + +// parseRepairPerPropertyTimeoutCount parses the property_repair_per_property_timeout from prometheus metrics text. +func parseRepairPerPropertyTimeoutCount(content string) int64 { + // Look for metric lines like: banyandb_property_scheduler_property_repair_per_property_timeout{group="perf-test-group",shard="0"} 3 + re := regexp.MustCompile(`banyandb_property_scheduler_property_repair_per_property_timeout\{[^}]+\}\s+` + promFloatPattern) matches := re.FindAllStringSubmatch(content, -1) var totalCount int64 @@ -334,6 +399,66 @@ func parseRepairSuccessCount(content string) int64 { return totalCount } +// parseTotalCoalesced parses the gossip total_coalesced from prometheus metrics text. +func parseTotalCoalesced(content string) int64 { + // Look for metric lines like: banyandb_property_repair_gossip_server_total_coalesced{group="perf-test-group"} 7 + re := regexp.MustCompile(`banyandb_property_repair_gossip_server_total_coalesced\{[^}]+\}\s+` + promFloatPattern) + matches := re.FindAllStringSubmatch(content, -1) + + var totalCount int64 + for _, match := range matches { + if len(match) >= 2 { + value, err := strconv.ParseFloat(match[1], 64) + if err != nil { + continue + } + totalCount += int64(value) + } + } + + return totalCount +} + +// parseRepairSuccessLatencyCount parses the histogram _count of property_repair_success_latency_seconds. +func parseRepairSuccessLatencyCount(content string) int64 { + // Look for metric lines like: banyandb_property_scheduler_property_repair_success_latency_seconds_count{group="perf-test-group",shard="0"} 100 + re := regexp.MustCompile(`banyandb_property_scheduler_property_repair_success_latency_seconds_count\{[^}]+\}\s+` + promFloatPattern) + matches := re.FindAllStringSubmatch(content, -1) + + var totalCount int64 + for _, match := range matches { + if len(match) >= 2 { + value, err := strconv.ParseFloat(match[1], 64) + if err != nil { + continue + } + totalCount += int64(value) + } + } + + return totalCount +} + +// parseRepairSuccessLatencySum parses the histogram _sum (seconds) of property_repair_success_latency_seconds. +func parseRepairSuccessLatencySum(content string) float64 { + // Look for metric lines like: banyandb_property_scheduler_property_repair_success_latency_seconds_sum{group="perf-test-group",shard="0"} 12.34 + re := regexp.MustCompile(`banyandb_property_scheduler_property_repair_success_latency_seconds_sum\{[^}]+\}\s+` + promFloatPattern) + matches := re.FindAllStringSubmatch(content, -1) + + var totalSum float64 + for _, match := range matches { + if len(match) >= 2 { + value, err := strconv.ParseFloat(match[1], 64) + if err != nil { + continue + } + totalSum += value + } + } + + return totalSum +} + // GetAllNodeMetrics fetches metrics from all data nodes concurrently. func GetAllNodeMetrics() []*NodeMetrics { var wg sync.WaitGroup @@ -381,24 +506,47 @@ func VerifyPropagationCountIncreased(beforeMetrics, afterMetrics []*NodeMetrics) // PrintMetricsComparison prints a comparison of metrics before and after. func PrintMetricsComparison(beforeMetrics, afterMetrics []*NodeMetrics) { fmt.Println("=== Prometheus Metrics Comparison ===") - fmt.Printf("%-12s | %-29s | %-29s | %-7s\n", "Node", "Propagation Count", "Repair Success Count", "Healthy") - fmt.Printf("%-12s | %-9s %-9s %-9s | %-9s %-9s %-9s | %-7s\n", "", "Before", "After", "Delta", "Before", "After", "Delta", "") - fmt.Println(strings.Repeat("-", 85)) + fmt.Printf("%-12s | %-29s | %-29s | %-29s | %-29s | %-29s | %-12s | %-7s\n", + "Node", "Propagation Count", "Repair Success Count", "Repair Failure Count", + "Per-Property Timeout", "Coalesced (gossip)", "Succ Mean Lat", "Healthy") + fmt.Printf("%-12s | %-9s %-9s %-9s | %-9s %-9s %-9s | %-9s %-9s %-9s | %-9s %-9s %-9s | %-9s %-9s %-9s | %-12s | %-7s\n", + "", + "Before", "After", "Delta", + "Before", "After", "Delta", + "Before", "After", "Delta", + "Before", "After", "Delta", + "Before", "After", "Delta", + "ThisRound(sec)", + "") + fmt.Println(strings.Repeat("-", 190)) for i, after := range afterMetrics { if i < len(beforeMetrics) { before := beforeMetrics[i] propagationDelta := after.TotalPropagationCount - before.TotalPropagationCount repairDelta := after.RepairSuccessCount - before.RepairSuccessCount + failureDelta := after.RepairFailureCount - before.RepairFailureCount + timeoutDelta := after.RepairPerPropertyTimeoutCount - before.RepairPerPropertyTimeoutCount + coalescedDelta := after.TotalCoalesced - before.TotalCoalesced + meanLatency := 0.0 + countDelta := after.RepairSuccessLatencyCount - before.RepairSuccessLatencyCount + sumDelta := after.RepairSuccessLatencySumSeconds - before.RepairSuccessLatencySumSeconds + if countDelta > 0 { + meanLatency = sumDelta / float64(countDelta) + } healthStatus := "✓" if !after.IsHealthy { healthStatus = "✗" } - fmt.Printf("%-12s | %-9d %-9d %-9d | %-9d %-9d %-9d | %-7s\n", + fmt.Printf("%-12s | %-9d %-9d %-9d | %-9d %-9d %-9d | %-9d %-9d %-9d | %-9d %-9d %-9d | %-9d %-9d %-9d | %-12.4f | %-7s\n", after.NodeName, before.TotalPropagationCount, after.TotalPropagationCount, propagationDelta, before.RepairSuccessCount, after.RepairSuccessCount, repairDelta, + before.RepairFailureCount, after.RepairFailureCount, failureDelta, + before.RepairPerPropertyTimeoutCount, after.RepairPerPropertyTimeoutCount, timeoutDelta, + before.TotalCoalesced, after.TotalCoalesced, coalescedDelta, + meanLatency, healthStatus) } }
