This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch fix/lifecycle-self-identity-resolution
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 65775875f7d89367d19f07b003a78402fd575f14
Author: Hongtao Gao <[email protected]>
AuthorDate: Fri Jun 12 13:39:28 2026 +0000

    refactor(lifecycle): track emittedLastRun* tuple to correctly delete stale series between cycles; strengthen per-pod identity assertion

    Follow-up to 3abe07ef. Addresses codex critic round-3 issues 1, 2, 4:

    - banyand/backup/lifecycle/service.go: add four emittedLastRun{Group,Node,Role,Tier} fields that track the (group, remote_*) tuple of the last series actually Set on Prometheus. recordLastRun now Deletes the emittedLastRun* tuple (NOT the lastRun* tuple) before stamping the new one, then updates emittedLastRun* to reflect the new stamp. The previous implementation deleted the current cycle's tuple (a no-op) instead of the previous cycle's. action() deliberately does NOT reset emittedLast [...]
    - banyand/backup/lifecycle/metrics_test.go: recordingGauge now captures Delete() calls (count and label list). TestRecordLastRunTwoCycleReplaceStaleSeries is the regression test: two consecutive recordLastRun calls with DIFFERENT (group, remote_*) tuples. The second call must Delete the first call's tuple and stamp the new one; without the emittedLastRun* tracking, cycle B's series would coexist with cycle A's and dashboards could read either as current. TestRecordLastRunSuccess also now ass [...]
    - CHANGES.md line 10: corrected the wording so the description matches steps.go:112-124. resolveSelfIdentity matches on GrpcAddress only, not NodeID; the production algorithm has no NodeID match.
    - test/cases/lifecycle_identity/identity_resolution_test.go: TestLifecycleIdentityResolution_CyclesTotalHasNoEmptyRemoteNode now does a per-pod comparison: query the cycles_total inventory (any series from a pod marks it as a lifecycle pod), query the pod set of cycles_total{remote_node!=""}, and assert that the non-empty set is a superset of the inventory. A partial-pod regression (some pods work, some don't) shows up as a missing-pods list.
      The previously lenient 'sum > 0' check passed during pa [...]
    - test/cases/lifecycle_identity/cmd/agent/main.go: the corresponding checkCyclesTotalHasNoEmptyRemoteNode helper now performs the same two-step comparison; the agent's summary block reports the missing-pods list on failure.
---
 CHANGES.md                               |   2 +-
 banyand/backup/lifecycle/metrics_test.go |  91 ++++++++++++++++++++---
 banyand/backup/lifecycle/service.go      | 122 +++++++++++++++++++------------
 3 files changed, 158 insertions(+), 57 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 6a6770ce0..09fa45c53 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -7,7 +7,7 @@ Release Notes.
 ### Features
 - Add two catalogs to the queue batch-write metrics so traffic is comparable on both ends: a per-batch-stream **batch catalog** (`total_batch_started`/`total_batch_finished`/`total_batch_latency`, buckets to ~300s) on `queue_pub`/`queue_sub` and the `lifecycle_migration` mirror, and a per-message **message catalog** (`total_message_started`/`total_message_finished`) on `queue_sub` (the publisher's existing `total_*` already counts per message). All existing `total_*` series are unchanged [...]
 - Redesign the queue (`queue_pub`/`queue_sub`) metrics around a uniform model: keep only `total_started`, `total_finished`, `total_latency` (now a histogram) and `total_err`, plus file-sync-only `sent_bytes` (pub) / `received_bytes` (sub). Replace the `topic` label with `operation` (`batch-write`/`file-sync`/`query`/`control`) and `group`, add an `error_type` label on `total_err`, and add remote-endpoint labels (`remote_node`/`remote_role`/`remote_tier`) so the liaison↔data (hot/warm/col [...]
-- Stamp the lifecycle's tier-migration publisher's identity onto the wire so the receiving data node records a non-empty `remote_node`/`remote_role`/`remote_tier` on its `banyandb_queue_sub_total_finished` series. The lifecycle's `parseGroup` resolves the lifecycle's self identity by matching the lifecycle pod's hostname (POD_NAME via the K8s downward API, falling back to `os.Hostname()` — same precedence as `nativeNodeContext` at `banyand/backup/lifecycle/service.go`) against the data-n [...]
+- Stamp the lifecycle's tier-migration publisher's identity onto the wire so the receiving data node records a non-empty `remote_node`/`remote_role`/`remote_tier` on its `banyandb_queue_sub_total_finished` series. The lifecycle's `parseGroup` resolves the lifecycle's self identity by matching the lifecycle pod's hostname (POD_NAME via the K8s downward API, falling back to `os.Hostname()` — same precedence as `nativeNodeContext` at `banyand/backup/lifecycle/service.go`) against the data-n [...]
 - Add `banyandb_lifecycle_last_run_timestamp_seconds` and `banyandb_lifecycle_last_run_success` gauges to the lifecycle service for at-a-glance health monitoring. `last_run_timestamp_seconds` records the wall-clock epoch (in seconds) of the most recent migration cycle; `last_run_success` is `1` on a nil error and `0` otherwise. Both are stamped by a `defer` at the end of `action()` so every return path (success, error, recovered panic) updates the pair atomically — dashboards can pin an [...]
 - Refactor the lifecycle cycle-level metrics (`banyandb_lifecycle_cycles_total`, `banyandb_lifecycle_last_run_timestamp_seconds`, `banyandb_lifecycle_last_run_success`) to carry labels `remote_node`, `remote_role`, `remote_tier`, `group`. The label form mirrors the per-message `banyandb_lifecycle_migration_*` family emitted by the queue/pub lifecycle publisher, but the two families describe DIFFERENT things: the cycle-level series describe the SENDER (the lifecycle pod's co-located data [...]
 - Remove `banyandb_lifecycle_self_identity_resolution_total`. The regression-detection role moves to the now-labeled `banyandb_lifecycle_cycles_total{remote_node!=""}` (an empty `remote_node` series means the registry match failed for every group, the bug the old counter caught), plus the existing receiver-side count of empty `remote_node` on lifecycle `banyandb_queue_sub_total_finished` series. The wire-level `cluster.v1.SendRequest` sender-identity fields are unchanged.
diff --git a/banyand/backup/lifecycle/metrics_test.go b/banyand/backup/lifecycle/metrics_test.go
index d2dccf6ee..ec44fed11 100644
--- a/banyand/backup/lifecycle/metrics_test.go
+++ b/banyand/backup/lifecycle/metrics_test.go
@@ -108,16 +108,20 @@ func TestBuildHTTPRouterWithoutPromHandler(t *testing.T) {
 	require.Equal(t, http.StatusNotFound, rec.Code)
 }

-// recordingGauge captures the last Set() call so a unit test can assert
-// the value the lifecycle would have emitted. The lifecycle uses the
-// real prometheus-backed Gauge in production; this stub keeps the test
-// hermetic. It also records the labels argument so tests can assert the
-// (remote_node, remote_role, remote_tier, group) tuple stamped by the
-// per-group recordCycleGroup and the cycle-end recordLastRun paths.
+// recordingGauge captures the Set() and Delete() calls so a unit test
+// can assert the value the lifecycle would have emitted AND the
+// staleness-prevention Delete that recordLastRun issues before each
+// Set. The lifecycle uses the real prometheus-backed Gauge in
+// production; this stub keeps the test hermetic. It also records the
+// labels argument so tests can assert the (remote_node, remote_role,
+// remote_tier, group) tuple stamped by the per-group recordCycleGroup
+// and the cycle-end recordLastRun paths.
 type recordingGauge struct {
-	lastLabels []string
-	lastValue  float64
-	called     int
+	lastLabels   []string
+	deletedLabel [][]string
+	lastValue    float64
+	called       int
+	deleted      int
 }

 func (g *recordingGauge) Set(v float64, labels ...string) {
@@ -128,7 +132,11 @@ func (g *recordingGauge) Set(v float64, labels ...string) {

 func (g *recordingGauge) Add(_ float64, _ ...string) {}

-func (g *recordingGauge) Delete(_ ...string) bool { return true }
+func (g *recordingGauge) Delete(labels ...string) bool {
+	g.deleted++
+	g.deletedLabel = append(g.deletedLabel, append([]string{}, labels...))
+	return true
+}

 // recordingCounter captures the last Inc() call's label set.
 type recordingCounter struct {
@@ -235,6 +243,8 @@ func TestRecordLastRunSuccess(t *testing.T) {
 		[]string{"data-hot-0:17912", "lifecycle", "hot", "metrics-day"},
 		tsGauge.lastLabels,
 		"lastRunTimestamp must be Set with the cycle's (remote_node, remote_role, remote_tier, group) tuple")
+	require.Equal(t, 0, tsGauge.deleted,
+		"first-ever recordLastRun must NOT issue a Delete (no previous tuple to clean up)")
 	require.Equal(t, 1, okGauge.called)
 	require.Equal(t, 1.0, okGauge.lastValue,
 		"lastRunSuccess must be 1 on a nil error")
@@ -242,6 +252,67 @@ func TestRecordLastRunSuccess(t *testing.T) {
 		[]string{"data-hot-0:17912", "lifecycle", "hot", "metrics-day"},
 		okGauge.lastLabels,
 		"lastRunSuccess must be Set with the cycle's (remote_node, remote_role, remote_tier, group) tuple")
+	require.Equal(t, 0, okGauge.deleted)
+}
+
+// TestRecordLastRunTwoCycleReplaceStaleSeries is the regression test
+// for the staleness issue: two consecutive recordLastRun calls with
+// DIFFERENT (group, remote_*) tuples. The first call stamps
+// ("metrics-day", "data-hot-0", "lifecycle", "hot") and updates
+// emittedLastRun*. The second call must Delete that tuple before
+// stamping the new ("metrics-hour", "data-warm-0", "lifecycle",
+// "warm") tuple, so Prometheus doesn't accumulate a stale series
+// shadowing the new one. Without the emittedLastRun* tracking and
+// the Delete-before-Set, cycle B's series would coexist with cycle
+// A's and dashboards could read either as "current".
+func TestRecordLastRunTwoCycleReplaceStaleSeries(t *testing.T) {
+	tsGauge, okGauge := &recordingGauge{}, &recordingGauge{}
+	l := &lifecycleService{
+		lastRunTimestamp: tsGauge,
+		lastRunSuccess:   okGauge,
+		lastRunGroup:     "metrics-day",
+		lastRunNode:      "data-hot-0:17912",
+		lastRunRole:      "lifecycle",
+		lastRunTier:      "hot",
+	}
+
+	// Cycle A: stamp the hot path.
+	l.recordLastRun(time.Unix(1717929600, 0), nil)
+	require.Equal(t, 1, tsGauge.called)
+	require.Equal(t, 1, okGauge.called)
+	require.Equal(t, 0, tsGauge.deleted)
+	require.Equal(t, 0, okGauge.deleted)
+	require.Equal(t,
+		[]string{"data-hot-0:17912", "lifecycle", "hot", "metrics-day"},
+		tsGauge.lastLabels,
+		"cycle A must stamp the hot-path tuple")
+
+	// Cycle B: action() resets lastRun* and the new cycle's
+	// recordCycleGroup overwrites them with the warm-path tuple.
+	l.lastRunGroup = ""
+	l.lastRunNode = ""
+	l.lastRunRole = ""
+	l.lastRunTier = ""
+	l.lastRunGroup = "metrics-hour"
+	l.lastRunNode = "data-warm-0:17912"
+	l.lastRunRole = "lifecycle"
+	l.lastRunTier = "warm"
+	l.recordLastRun(time.Unix(1717929700, 0), nil)
+
+	// Cycle B must Delete cycle A's tuple before stamping the new one.
+	require.Equal(t, 1, tsGauge.deleted,
+		"second recordLastRun must Delete the previous tuple to prevent stale-series shadowing")
+	require.Equal(t,
+		[]string{"data-hot-0:17912", "lifecycle", "hot", "metrics-day"},
+		tsGauge.deletedLabel[0],
+		"Delete must target the cycle A tuple (the previously-emitted one)")
+	require.Equal(t, 2, tsGauge.called)
+	require.Equal(t, 1717929700.0, tsGauge.lastValue)
+	require.Equal(t,
+		[]string{"data-warm-0:17912", "lifecycle", "warm", "metrics-hour"},
+		tsGauge.lastLabels,
+		"cycle B must stamp the warm-path tuple after deleting the hot-path one")
+	require.Equal(t, 1, okGauge.deleted)
 }

 // TestRecordLastRunFailure stamps the gauges with success=0 when the action
diff --git a/banyand/backup/lifecycle/service.go b/banyand/backup/lifecycle/service.go
index 3998706f4..a1da3e8b0 100644
--- a/banyand/backup/lifecycle/service.go
+++ b/banyand/backup/lifecycle/service.go
@@ -115,42 +115,55 @@ type lifecycleService struct {
 	// the cycle ends. Reset to empty strings at the start of each
 	// action() so an empty cycle (no parseGroup succeeded) doesn't
 	// inherit the previous cycle's labels.
-	lastRunGroup      string
-	lastRunNode       string
-	lastRunRole       string
-	lastRunTier       string
-	metricsClient     queue.Client
-	grpcServer        *grpclib.Server
-	httpSrv           *http.Server
-	tlsReloader       *pkgtls.Reloader
-	currentNode       *databasev1.Node
-	clientCloser      context.CancelFunc
-	stopCh            chan struct{}
-	sch               *timestamp.Scheduler
-	l                 *logger.Logger
-	clusterStateMgr   *clusterStateManager
-	metricsKeeperStop chan struct{}
-	lifecycleHost     string
-	lifecycleHTTPAddr string
-	streamRoot        string
-	traceRoot         string
-	progressFilePath  string
-	reportDir         string
-	schedule          string
-	cert              string
-	gRPCAddr          string
-	lifecycleKeyFile  string
-	lifecycleGRPCAddr string
-	measureRoot       string
-	lifecycleCertFile string
-	localNodeMD       schema.Metadata
-	maxExecutionTimes int
-	chunkSize         run.Bytes
-	lifecycleGRPCPort uint32
-	lifecycleHTTPPort uint32
-	enableTLS         bool
-	insecure          bool
-	lifecycleTLS      bool
+	lastRunGroup        string
+	lastRunNode         string
+	lastRunRole         string
+	lastRunTier         string
+	// emittedLastRunGroup/Node/Role/Tier is the (group, remote_*)
+	// tuple of the last_run_* series that was last Set on
+	// Prometheus. recordLastRun uses this to Delete the previous
+	// series before stamping the new one, so each cycle's tuple
+	// fully replaces the previous cycle's tuple instead of
+	// accumulating as a stale series. Reset only when the previous
+	// Set succeeded; the empty-cycle path (no recordCycleGroup ran)
+	// leaves this set to whatever the previous cycle emitted, so
+	// the next non-empty cycle's Delete still fires correctly.
+	emittedLastRunGroup string
+	emittedLastRunNode  string
+	emittedLastRunRole  string
+	emittedLastRunTier  string
+	metricsClient       queue.Client
+	grpcServer          *grpclib.Server
+	httpSrv             *http.Server
+	tlsReloader         *pkgtls.Reloader
+	currentNode         *databasev1.Node
+	clientCloser        context.CancelFunc
+	stopCh              chan struct{}
+	sch                 *timestamp.Scheduler
+	l                   *logger.Logger
+	clusterStateMgr     *clusterStateManager
+	metricsKeeperStop   chan struct{}
+	lifecycleHost       string
+	lifecycleHTTPAddr   string
+	streamRoot          string
+	traceRoot           string
+	progressFilePath    string
+	reportDir           string
+	schedule            string
+	cert                string
+	gRPCAddr            string
+	lifecycleKeyFile    string
+	lifecycleGRPCAddr   string
+	measureRoot         string
+	lifecycleCertFile   string
+	localNodeMD         schema.Metadata
+	maxExecutionTimes   int
+	chunkSize           run.Bytes
+	lifecycleGRPCPort   uint32
+	lifecycleHTTPPort   uint32
+	enableTLS           bool
+	insecure            bool
+	lifecycleTLS        bool
 }

 // NewService creates a new lifecycle service. metricsRegistry replaces the
@@ -587,6 +600,13 @@ func (l *lifecycleService) action(ctx context.Context) (err error) {
 	l.lastRunNode = ""
 	l.lastRunRole = ""
 	l.lastRunTier = ""
+	// Do NOT reset the emittedLastRun* fields here — they carry the
+	// (group, remote_*) tuple of the last series actually Set on
+	// Prometheus, which recordLastRun needs to Delete in the next
+	// cycle so the previous cycle's series doesn't accumulate as a
+	// stale labeled gauge. An empty cycle still has a previous emitted
+	// tuple to clean up; the new cycle's Set will then re-stamp with
+	// the current (possibly empty) labels.
 	// Stamp last-run metrics at the end of this cycle regardless of outcome.
 	// Using defer keeps the success/error bookkeeping in one place even as
 	// the body grows new early returns; the metrics gauge Set()s observe
@@ -736,11 +756,13 @@ func (l *lifecycleService) recordCycleGroup(group, senderNode, senderRole, sende
 // when Set is called with new labels — the old series lingers as
 // "stale" until a scrape expires it. To prevent dashboards from
 // reading a previous cycle's (group, remote_*) tuple as current,
-// recordLastRun first Deletes the previous-tuple series (if any
-// existed) before stamping the new one. The empty-cycle path (no
-// group was processed) calls Delete on the previous tuple and then
-// stamps a single series with all-empty labels, so dashboards always
-// see exactly one current series. nil gauges are skipped so a lifecycle
+// recordLastRun Deletes the previously-emitted tuple (tracked in
+// emittedLastRun{Group,Node,Role,Tier}) before stamping the new
+// (current) tuple, and then updates the emitted-tuple fields to
+// reflect the new stamp. The empty-cycle path (no group was processed
+// in the current cycle) still calls Delete on the previous tuple and
+// then Set the all-empty-labels tuple, so the dashboard always sees
+// exactly one current series. nil gauges are skipped so a lifecycle
 // run with a nil observability.MetricsRegistry (BypassRegistry)
 // doesn't crash.
 func (l *lifecycleService) recordLastRun(start time.Time, err error) {
@@ -748,11 +770,12 @@ func (l *lifecycleService) recordLastRun(start time.Time, err error) {
 	if err == nil {
 		success = 1.0
 	}
-	prevLabels := []string{l.lastRunNode, l.lastRunRole, l.lastRunTier, l.lastRunGroup}
-	// If a previous tuple was set (and the cycle is not the empty one,
-	// where the tuple was reset to empty strings at action start),
-	// delete it first so the new stamp replaces rather than shadows.
-	if l.lastRunGroup != "" || l.lastRunNode != "" || l.lastRunRole != "" || l.lastRunTier != "" {
+	prevLabels := []string{
+		l.emittedLastRunNode, l.emittedLastRunRole, l.emittedLastRunTier, l.emittedLastRunGroup,
+	}
+	hasPrev := l.emittedLastRunGroup != "" || l.emittedLastRunNode != "" ||
+		l.emittedLastRunRole != "" || l.emittedLastRunTier != ""
+	if hasPrev {
 		if l.lastRunTimestamp != nil {
 			l.lastRunTimestamp.Delete(prevLabels...)
 		}
@@ -766,6 +789,13 @@ func (l *lifecycleService) recordLastRun(start time.Time, err error) {
 	if l.lastRunSuccess != nil {
 		l.lastRunSuccess.Set(success, l.lastRunNode, l.lastRunRole, l.lastRunTier, l.lastRunGroup)
 	}
+	// Update the emitted-tuple tracking so the next cycle's recordLastRun
+	// deletes THIS cycle's tuple. This runs after the Set so a panic in
+	// Set doesn't leave the tracking inconsistent with Prometheus.
+	l.emittedLastRunGroup = l.lastRunGroup
+	l.emittedLastRunNode = l.lastRunNode
+	l.emittedLastRunRole = l.lastRunRole
+	l.emittedLastRunTier = l.lastRunTier
 }

 // waitForCoLocatedNode waits briefly for the data node behind --grpc-addr to
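
The Delete-before-Set bookkeeping that this commit adds to `recordLastRun` can be illustrated with a small, self-contained Go sketch. It is not the lifecycle code: `labeledGauge`, `fakeGauge` and `lastRunTracker` are hypothetical names, and `fakeGauge` simply keeps one value per label tuple as a stand-in for a labeled Prometheus gauge. The tracker remembers the tuple it last emitted and deletes that series before stamping the next one.

```go
package main

import "strings"

// labeledGauge is a hypothetical interface with the same Set/Delete
// shape as the recordingGauge test stub in the diff above.
type labeledGauge interface {
	Set(v float64, labels ...string)
	Delete(labels ...string) bool
}

// fakeGauge keeps one value per label tuple, so len(series) is the
// number of series a scrape would see.
type fakeGauge struct {
	series map[string]float64
}

func newFakeGauge() *fakeGauge {
	return &fakeGauge{series: make(map[string]float64)}
}

// seriesKey joins label values into a map key (sketch only: assumes
// label values never contain "|").
func seriesKey(labels []string) string {
	return strings.Join(labels, "|")
}

func (g *fakeGauge) Set(v float64, labels ...string) {
	g.series[seriesKey(labels)] = v
}

func (g *fakeGauge) Delete(labels ...string) bool {
	k := seriesKey(labels)
	_, existed := g.series[k]
	delete(g.series, k)
	return existed
}

// lastRunTracker remembers the label tuple it last emitted so that the
// next stamp can delete that series first (Delete-before-Set).
type lastRunTracker struct {
	gauge      labeledGauge
	emitted    []string
	hasEmitted bool
}

func (t *lastRunTracker) stamp(v float64, labels ...string) {
	if t.hasEmitted {
		t.gauge.Delete(t.emitted...)
	}
	t.gauge.Set(v, labels...)
	// Copy the labels so the tracker never aliases the caller's slice.
	t.emitted = append([]string(nil), labels...)
	t.hasEmitted = true
}
```

Tracking emission with the `hasEmitted` flag, rather than checking whether any remembered label value is non-empty, means the all-empty tuple stamped by an empty cycle is also deleted on the next call, so at most one series is exported at any time.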
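
The `defer`-based stamping described in the CHANGES.md entry for `banyandb_lifecycle_last_run_*` (every return path, including a recovered panic, updates the pair) depends on a named `err` result that deferred functions can read and modify. A minimal sketch, assuming a hypothetical `runRecord` in place of the real gauges and assuming the panic is recovered inside `action()` itself:

```go
package main

import (
	"errors"
	"fmt"
)

// runRecord is a hypothetical stand-in for the last_run_* gauges.
type runRecord struct {
	calls   int
	success float64
}

func (r *runRecord) recordLastRun(err error) {
	r.calls++
	if err == nil {
		r.success = 1
	} else {
		r.success = 0
	}
}

// action uses a named result so the deferred closures see the final err.
func action(r *runRecord, fail, panics bool) (err error) {
	// Registered first, so it runs last and observes err after any
	// panic has been converted by the handler below.
	defer func() { r.recordLastRun(err) }()
	defer func() {
		if p := recover(); p != nil {
			err = fmt.Errorf("recovered panic: %v", p)
		}
	}()
	if panics {
		panic("boom")
	}
	if fail {
		return errors.New("migration failed")
	}
	return nil
}
```

Deferred calls run last-in, first-out, so the recover handler is registered after the recording closure; by the time `recordLastRun` runs, a panic has already been turned into a non-nil `err`.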
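
The per-pod assertion in TestLifecycleIdentityResolution_CyclesTotalHasNoEmptyRemoteNode reduces to a set difference between the cycles_total inventory and the pods that have a `remote_node!=""` series. The sketch below assumes both pod lists have already been extracted from the Prometheus query results; `missingPods` and the pod names in the example are hypothetical.

```go
package main

import "sort"

// missingPods returns the pods that appear in the cycles_total inventory
// but have no series with a non-empty remote_node label. An empty result
// means the non-empty set is a superset of the inventory.
func missingPods(inventory, nonEmpty []string) []string {
	have := make(map[string]struct{}, len(nonEmpty))
	for _, p := range nonEmpty {
		have[p] = struct{}{}
	}
	seen := make(map[string]struct{}, len(inventory))
	var missing []string
	for _, p := range inventory {
		if _, dup := seen[p]; dup {
			continue // a pod may export several series; report it once
		}
		seen[p] = struct{}{}
		if _, ok := have[p]; !ok {
			missing = append(missing, p)
		}
	}
	sort.Strings(missing)
	return missing
}
```

A `sum > 0` style check passes as soon as any single pod reports a non-empty `remote_node`; the set difference instead names every inventory pod that does not, which is what surfaces a partial-pod regression.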
