This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new edb123974 feat(lifecycle): expose Prometheus metrics and the migration metric family (#1164)
edb123974 is described below

commit edb123974a7e45c1dc43f3b128ebcd8b1cad08c5
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Jun 9 09:31:51 2026 +0800

    feat(lifecycle): expose Prometheus metrics and the migration metric family (#1164)
    
    * feat(observability): support listener-suppressed metric service
    
    Add NewMetricServiceWithoutListener plus a listenerDisabled flag so an
    embedding service can reuse its own HTTP server for the Prometheus
    endpoint instead of opening the :2121 observability listener. Serve()
    skips the http.Server (releasing the closer slot) and exposes the
    registry through the new PrometheusHandlerProvider optional interface.
    Validate() still fails fast on an empty addr for the default
    constructor, so standalone/data/liaison stay unaffected.
    
    Signed-off-by: Hongtao Gao <[email protected]>
---
 banyand/backup/lifecycle/lifecycle.go              |  44 +++++-
 banyand/backup/lifecycle/metrics_test.go           | 107 +++++++++++++
 banyand/backup/lifecycle/service.go                | 173 +++++++++++++++++++--
 banyand/backup/lifecycle/steps.go                  |   4 +-
 banyand/backup/lifecycle/steps_test.go             |   2 +-
 banyand/observability/services/listener_test.go    |  97 ++++++++++++
 banyand/observability/services/service.go          |  77 ++++++++-
 banyand/observability/type.go                      |  11 ++
 banyand/queue/pub/batch.go                         |  35 +++++
 banyand/queue/pub/chunked_sync.go                  |  55 +++++--
 banyand/queue/pub/migration_metrics.go             |  51 ++++++
 banyand/queue/pub/migration_metrics_test.go        | 124 +++++++++++++++
 banyand/queue/pub/pub.go                           |  74 +++++----
 banyand/queue/pub/role_gate_test.go                |  45 ++++++
 banyand/queue/test/chunked_sync_common.go          |   2 +-
 fodc/agent/internal/flightrecorder/datasource.go   |  21 +++
 .../internal/flightrecorder/datasource_test.go     |  15 +-
 test/cases/lifecycle/lifecycle.go                  |  35 ++++-
 18 files changed, 891 insertions(+), 81 deletions(-)

diff --git a/banyand/backup/lifecycle/lifecycle.go b/banyand/backup/lifecycle/lifecycle.go
index 51c992138..c6f74dc37 100644
--- a/banyand/backup/lifecycle/lifecycle.go
+++ b/banyand/backup/lifecycle/lifecycle.go
@@ -24,9 +24,15 @@ import (
 
        "github.com/spf13/cobra"
 
+       "github.com/apache/skywalking-banyandb/api/data"
+       "github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/banyand/observability/services"
+       "github.com/apache/skywalking-banyandb/banyand/queue/pub"
        "github.com/apache/skywalking-banyandb/pkg/config"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/node"
        "github.com/apache/skywalking-banyandb/pkg/panicdiag"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/signal"
@@ -35,15 +41,39 @@ import (
 
 // NewCommand creates a new lifecycle command.
 func NewCommand() *cobra.Command {
+       cmd, _ := NewCommandWithRegistry()
+       return cmd
+}
+
+// NewCommandWithRegistry creates a new lifecycle command and also returns the
+// metrics registry it wires. Tests and embedders use the registry to inspect the
+// lifecycle and banyandb_lifecycle_migration_* metric families (e.g. via its
+// observability.PrometheusHandlerProvider) without scraping a live HTTP port.
+func NewCommandWithRegistry() (*cobra.Command, observability.MetricsRegistry) {
        logging := logger.Logging{}
        crashOutputCfg := panicdiag.NewCrashOutputConfig()
        metaSvc, err := metadata.NewClient()
        if err != nil {
                logger.GetLogger().Err(err).Msg("failed to initiate metadata 
service")
        }
-       svc := NewService(metaSvc)
+       // Native metrics pipeline: a queue client that publishes _monitoring measure
+       // writes to the co-located data node over gRPC. It is created idle here (no
+       // dial until a node is registered); the lifecycle service registers the local
+       // data node — with its --grpc-addr, known only after flag parsing — during its
+       // Serve phase when native mode is enabled.
+       // nil migration registry: this client carries native _monitoring writes, not
+       // tier-migration traffic, so it must not emit the banyandb_lifecycle_migration_* family.
+       metricsClient := pub.NewWithoutMetadata(nil)
+       nodeSelector, _ := node.NewPickFirstSelector()
+       nodeRegistry := grpc.NewClusterNodeRegistry(data.TopicMeasureWrite, 
metricsClient, nodeSelector)
+       // Listener-suppressed: the lifecycle reuses its own HTTP port for 
/metrics
+       // instead of opening the observability :2121 listener.
+       metricSvc := services.NewMetricServiceWithoutListener(metaSvc, 
metricsClient, "lifecycle", nodeRegistry)
+       svc := NewService(metaSvc, metricSvc, metricsClient)
        group := run.NewGroup("lifecycle")
-       group.Register(new(signal.Handler), metaSvc, svc)
+       // metricSvc is registered before svc so its PreRun (building the 
providers)
+       // runs first; svc.PreRun then safely derives its proof counter from it.
+       group.Register(new(signal.Handler), metaSvc, metricSvc, svc)
        cmd := &cobra.Command{
                Short:             "Run lifecycle migration",
                DisableAutoGenTag: true,
@@ -64,7 +94,13 @@ func NewCommand() *cobra.Command {
                                        os.Exit(-1)
                                }
                        }()
-                       if err := group.Run(context.Background()); err != nil {
+                       runCtx := context.Background()
+                       if metricSvc.NativeEnabled() {
+                               // Native mode stamps the lifecycle node 
identity onto every
+                               // _monitoring series; the metric service reads 
it from the context.
+                               runCtx = nativeNodeContext(runCtx)
+                       }
+                       if err := group.Run(runCtx); err != nil {
                                
logger.GetLogger().Error().Err(err).Stack().Str("name", 
group.Name()).Msg("Exit")
                                os.Exit(-1)
                        }
@@ -75,5 +111,5 @@ func NewCommand() *cobra.Command {
        cmd.Flags().StringVar(&logging.Env, "logging-env", "prod", "the 
logging")
        cmd.Flags().StringVar(&logging.Level, "logging-level", "info", "the 
root level of logging")
        crashOutputCfg.RegisterFlags(cmd.Flags())
-       return cmd
+       return cmd, metricSvc
 }
diff --git a/banyand/backup/lifecycle/metrics_test.go b/banyand/backup/lifecycle/metrics_test.go
new file mode 100644
index 000000000..5b764184c
--- /dev/null
+++ b/banyand/backup/lifecycle/metrics_test.go
@@ -0,0 +1,107 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lifecycle
+
+import (
+       "context"
+       "net/http"
+       "net/http/httptest"
+       "testing"
+
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+)
+
+// TestBuildLocalNodeMetadataHasDataRole guards the C1 failure mode: the
+// co-located data node MUST be registered with ROLE_DATA, or the pub client's
+// role gate silently drops it and native metrics never reach the data node.
+func TestBuildLocalNodeMetadataHasDataRole(t *testing.T) {
+       md := buildLocalNodeMD("127.0.0.1:17912")
+       node, ok := md.Spec.(*databasev1.Node)
+       require.True(t, ok)
+       require.Equal(t, metricsLocalNodeName, node.GetMetadata().GetName())
+       require.Equal(t, "127.0.0.1:17912", node.GetGrpcAddress())
+       require.Contains(t, node.GetRoles(), databasev1.Role_ROLE_DATA,
+               "local node must carry ROLE_DATA or the pub role gate silently 
drops it")
+}
+
+// TestNativeNodeContextSetsIdentity asserts the native node identity is injected
+// with a non-empty NodeID so per-pod _monitoring series stay distinct.
+func TestNativeNodeContextSetsIdentity(t *testing.T) {
+       ctx := nativeNodeContext(context.Background())
+       value := ctx.Value(common.ContextNodeKey)
+       require.NotNil(t, value)
+       n, ok := value.(common.Node)
+       require.True(t, ok)
+       require.NotEmpty(t, n.NodeID)
+}
+
+// stubPromRegistry is a MetricsRegistry that also exposes a Prometheus 
handler,
+// used to exercise buildHTTPRouter's /metrics mounting without a real 
registry.
+type stubPromRegistry struct {
+       observability.MetricsRegistry
+       handler http.Handler
+}
+
+func (s stubPromRegistry) PrometheusHandler() http.Handler { return s.handler }
+
+// TestBuildHTTPRouterServesMetricsAndAPI asserts /metrics is mounted on the 
same
+// router as /api (port reuse, requirement #1) without the two routes 
colliding.
+func TestBuildHTTPRouterServesMetricsAndAPI(t *testing.T) {
+       metricsHit, apiHit := false, false
+       l := &lifecycleService{
+               omr: stubPromRegistry{
+                       MetricsRegistry: observability.BypassRegistry,
+                       handler: http.HandlerFunc(func(w http.ResponseWriter, _ 
*http.Request) {
+                               metricsHit = true
+                               w.WriteHeader(http.StatusOK)
+                       }),
+               },
+       }
+       apiHandler := http.HandlerFunc(func(w http.ResponseWriter, _ 
*http.Request) {
+               apiHit = true
+               w.WriteHeader(http.StatusOK)
+       })
+       router := l.buildHTTPRouter(apiHandler)
+
+       metricsRec := httptest.NewRecorder()
+       router.ServeHTTP(metricsRec, httptest.NewRequest(http.MethodGet, 
"/metrics", nil))
+       require.Equal(t, http.StatusOK, metricsRec.Code)
+       require.True(t, metricsHit, "/metrics must be routed to the prometheus 
handler")
+
+       apiRec := httptest.NewRecorder()
+       router.ServeHTTP(apiRec, httptest.NewRequest(http.MethodGet, 
"/api/foo", nil))
+       require.Equal(t, http.StatusOK, apiRec.Code)
+       require.True(t, apiHit, "/api must still route to the gateway handler")
+}
+
+// TestBuildHTTPRouterWithoutPromHandler asserts that when the registry does 
not
+// expose a Prometheus handler (e.g. BypassRegistry), /metrics is simply absent
+// and /api keeps working.
+func TestBuildHTTPRouterWithoutPromHandler(t *testing.T) {
+       l := &lifecycleService{omr: observability.BypassRegistry}
+       router := l.buildHTTPRouter(http.HandlerFunc(func(w 
http.ResponseWriter, _ *http.Request) {
+               w.WriteHeader(http.StatusOK)
+       }))
+       rec := httptest.NewRecorder()
+       router.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/metrics", 
nil))
+       require.Equal(t, http.StatusNotFound, rec.Code)
+}
diff --git a/banyand/backup/lifecycle/service.go b/banyand/backup/lifecycle/service.go
index c98ec1cfa..84526a6e3 100644
--- a/banyand/backup/lifecycle/service.go
+++ b/banyand/backup/lifecycle/service.go
@@ -52,10 +52,13 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/backup/snapshot"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/protector"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/healthcheck"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
        pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
@@ -69,22 +72,35 @@ type service interface {
 
 var _ service = (*lifecycleService)(nil)
 
+const (
+       // metricsLocalNodeName is the connMgr key for the co-located data node 
that
+       // receives the lifecycle's native _monitoring measure writes.
+       metricsLocalNodeName = "lifecycle-local"
+       // metricsNodeKeeperInterval is how often the keeper re-checks that the 
local
+       // data node connection is active and re-registers it if not.
+       metricsNodeKeeperInterval = 10 * time.Second
+)
+
 type lifecycleService struct {
        databasev1.UnimplementedClusterStateServiceServer
        databasev1.UnimplementedNodeQueryServiceServer
        metadata          metadata.Repo
        omr               observability.MetricsRegistry
        pm                protector.Memory
-       clusterStateMgr   *clusterStateManager
-       l                 *logger.Logger
-       sch               *timestamp.Scheduler
+       cyclesTotal       meter.Counter
+       metricsClient     queue.Client
        grpcServer        *grpclib.Server
        httpSrv           *http.Server
        tlsReloader       *pkgtls.Reloader
        currentNode       *databasev1.Node
        clientCloser      context.CancelFunc
        stopCh            chan struct{}
-       measureRoot       string
+       sch               *timestamp.Scheduler
+       l                 *logger.Logger
+       clusterStateMgr   *clusterStateManager
+       metricsKeeperStop chan struct{}
+       lifecycleHost     string
+       lifecycleHTTPAddr string
        streamRoot        string
        traceRoot         string
        progressFilePath  string
@@ -92,31 +108,62 @@ type lifecycleService struct {
        schedule          string
        cert              string
        gRPCAddr          string
-       lifecycleHost     string
+       lifecycleKeyFile  string
        lifecycleGRPCAddr string
-       lifecycleHTTPAddr string
+       measureRoot       string
        lifecycleCertFile string
-       lifecycleKeyFile  string
+       localNodeMD       schema.Metadata
+       maxExecutionTimes int
+       chunkSize         run.Bytes
        lifecycleGRPCPort uint32
        lifecycleHTTPPort uint32
-       maxExecutionTimes int
        enableTLS         bool
        insecure          bool
        lifecycleTLS      bool
-       chunkSize         run.Bytes
 }
 
-// NewService creates a new lifecycle service.
-func NewService(meta metadata.Repo) run.Unit {
+// NewService creates a new lifecycle service. metricsRegistry replaces the
+// previous BypassRegistry, so the protector memory metrics and the lifecycle
+// proof counter emit real series. metricsClient is the native-metrics 
pipeline to
+// the co-located data node; it is only exercised when native mode is enabled.
+func NewService(
+       meta metadata.Repo,
+       metricsRegistry observability.MetricsRegistry,
+       metricsClient queue.Client,
+) run.Unit {
        ls := &lifecycleService{
                metadata:        meta,
-               omr:             observability.BypassRegistry,
+               omr:             metricsRegistry,
+               metricsClient:   metricsClient,
                clusterStateMgr: &clusterStateManager{},
        }
        ls.pm = protector.NewMemory(ls.omr)
        return ls
 }
 
+// nativeNodeContext augments ctx with the lifecycle node identity that the metric
+// service's native mode stamps onto every _monitoring series. The identity is the
+// lifecycle's own (Type="lifecycle", set via the metric service nodeType); NodeID
+// prefers POD_NAME (downward API) and falls back to the hostname (the pod name in
+// k8s) so series from different pods stay distinct. The lifecycle's own gRPC/HTTP
+// addresses are not computed until Validate (later, and only when scheduled), so
+// the address tags are left empty.
+func nativeNodeContext(ctx context.Context) context.Context {
+       nodeID := os.Getenv("POD_NAME")
+       if nodeID == "" {
+               if hostname, hostErr := os.Hostname(); hostErr == nil {
+                       nodeID = hostname
+               }
+       }
+       if nodeID == "" {
+               nodeID = "lifecycle"
+       }
+       return context.WithValue(ctx, common.ContextNodeKey, common.Node{
+               NodeID: nodeID,
+               Labels: common.ParseNodeFlags(),
+       })
+}
+
 func (l *lifecycleService) FlagSet() *run.FlagSet {
        flagS := run.NewFlagSet(l.Name())
        flagS.StringSliceVar(&common.FlagNodeLabels, "node-labels", nil, "the 
node labels. e.g. key1=value1,key2=value2")
@@ -186,6 +233,10 @@ func (l *lifecycleService) Validate() error {
 func (l *lifecycleService) PreRun(_ context.Context) error {
        l.l = logger.GetLogger("lifecycle")
 
+       // Safe to call With() here: the metrics registry is registered earlier 
in the
+       // group, so its PreRun (which builds the provider) has already run.
+       l.cyclesTotal = 
l.omr.With(observability.RootScope.SubScope("lifecycle")).NewCounter("cycles_total")
+
        if l.schedule != "" && l.lifecycleTLS {
                var err error
                l.tlsReloader, err = pkgtls.NewReloader(l.lifecycleCertFile, 
l.lifecycleKeyFile, l.l)
@@ -202,6 +253,18 @@ func (l *lifecycleService) GracefulStop() {
                l.sch.Close()
        }
 
+       // Stop the native metrics node keeper, then close the local metrics pub
+       // client. The metric service may still attempt a final flush 
concurrently
+       // during shutdown; native metrics are best-effort, so a flush that 
loses the
+       // race to the closing client simply logs and is dropped.
+       if l.metricsKeeperStop != nil {
+               close(l.metricsKeeperStop)
+               l.metricsKeeperStop = nil
+       }
+       if l.metricsClient != nil {
+               l.metricsClient.GracefulStop()
+       }
+
        l.l.Info().Msg("Stopping lifecycle server")
 
        if l.tlsReloader != nil {
@@ -245,10 +308,81 @@ func (l *lifecycleService) Name() string {
        return "lifecycle"
 }
 
+// buildLocalNodeMD builds the schema metadata registered on the native metrics
+// pub client for the co-located data node. The ROLE_DATA role is REQUIRED: the
+// pub client's role gate (allowedRoles=[ROLE_DATA]) silently drops a node 
whose
+// roles do not intersect, which would make native metrics a silent no-op.
+func buildLocalNodeMD(grpcAddr string) schema.Metadata {
+       return schema.Metadata{
+               TypeMeta: schema.TypeMeta{Kind: schema.KindNode},
+               Spec: &databasev1.Node{
+                       Metadata:    &commonv1.Metadata{Name: 
metricsLocalNodeName},
+                       GrpcAddress: grpcAddr,
+                       Roles:       
[]databasev1.Role{databasev1.Role_ROLE_DATA},
+               },
+       }
+}
+
+// buildHTTPRouter assembles the lifecycle's HTTP routes: the gRPC-gateway API
+// under /api and, when the metrics registry exposes a Prometheus handler, the
+// /metrics endpoint on the same (existing) HTTP server.
+func (l *lifecycleService) buildHTTPRouter(apiHandler http.Handler) *chi.Mux {
+       mux := chi.NewRouter()
+       mux.Mount("/api", http.StripPrefix("/api", apiHandler))
+       if reg, ok := l.omr.(observability.PrometheusHandlerProvider); ok {
+               mux.Handle("/metrics", reg.PrometheusHandler())
+       }
+       return mux
+}
+
+// startMetricsNodeKeeper registers the co-located data node on the native
+// metrics pub client and keeps the connection active. The pub connManager runs a
+// synchronous health check at registration and does NOT auto-retry an evicted
+// node (NewWithoutMetadata leaves healthCheckInterval==0), so the keeper
+// re-registers whenever the node is not locatable — covering both an initial
+// dial that failed (data node not ready yet) and a later failover eviction.
+func (l *lifecycleService) startMetricsNodeKeeper() {
+       l.localNodeMD = buildLocalNodeMD(l.gRPCAddr)
+       l.metricsKeeperStop = make(chan struct{})
+       // Initial registration dials the local data node so native flushes 
route there.
+       l.metricsClient.OnAddOrUpdate(l.localNodeMD)
+       run.Go(context.Background(), "backup.lifecycle.metrics-node-keeper", 
l.l, func(_ context.Context) {
+               ticker := time.NewTicker(metricsNodeKeeperInterval)
+               defer ticker.Stop()
+               for {
+                       select {
+                       case <-l.metricsKeeperStop:
+                               return
+                       case <-ticker.C:
+                               // HealthyNodes reflects the pub's 
active-connection state directly.
+                               // connMgr.OnAddOrUpdate fans out OnActive 
synchronously (which seeds
+                               // the native selector), so an empty set means 
the local data node is
+                               // not connected and native flushes would drop; 
force a fresh dial.
+                               if len(l.metricsClient.HealthyNodes()) == 0 {
+                                       // connMgr.OnAddOrUpdate short-circuits 
an evictable node, so drop
+                                       // it first to force a fresh dial. 
queue.Client does not expose
+                                       // OnDelete, hence the type assertion.
+                                       if deleter, ok := 
l.metricsClient.(interface{ OnDelete(schema.Metadata) }); ok {
+                                               deleter.OnDelete(l.localNodeMD)
+                                       }
+                                       
l.metricsClient.OnAddOrUpdate(l.localNodeMD)
+                               }
+                       }
+               }
+       })
+}
+
 func (l *lifecycleService) Serve() run.StopNotify {
        l.l = logger.GetLogger("lifecycle")
        l.stopCh = make(chan struct{})
 
+       // Register the co-located data node for native metrics and keep it 
active.
+       // Native works in both scheduled and one-shot modes (it is a gRPC 
write that
+       // does not depend on the lifecycle's own HTTP server).
+       if l.omr.NativeEnabled() {
+               l.startMetricsNodeKeeper()
+       }
+
        // Start gRPC/HTTP servers when schedule is set
        if l.schedule != "" {
                l.startServers()
@@ -352,8 +486,10 @@ func (l *lifecycleService) startServers() {
                return
        }
 
-       mux := chi.NewRouter()
-       mux.Mount("/api", http.StripPrefix("/api", gwMux))
+       // Reuse this existing HTTP server for the Prometheus endpoint instead 
of
+       // opening a separate observability listener. The fodc-agent already 
scrapes
+       // this port, so the lifecycle metrics appear with zero deployment 
change.
+       mux := l.buildHTTPRouter(gwMux)
 
        l.httpSrv = &http.Server{
                Addr:              l.lifecycleHTTPAddr,
@@ -403,6 +539,9 @@ func (l *lifecycleService) startServers() {
 }
 
 func (l *lifecycleService) action(ctx context.Context) error {
+       if l.cyclesTotal != nil {
+               l.cyclesTotal.Inc(1)
+       }
        progress := LoadProgress(l.progressFilePath, l.l)
        progress.ClearErrors()
 
@@ -917,7 +1056,7 @@ func (l *lifecycleService) getGroupsToProcess(ctx 
context.Context, progress *Pro
 func (l *lifecycleService) processStreamGroup(ctx context.Context, g 
*commonv1.Group,
        streamDir string, nodes []*databasev1.Node, labels map[string]string, 
progress *Progress,
 ) {
-       group, err := parseGroup(g, labels, nodes, l.l, l.metadata, 
l.clusterStateMgr)
+       group, err := parseGroup(g, labels, nodes, l.l, l.metadata, 
l.clusterStateMgr, l.omr)
        if err != nil {
                l.l.Error().Err(err).Msgf("failed to parse group %s", 
g.Metadata.Name)
                return
@@ -1037,7 +1176,7 @@ func (l *lifecycleService) 
deleteExpiredStreamSegments(ctx context.Context, g *c
 func (l *lifecycleService) processMeasureGroup(ctx context.Context, g 
*commonv1.Group, measureDir string,
        nodes []*databasev1.Node, labels map[string]string, progress *Progress,
 ) {
-       group, err := parseGroup(g, labels, nodes, l.l, l.metadata, 
l.clusterStateMgr)
+       group, err := parseGroup(g, labels, nodes, l.l, l.metadata, 
l.clusterStateMgr, l.omr)
        if err != nil {
                l.l.Error().Err(err).Msgf("failed to parse group %s", 
g.Metadata.Name)
                return
@@ -1144,7 +1283,7 @@ func (l *lifecycleService) deleteExpiredTraceSegments(ctx 
context.Context, g *co
 func (l *lifecycleService) processTraceGroup(ctx context.Context, g 
*commonv1.Group, traceDir string,
        nodes []*databasev1.Node, labels map[string]string, progress *Progress,
 ) {
-       group, err := parseGroup(g, labels, nodes, l.l, l.metadata, 
l.clusterStateMgr)
+       group, err := parseGroup(g, labels, nodes, l.l, l.metadata, 
l.clusterStateMgr, l.omr)
        if err != nil {
                l.l.Error().Err(err).Msgf("failed to parse group %s", 
g.Metadata.Name)
                return
diff --git a/banyand/backup/lifecycle/steps.go b/banyand/backup/lifecycle/steps.go
index b4270cc70..66c1ef04e 100644
--- a/banyand/backup/lifecycle/steps.go
+++ b/banyand/backup/lifecycle/steps.go
@@ -32,6 +32,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/banyand/queue/pub"
        "github.com/apache/skywalking-banyandb/pkg/fs"
@@ -123,6 +124,7 @@ func cloneIntervalRule(ir *commonv1.IntervalRule) 
*commonv1.IntervalRule {
 func parseGroup(
        g *commonv1.Group, nodeLabels map[string]string, nodes 
[]*databasev1.Node,
        l *logger.Logger, metadata metadata.Repo, clusterStateMgr 
*clusterStateManager,
+       omr observability.MetricsRegistry,
 ) (*GroupConfig, error) {
        ro := g.ResourceOpts
        if ro == nil {
@@ -190,7 +192,7 @@ func parseGroup(
        if ok, _ := nodeSel.OnInit([]schema.Kind{schema.KindGroup}); !ok {
                return nil, fmt.Errorf("failed to initialize node selector for 
group %s", g.Metadata.Name)
        }
-       client := pub.NewWithoutMetadata() //nolint:contextcheck // health 
check goroutine uses context.Background()
+       client := pub.NewWithoutMetadata(omr) //nolint:contextcheck // health 
check goroutine uses context.Background()
        switch g.Catalog {
        case commonv1.Catalog_CATALOG_STREAM:
                _ = grpc.NewClusterNodeRegistry(data.TopicStreamWrite, client, 
nodeSel)
diff --git a/banyand/backup/lifecycle/steps_test.go b/banyand/backup/lifecycle/steps_test.go
index c1728b5f2..bc14ab353 100644
--- a/banyand/backup/lifecycle/steps_test.go
+++ b/banyand/backup/lifecycle/steps_test.go
@@ -85,7 +85,7 @@ func TestParseGroup_RejectsMissingIntervals(t *testing.T) {
        for _, c := range cases {
                t.Run(c.name, func(t *testing.T) {
                        g := makeGroup(c.mutate)
-                       _, err := parseGroup(g, map[string]string{"type": 
"warm"}, nil, nil, nil, nil)
+                       _, err := parseGroup(g, map[string]string{"type": 
"warm"}, nil, nil, nil, nil, nil)
                        require.Error(t, err)
                        assert.Contains(t, err.Error(), c.errFrag)
                })
diff --git a/banyand/observability/services/listener_test.go b/banyand/observability/services/listener_test.go
new file mode 100644
index 000000000..3824694be
--- /dev/null
+++ b/banyand/observability/services/listener_test.go
@@ -0,0 +1,97 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package services
+
+import (
+       "context"
+       "net/http"
+       "net/http/httptest"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+)
+
+func newTestPromService(t *testing.T, listenerDisabled bool) *metricService {
+       t.Helper()
+       var reg observability.MetricsRegistry
+       if listenerDisabled {
+               reg = NewMetricServiceWithoutListener(nil, nil, "test", nil)
+       } else {
+               reg = NewMetricService(nil, nil, "test", nil)
+       }
+       svc, ok := reg.(*metricService)
+       require.True(t, ok)
+       // FlagSet is not parsed in unit scope, so set the fields the 
validators and
+       // Serve rely on directly.
+       svc.modes = []string{flagPromethusMode}
+       svc.metricsInterval = time.Second
+       svc.nativeFlushInterval = time.Second
+       return svc
+}
+
+// TestMetricServiceValidateFailFast asserts the default constructor still 
rejects
+// an empty listener address (M2: fail-fast preserved for 
standalone/data/liaison),
+// while the listener-disabled constructor permits it.
+func TestMetricServiceValidateFailFast(t *testing.T) {
+       normal := newTestPromService(t, false)
+       require.ErrorIs(t, normal.Validate(), errNoAddr)
+
+       disabled := newTestPromService(t, true)
+       require.NoError(t, disabled.Validate(), "listener-disabled service must 
not require an addr")
+}
+
+// TestPrometheusHandlerServesRegisteredSeries asserts the exposed handler 
serves a
+// counter registered through With(scope) - the path the lifecycle uses for its
+// banyandb_lifecycle_cycles_total proof series.
+func TestPrometheusHandlerServesRegisteredSeries(t *testing.T) {
+       svc := newTestPromService(t, true)
+       require.NoError(t, svc.PreRun(context.Background()))
+
+       counter := 
svc.With(observability.RootScope.SubScope("lifecycle")).NewCounter("cycles_total")
+       counter.Inc(1)
+
+       rec := httptest.NewRecorder()
+       svc.PrometheusHandler().ServeHTTP(rec, 
httptest.NewRequest(http.MethodGet, "/metrics", nil))
+       require.Equal(t, http.StatusOK, rec.Code)
+       require.Contains(t, rec.Body.String(), 
"banyandb_lifecycle_cycles_total")
+}
+
+// TestListenerDisabledServeBindsNoSocket asserts the listener-disabled service
+// neither creates its own http.Server nor blocks on GracefulStop (the closer 
slot
+// must be released even though the listener goroutine never starts).
+func TestListenerDisabledServeBindsNoSocket(t *testing.T) {
+       svc := newTestPromService(t, true)
+       require.NoError(t, svc.PreRun(context.Background()))
+
+       require.NotNil(t, svc.Serve())
+       require.Nil(t, svc.svr, "listener-disabled service must not create its 
own http.Server")
+
+       done := make(chan struct{})
+       go func() {
+               svc.GracefulStop()
+               close(done)
+       }()
+       select {
+       case <-done:
+       case <-time.After(5 * time.Second):
+               t.Fatal("GracefulStop blocked for a listener-disabled service")
+       }
+}
diff --git a/banyand/observability/services/service.go b/banyand/observability/services/service.go
index b87dffd5f..f051bfd7d 100644
--- a/banyand/observability/services/service.go
+++ b/banyand/observability/services/service.go
@@ -27,6 +27,7 @@ import (
 
        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/collectors"
+       "github.com/prometheus/client_golang/prometheus/promhttp"
        "github.com/robfig/cron/v3"
 
        "github.com/apache/skywalking-banyandb/api/common"
@@ -55,8 +56,9 @@ var (
 )
 
 var (
-       _ run.Service = (*metricService)(nil)
-       _ run.Config  = (*metricService)(nil)
+       _ run.Service                             = (*metricService)(nil)
+       _ run.Config                              = (*metricService)(nil)
+       _ observability.PrometheusHandlerProvider = (*metricService)(nil)
 
        obScope = observability.RootScope.SubScope("observability")
 )
@@ -78,6 +80,19 @@ func NewMetricService(metadata metadata.Repo, pipeline queue.Client, nodeType st
        }
 }
 
+// NewMetricServiceWithoutListener returns a metric service that does not start
+// its own HTTP listener. The embedding service is responsible for serving the
+// metrics, e.g. by mounting PrometheusHandler on its own router. This lets the
+// lifecycle command expose /metrics on its existing HTTP port instead of opening
+// a separate observability listener.
+func NewMetricServiceWithoutListener(metadata metadata.Repo, pipeline queue.Client, nodeType string, nodeSelector native.NodeSelector) observability.MetricsRegistry {
+       // Direct assertion: NewMetricService always returns *metricService, so a
+       // mismatch should fail loudly here rather than nil-deref on the next line.
+       svc := NewMetricService(metadata, pipeline, nodeType, nodeSelector).(*metricService)
+       svc.listenerDisabled = true
+       return svc
+}
+
 type metricService struct {
        metadata            metadata.Repo
        nodeSelector        native.NodeSelector
@@ -97,6 +112,7 @@ type metricService struct {
        metricsInterval     time.Duration
        nativeFlushInterval time.Duration
        panicMaxArtifacts   int
+       listenerDisabled    bool
        mutex               sync.Mutex
 }
 
@@ -114,7 +130,7 @@ func (p *metricService) FlagSet() *run.FlagSet {
 }
 
 func (p *metricService) Validate() error {
-       if p.listenAddr == "" {
+       if !p.listenerDisabled && p.listenAddr == "" {
                return errNoAddr
        }
        if len(p.modes) == 0 {
@@ -192,6 +208,14 @@ func (p *metricService) Name() string {
 func (p *metricService) Serve() run.StopNotify {
        p.mutex.Lock()
        defer p.mutex.Unlock()
+       if p.listenerDisabled {
+               // A listener-suppressed service shares its process with a node's metric
+               // service (e.g. the lifecycle sidecar alongside a data node). The host
+               // gauges and the MetricsCollector singleton are process-global, so it must
+               // not (re)initialize or drive them — that races the node's running
+               // collector. It only flushes its own lifecycle/migration families.
+               return p.serveWithoutListener()
+       }
        p.initMetrics()
        startupCtx, cancelStartup := context.WithTimeout(context.Background(), 30*time.Second)
        defer cancelStartup()
@@ -248,6 +272,37 @@ func (p *metricService) Serve() run.StopNotify {
        return p.closer.CloseNotify()
 }
 
+// serveWithoutListener brings up the metric service without its own HTTP listener
+// and without touching the process-global host gauges (cpu/mem/net/disk) or the
+// shared MetricsCollector singleton. The embedding service (e.g. the lifecycle
+// command) serves /metrics via PrometheusHandler; this path only drives
+// native-mode flushing of the lifecycle and migration metric families. Skipping
+// the global host collectors avoids racing a co-located node's metric service in
+// single-process deployments.
+func (p *metricService) serveWithoutListener() run.StopNotify {
+       startupCtx, cancelStartup := context.WithTimeout(context.Background(), 30*time.Second)
+       defer cancelStartup()
+       if containsMode(p.modes, flagNativeMode) {
+               p.npf.setServeStarted()
+               p.npf.initAllSchemas(startupCtx)
+               p.nCollection.FlushMetrics(startupCtx)
+               clock, _ := timestamp.GetClock(context.TODO())
+               p.scheduler = timestamp.NewScheduler(p.l, clock)
+               nativeFlushExpr := fmt.Sprintf("@every %s", p.nativeFlushInterval)
+               if err := p.scheduler.Register(startupCtx, "native-metric-collection", cron.Descriptor, nativeFlushExpr,
+                       func(ctx context.Context, _ time.Time, _ *logger.Logger) bool {
+                               p.nCollection.FlushMetrics(ctx)
+                               return true
+                       }); err != nil {
+                       p.l.Fatal().Err(err).Msg("Failed to register native metric collection")
+               }
+       }
+       // Release the closer slot so GracefulStop's CloseThenWait does not block.
+       p.l.Info().Msg("metric server listener disabled; metrics served by the embedding service")
+       p.closer.Done()
+       return p.closer.CloseNotify()
+}
+
 func (p *metricService) GracefulStop() {
        p.mutex.Lock()
        defer p.mutex.Unlock()
@@ -266,10 +321,26 @@ func (p *metricService) NativeEnabled() bool {
 
 func (p *metricService) routeTableHandler(w http.ResponseWriter, _ *http.Request) {
        w.Header().Set("Content-Type", "application/json")
+       if p.nodeSelector == nil {
+               w.WriteHeader(http.StatusNotFound)
+               return
+       }
        w.WriteHeader(http.StatusOK)
        _, _ = w.Write([]byte(p.nodeSelector.String()))
 }
 
+// PrometheusHandler returns an http.Handler exposing this service's Prometheus
+// registry, for mounting on an external router (see PrometheusHandlerProvider).
+// It returns a 404 handler when prometheus mode is not enabled.
+func (p *metricService) PrometheusHandler() http.Handler {
+       if p.promReg == nil {
+               return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+                       http.Error(w, "prometheus mode not enabled", http.StatusNotFound)
+               })
+       }
+       return promhttp.HandlerFor(p.promReg, promhttp.HandlerOpts{})
+}
+
 func containsMode(modes []string, mode string) bool {
        for _, item := range modes {
                if item == mode {
diff --git a/banyand/observability/type.go b/banyand/observability/type.go
index 85b07f89c..49ca88256 100644
--- a/banyand/observability/type.go
+++ b/banyand/observability/type.go
@@ -19,6 +19,8 @@
 package observability
 
 import (
+       "net/http"
+
        "github.com/apache/skywalking-banyandb/pkg/meter"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
@@ -46,3 +48,12 @@ type MetricsRegistry interface {
        // NativeEnabled returns whether the native mode is enabled.
        NativeEnabled() bool
 }
+
+// PrometheusHandlerProvider is an optional interface implemented by a
+// MetricsRegistry that can expose its Prometheus exposition handler for mounting
+// on an external HTTP router, instead of serving it on the registry's own
+// listener. The lifecycle command uses this to serve /metrics on its existing
+// HTTP port rather than opening a separate observability listener.
+type PrometheusHandlerProvider interface {
+       PrometheusHandler() http.Handler
+}
diff --git a/banyand/queue/pub/batch.go b/banyand/queue/pub/batch.go
index 7ca830b92..47eaebfda 100644
--- a/banyand/queue/pub/batch.go
+++ b/banyand/queue/pub/batch.go
@@ -209,6 +209,10 @@ func (bp *batchPublisher) hasMetrics() bool {
        return bp.pub != nil && bp.pub.metrics != nil
 }
 
+func (bp *batchPublisher) hasMigrationMetrics() bool {
+       return bp.pub != nil && bp.pub.migrationMetrics != nil
+}
+
 // listenBatchResponse receives the server response and records failover events and end-to-end failure metrics.
 func (bp *batchPublisher) listenBatchResponse(ctx context.Context, s clusterv1.Service_SendClient, deferFn func(), bc chan batchEvent, curNode string) {
        defer func() {
@@ -236,6 +240,9 @@ func (bp *batchPublisher) listenBatchResponse(ctx context.Context, s clusterv1.S
                 if bp.hasMetrics() {
                         bp.pub.metrics.totalErr.Inc(1, operation, "", curNode, info.role, info.tier, sendErrReasonRecvError)
                 }
+               if bp.hasMigrationMetrics() {
+                       bp.pub.migrationMetrics.totalErr.Inc(1, operation, "", curNode, info.role, info.tier, sendErrReasonRecvError)
+               }
                 if grpchelper.IsFailoverError(errRecv) {
                         // Record circuit breaker failure before creating failover event
                        bp.pub.connMgr.RecordFailure(curNode, errRecv)
@@ -252,6 +259,9 @@ func (bp *batchPublisher) listenBatchResponse(ctx context.Context, s clusterv1.S
         if bp.hasMetrics() {
                 bp.pub.metrics.totalErr.Inc(1, operation, "", curNode, info.role, info.tier, sendErrReasonServerRejected)
         }
+       if bp.hasMigrationMetrics() {
+               bp.pub.migrationMetrics.totalErr.Inc(1, operation, "", curNode, info.role, info.tier, sendErrReasonServerRejected)
+       }
         ce := common.NewErrorWithStatus(resp.Status, resp.Error)
         // Only failover statuses trigger circuit-breaker accounting; other server-side
         // rejections (e.g. invalid argument) are surfaced to the caller but do not count
@@ -366,6 +376,9 @@ func (bp *batchPublisher) retrySend(ctx context.Context, stream clusterv1.Servic
                 if bp.hasMetrics() {
                         bp.pub.metrics.totalLatency.Observe(time.Since(start).Seconds(), operation, group, node, info.role, info.tier)
                 }
+               if bp.hasMigrationMetrics() {
+                       bp.pub.migrationMetrics.totalLatency.Observe(time.Since(start).Seconds(), operation, group, node, info.role, info.tier)
+               }
        }
 
        for attempt := 0; attempt <= defaultMaxRetries; attempt++ {
@@ -379,6 +392,9 @@ func (bp *batchPublisher) retrySend(ctx context.Context, stream clusterv1.Servic
                         if bp.hasMetrics() {
                                 bp.pub.metrics.totalErr.Inc(1, operation, group, node, info.role, info.tier, sendErrReasonCanceled)
                         }
+                       if bp.hasMigrationMetrics() {
+                               bp.pub.migrationMetrics.totalErr.Inc(1, operation, group, node, info.role, info.tier, sendErrReasonCanceled)
+                       }
                        observeLatency()
                        return ctx.Err()
                case <-stream.Context().Done():
@@ -386,6 +402,9 @@ func (bp *batchPublisher) retrySend(ctx context.Context, stream clusterv1.Servic
                         if bp.hasMetrics() {
                                 bp.pub.metrics.totalErr.Inc(1, operation, group, node, info.role, info.tier, sendErrReasonStreamCanceled)
                         }
+                       if bp.hasMigrationMetrics() {
+                               bp.pub.migrationMetrics.totalErr.Inc(1, operation, group, node, info.role, info.tier, sendErrReasonStreamCanceled)
+                       }
                        observeLatency()
                        return stream.Context().Err()
                case <-attemptCtx.Done():
@@ -407,6 +426,10 @@ func (bp *batchPublisher) retrySend(ctx context.Context, stream clusterv1.Servic
                                 bp.pub.metrics.totalStarted.Inc(1, operation, group, node, info.role, info.tier)
                                 bp.pub.metrics.totalFinished.Inc(1, operation, group, node, info.role, info.tier)
                         }
+                       if bp.hasMigrationMetrics() {
+                               bp.pub.migrationMetrics.totalStarted.Inc(1, operation, group, node, info.role, info.tier)
+                               bp.pub.migrationMetrics.totalFinished.Inc(1, operation, group, node, info.role, info.tier)
+                       }
                         // Success writing to the local stream; end-to-end ack is observed in listenBatchResponse.
                        observeLatency()
                        return nil
@@ -420,6 +443,9 @@ func (bp *batchPublisher) retrySend(ctx context.Context, stream clusterv1.Servic
                         if bp.hasMetrics() {
                                 bp.pub.metrics.totalErr.Inc(1, operation, group, node, info.role, info.tier, sendErrReasonNonTransient)
                         }
+                       if bp.hasMigrationMetrics() {
+                               bp.pub.migrationMetrics.totalErr.Inc(1, operation, group, node, info.role, info.tier, sendErrReasonNonTransient)
+                       }
                        observeLatency()
                        return sendErr
                }
@@ -440,12 +466,18 @@ func (bp *batchPublisher) retrySend(ctx context.Context, stream clusterv1.Servic
                         if bp.hasMetrics() {
                                 bp.pub.metrics.totalErr.Inc(1, operation, group, node, info.role, info.tier, sendErrReasonCanceled)
                         }
+                       if bp.hasMigrationMetrics() {
+                               bp.pub.migrationMetrics.totalErr.Inc(1, operation, group, node, info.role, info.tier, sendErrReasonCanceled)
+                       }
                        observeLatency()
                        return ctx.Err()
                case <-stream.Context().Done():
                        if bp.hasMetrics() {
                                 bp.pub.metrics.totalErr.Inc(1, operation, group, node, info.role, info.tier, sendErrReasonStreamCanceled)
                         }
+                       if bp.hasMigrationMetrics() {
+                               bp.pub.migrationMetrics.totalErr.Inc(1, operation, group, node, info.role, info.tier, sendErrReasonStreamCanceled)
+                       }
                        observeLatency()
                        return stream.Context().Err()
                }
@@ -455,6 +487,9 @@ func (bp *batchPublisher) retrySend(ctx context.Context, stream clusterv1.Servic
         if bp.hasMetrics() {
                 bp.pub.metrics.totalErr.Inc(1, operation, group, node, info.role, info.tier, sendErrReasonRetryExhausted)
         }
+       if bp.hasMigrationMetrics() {
+               bp.pub.migrationMetrics.totalErr.Inc(1, operation, group, node, info.role, info.tier, sendErrReasonRetryExhausted)
+       }
         observeLatency()
         return fmt.Errorf("retry exhausted for node %s after %d attempts, last error: %w", node, defaultMaxRetries+1, lastErr)
 }
diff --git a/banyand/queue/pub/chunked_sync.go b/banyand/queue/pub/chunked_sync.go
index f0da29073..6c30fef0b 100644
--- a/banyand/queue/pub/chunked_sync.go
+++ b/banyand/queue/pub/chunked_sync.go
@@ -44,18 +44,43 @@ const (
 )
 
 type chunkedSyncClient struct {
-       client     clusterv1.ServiceClient
-       conn       *grpc.ClientConn
-       log        *logger.Logger
-       metrics    *pubMetrics
-       config     *ChunkedSyncClientConfig
-       selfNode   string
-       selfRole   string
-       selfTier   string
-       node       string
-       remoteRole string
-       remoteTier string
-       chunkSize  uint32
+       client           clusterv1.ServiceClient
+       conn             *grpc.ClientConn
+       log              *logger.Logger
+       metrics          *pubMetrics
+       migrationMetrics *pubMigrationMetrics
+       config           *ChunkedSyncClientConfig
+       selfNode         string
+       selfRole         string
+       selfTier         string
+       node             string
+       remoteRole       string
+       remoteTier       string
+       chunkSize        uint32
+}
+
+// migStarted, migErr and migFinished emit the banyandb_lifecycle_migration_*
+// mirror of the file-sync metrics. They are extracted from SyncStreamingParts so
+// the parallel guards do not push that function over the gocyclo budget; the
+// banyandb_queue_pub_* emissions stay inline and untouched.
+func (c *chunkedSyncClient) migStarted(operation, group string) {
+       if c.migrationMetrics != nil {
+               c.migrationMetrics.totalStarted.Inc(1, operation, group, c.node, c.remoteRole, c.remoteTier)
+       }
+}
+
+func (c *chunkedSyncClient) migErr(operation, group, errType string) {
+       if c.migrationMetrics != nil {
+               c.migrationMetrics.totalErr.Inc(1, operation, group, c.node, c.remoteRole, c.remoteTier, errType)
+       }
+}
+
+func (c *chunkedSyncClient) migFinished(operation, group string, duration time.Duration, bytesSent uint64) {
+       if c.migrationMetrics != nil {
+               c.migrationMetrics.totalFinished.Inc(1, operation, group, c.node, c.remoteRole, c.remoteTier)
+               c.migrationMetrics.totalLatency.Observe(duration.Seconds(), operation, group, c.node, c.remoteRole, c.remoteTier)
+               c.migrationMetrics.sentBytes.Inc(float64(bytesSent), operation, group, c.node, c.remoteRole, c.remoteTier)
+       }
 }
 
 // SyncStreamingParts implements queue.ChunkedSyncClient with streaming support.
@@ -114,6 +139,7 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx context.Context, parts []queu
         if c.metrics != nil {
                 c.metrics.totalStarted.Inc(1, operation, group, c.node, c.remoteRole, c.remoteTier)
        }
+       c.migStarted(operation, group)
 
        metadata := &clusterv1.SyncMetadata{
                Group:      group,
@@ -134,6 +160,7 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx context.Context, parts []queu
                 if c.metrics != nil {
                         c.metrics.totalErr.Inc(1, operation, group, c.node, c.remoteRole, c.remoteTier, errType)
                }
+               c.migErr(operation, group, errType)
                return nil, fmt.Errorf("failed to stream parts: %w", streamErr)
        }
        if totalChunks == 0 && len(failedParts) == 0 {
@@ -143,6 +170,7 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx context.Context, parts []queu
                         c.metrics.totalLatency.Observe(duration.Seconds(), operation, group, c.node, c.remoteRole, c.remoteTier)
                         c.metrics.sentBytes.Inc(float64(totalBytesSent), operation, group, c.node, c.remoteRole, c.remoteTier)
                }
+               c.migFinished(operation, group, duration, totalBytesSent)
                return &queue.SyncResult{
                        Success:    true,
                        SessionID:  sessionID,
@@ -160,6 +188,7 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx context.Context, parts []queu
                         if c.metrics != nil {
                                 c.metrics.totalErr.Inc(1, operation, group, c.node, c.remoteRole, c.remoteTier, "recv_error")
                         }
+                       c.migErr(operation, group, "recv_error")
                         return nil, fmt.Errorf("failed to receive final response: %w", recvErr)
                }
                finalResp = resp
@@ -185,10 +214,12 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx context.Context, parts []queu
                         c.metrics.totalLatency.Observe(duration.Seconds(), operation, group, c.node, c.remoteRole, c.remoteTier)
                         c.metrics.sentBytes.Inc(float64(totalBytesSent), operation, group, c.node, c.remoteRole, c.remoteTier)
                }
+               c.migFinished(operation, group, duration, totalBytesSent)
        } else {
                if c.metrics != nil {
                         c.metrics.totalErr.Inc(1, operation, group, c.node, c.remoteRole, c.remoteTier, "completion_error")
                }
+               c.migErr(operation, group, "completion_error")
        }
 
        return &queue.SyncResult{
diff --git a/banyand/queue/pub/migration_metrics.go b/banyand/queue/pub/migration_metrics.go
new file mode 100644
index 000000000..0be8e4320
--- /dev/null
+++ b/banyand/queue/pub/migration_metrics.go
@@ -0,0 +1,51 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pub
+
+import (
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+// lifecycleMigrationScope produces the banyandb_lifecycle_migration_* family: the
+// tier-migration mirror of the banyandb_queue_pub_* family (see queuePubScope).
+var lifecycleMigrationScope = observability.RootScope.SubScope("lifecycle_migration")
+
+// pubMigrationMetrics mirrors pubMetrics for the lifecycle tier-migration
+// publisher. That publisher is built via NewWithoutMetadata (no metadata), so the
+// regular pubMetrics stay nil and emit nothing; this parallel family is registered
+// from an explicitly supplied MetricsRegistry instead, on its own scope so the
+// migration layer can be queried independently of the write-pipeline pub family.
+type pubMigrationMetrics struct {
+       totalStarted  meter.Counter
+       totalFinished meter.Counter
+       totalLatency  meter.Histogram
+       totalErr      meter.Counter
+       sentBytes     meter.Counter
+}
+
+func newPubMigrationMetrics(factory observability.Factory) *pubMigrationMetrics {
+       labels := []string{"operation", "group", "remote_node", "remote_role", "remote_tier"}
+       return &pubMigrationMetrics{
+               totalStarted:  factory.NewCounter("total_started", labels...),
+               totalFinished: factory.NewCounter("total_finished", labels...),
+               totalLatency:  factory.NewHistogram("total_latency", meter.DefBuckets, labels...),
+               totalErr:      factory.NewCounter("total_err", append(labels, "error_type")...),
+               sentBytes:     factory.NewCounter("sent_bytes", labels...),
+       }
+}
diff --git a/banyand/queue/pub/migration_metrics_test.go b/banyand/queue/pub/migration_metrics_test.go
new file mode 100644
index 000000000..7c600bf81
--- /dev/null
+++ b/banyand/queue/pub/migration_metrics_test.go
@@ -0,0 +1,124 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pub
+
+import (
+       "context"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/require"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
+
+       "github.com/apache/skywalking-banyandb/api/data"
+       clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+)
+
+func newPubMigrationMetricsWithErrCapture(totalErr *errReasonCapturerImpl) *pubMigrationMetrics { //nolint:exhaustruct
+       return &pubMigrationMetrics{
+               totalStarted:  &countingCounter{},
+               totalFinished: &countingCounter{},
+               totalLatency:  &noopHistogram{},
+               totalErr:      totalErr,
+               sentBytes:     &countingCounter{},
+       }
+}
+
+// TestNewWithoutMetadataNilLeavesMigrationMetricsNil verifies that the
+// lifecycle/test publishers built with a nil registry emit neither the
+// banyandb_queue_pub_* nor the banyandb_lifecycle_migration_* family.
+func TestNewWithoutMetadataNilLeavesMigrationMetricsNil(t *testing.T) {
+       pp, ok := NewWithoutMetadata(nil).(*pub)
+       require.True(t, ok)
+       require.Nil(t, pp.migrationMetrics, "migration metrics must be nil without a registry")
+       require.Nil(t, pp.metrics, "queue_pub metrics stay nil for the metadata-less publisher")
+}
+
+// TestNewWithoutMetadataWithRegistryRegistersMigrationMetrics verifies that
+// supplying a registry registers the banyandb_lifecycle_migration_* family while
+// the banyandb_queue_pub_* family stays disabled (no metadata).
+func TestNewWithoutMetadataWithRegistryRegistersMigrationMetrics(t *testing.T) {
+       pp, ok := NewWithoutMetadata(observability.BypassRegistry).(*pub)
+       require.True(t, ok)
+       require.NotNil(t, pp.migrationMetrics, "migration metrics must be registered when a registry is supplied")
+       require.Nil(t, pp.metrics, "queue_pub family must remain disabled (metadata is nil)")
+}
+
+// TestRetrySendMirrorsErrorToMigrationMetrics verifies the batch-write path emits
+// the error reason to BOTH families in lock-step when both are present.
+func TestRetrySendMirrorsErrorToMigrationMetrics(t *testing.T) {
+       pubErr := newErrReasonCapturer()
+       migErr := newErrReasonCapturer()
+       p := &pub{ //nolint:exhaustruct
+               metrics:          newPubMetricsWithErrCapture(pubErr),
+               migrationMetrics: newPubMigrationMetricsWithErrCapture(migErr),
+               nodeCache:        make(map[string]nodeInfo),
+       }
+       bp := &batchPublisher{pub: p, topic: topicPtr(data.TopicMeasureWrite)}
+
+       mockStream := NewMockSendClient(context.Background())
+       mockStream.SetSendFunc(func(*clusterv1.SendRequest) error {
+               return status.Error(codes.Unavailable, "transient")
+       })
+
+       require.Error(t, bp.retrySend(context.Background(), mockStream, &clusterv1.SendRequest{}, "n1"))
+       require.Equal(t, float64(1), pubErr.sum(sendErrReasonRetryExhausted))
+       require.Equal(t, float64(1), migErr.sum(sendErrReasonRetryExhausted), "migration family must mirror the pub family")
+}
+
+// TestPublishMirrorsStartedFinishedToMigrationMetrics verifies the batch-write
+// success path increments started+finished on BOTH families.
+func TestPublishMirrorsStartedFinishedToMigrationMetrics(t *testing.T) {
+       pubStarted, pubFinished := &countingCounter{}, &countingCounter{}
+       migStarted, migFinished := &countingCounter{}, &countingCounter{}
+       pm := &pubMetrics{ //nolint:exhaustruct
+               totalStarted:  pubStarted,
+               totalFinished: pubFinished,
+               totalLatency:  &noopHistogram{},
+               totalErr:      newErrReasonCapturer(),
+               sentBytes:     &countingCounter{},
+       }
+       p := newPubWithConnMgrForMetrics(t, pm)
+       p.migrationMetrics = &pubMigrationMetrics{ //nolint:exhaustruct
+               totalStarted:  migStarted,
+               totalFinished: migFinished,
+               totalLatency:  &noopHistogram{},
+               totalErr:      newErrReasonCapturer(),
+               sentBytes:     &countingCounter{},
+       }
+
+       mockStream := NewMockSendClient(context.Background())
+       mockStream.SetSendFunc(func(*clusterv1.SendRequest) error { return nil })
+       doneCh := make(chan struct{})
+       close(doneCh)
+
+       const nodeName = "node-a"
+       bp := p.NewBatchPublisher(10 * time.Second).(*batchPublisher)
+       bp.streams[nodeName] = writeStream{client: mockStream, ctxDoneCh: doneCh}
+
+       _, publishErr := bp.Publish(context.Background(), data.TopicMeasureWrite, bus.NewMessageWithNode(1, nodeName, []byte("payload")))
+       require.NoError(t, publishErr)
+
+       require.Equal(t, float64(1), pubStarted.count)
+       require.Equal(t, float64(1), migStarted.count, "migration started must mirror pub started")
+       require.Equal(t, float64(1), pubFinished.count)
+       require.Equal(t, float64(1), migFinished.count, "migration finished must mirror pub finished")
+}
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index 83be1e3b8..e7d656183 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -72,25 +72,26 @@ var (
 
 type pub struct {
        schema.UnimplementedOnInitHandler
-       metadata        metadata.Repo
-       handlers        map[bus.Topic]schema.EventHandler
-       log             *logger.Logger
-       metrics         *pubMetrics
-       connMgr         *grpchelper.ConnManager[*client]
-       closer          *run.Closer
-       writableProbe   map[string]map[string]struct{}
-       nodeCache       map[string]nodeInfo
-       caCertPath      string
-       caCertReloader  *pkgtls.Reloader
-       prefix          string
-       retryPolicy     string
-       selfNode        string
-       selfRole        string
-       selfTier        string
-       allowedRoles    []databasev1.Role
-       writableProbeMu sync.Mutex
-       nodeCacheMu     sync.RWMutex
-       tlsEnabled      bool
+       metadata         metadata.Repo
+       handlers         map[bus.Topic]schema.EventHandler
+       log              *logger.Logger
+       metrics          *pubMetrics
+       migrationMetrics *pubMigrationMetrics
+       connMgr          *grpchelper.ConnManager[*client]
+       closer           *run.Closer
+       writableProbe    map[string]map[string]struct{}
+       nodeCache        map[string]nodeInfo
+       caCertPath       string
+       caCertReloader   *pkgtls.Reloader
+       prefix           string
+       retryPolicy      string
+       selfNode         string
+       selfRole         string
+       selfTier         string
+       allowedRoles     []databasev1.Role
+       writableProbeMu  sync.Mutex
+       nodeCacheMu      sync.RWMutex
+       tlsEnabled       bool
 }
 
 // nodeInfo caches the resolved role and tier for a remote node.
@@ -455,7 +456,12 @@ func New(metadata metadata.Repo, roles ...databasev1.Role) queue.Client {
 
 // NewWithoutMetadata returns a new queue client without metadata, defaulting to data nodes.
 // sender_* fields are left empty for this lifecycle-tool publisher.
-func NewWithoutMetadata() queue.Client {
+//
+// If omr is non-nil, a parallel banyandb_lifecycle_migration_* metric family is
+// registered on it. The regular banyandb_queue_pub_* family stays disabled because
+// metadata is nil (PreRun gates it on metadata != nil). Pass nil to leave the
+// migration metrics disabled (e.g. tests, or non-migration clients).
+func NewWithoutMetadata(omr observability.MetricsRegistry) queue.Client {
        p := New(nil, databasev1.Role_ROLE_DATA)
        pp := p.(*pub)
        pp.log = logger.GetLogger("queue-client")
@@ -465,6 +471,9 @@ func NewWithoutMetadata() queue.Client {
                RetryPolicy:    pp.retryPolicy,
                MaxRecvMsgSize: maxReceiveMessageSize,
        })
+       if omr != nil {
+               pp.migrationMetrics = newPubMigrationMetrics(omr.With(lifecycleMigrationScope))
+       }
        return p
 }
 
@@ -662,18 +671,19 @@ func (p *pub) NewChunkedSyncClientWithConfig(node string, 
config *ChunkedSyncCli
                info = resolveNodeInfo(n)
        }
        return &chunkedSyncClient{
-               client:     c.client,
-               conn:       c.conn,
-               node:       node,
-               log:        p.log,
-               metrics:    p.metrics,
-               selfNode:   p.selfNode,
-               selfRole:   p.selfRole,
-               selfTier:   p.selfTier,
-               remoteRole: info.role,
-               remoteTier: info.tier,
-               chunkSize:  config.ChunkSize,
-               config:     config,
+               client:           c.client,
+               conn:             c.conn,
+               node:             node,
+               log:              p.log,
+               metrics:          p.metrics,
+               migrationMetrics: p.migrationMetrics,
+               selfNode:         p.selfNode,
+               selfRole:         p.selfRole,
+               selfTier:         p.selfTier,
+               remoteRole:       info.role,
+               remoteTier:       info.tier,
+               chunkSize:        config.ChunkSize,
+               config:           config,
        }, nil
 }
 
diff --git a/banyand/queue/pub/role_gate_test.go 
b/banyand/queue/pub/role_gate_test.go
new file mode 100644
index 000000000..94ec77447
--- /dev/null
+++ b/banyand/queue/pub/role_gate_test.go
@@ -0,0 +1,45 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pub
+
+import (
+       "testing"
+
+       "github.com/onsi/gomega"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+// TestRoleLessNodeRejectedByGate verifies the publisher's role gate drops a node
+// whose roles do not intersect allowedRoles - before any dial. This is the exact
+// failure mode the lifecycle native-metrics registration must avoid: it MUST
+// register the co-located data node with ROLE_DATA, otherwise OnAddOrUpdate
+// silently returns and the node never becomes active. The positive (ROLE_DATA ->
+// active) path is covered by the register/unregister specs in client_test.go.
+func TestRoleLessNodeRejectedByGate(t *testing.T) {
+       g := gomega.NewWithT(t)
+       p := newPub() // allowedRoles defaults to ROLE_DATA
+       defer p.GracefulStop()
+
+       node := getDataNode("lifecycle-local", "127.0.0.1:0")
+       node.Spec.(*databasev1.Node).Roles = nil
+       p.OnAddOrUpdate(node)
+
+       g.Expect(p.connMgr.ActiveCount()).To(gomega.Equal(0), "role-less node must not become active")
+       g.Expect(p.connMgr.EvictableCount()).To(gomega.Equal(0), "role-less node must not even be dialed")
+}
diff --git a/banyand/queue/test/chunked_sync_common.go 
b/banyand/queue/test/chunked_sync_common.go
index 566f36e97..a1fecd94f 100644
--- a/banyand/queue/test/chunked_sync_common.go
+++ b/banyand/queue/test/chunked_sync_common.go
@@ -108,7 +108,7 @@ func setupChunkedSyncTestWithChunkSize(t *testing.T, 
testName string, chunkSize
                return errInternal == nil
        }, flags.EventuallyTimeout, 100*time.Millisecond)
 
-       client := pub.NewWithoutMetadata()
+       client := pub.NewWithoutMetadata(nil)
 
        nodeAddr := fmt.Sprintf("localhost:%d", grpcPort)
        nodeName := testName + "-node"
diff --git a/fodc/agent/internal/flightrecorder/datasource.go 
b/fodc/agent/internal/flightrecorder/datasource.go
index 747b467b0..b6ee00559 100644
--- a/fodc/agent/internal/flightrecorder/datasource.go
+++ b/fodc/agent/internal/flightrecorder/datasource.go
@@ -256,6 +256,27 @@ func (ds *Datasource) GetTimestamps() *TimestampRingBuffer 
{
        return ds.timestamps
 }
 
+// GetCurrentSnapshot returns the latest value of every metric together with the
+// latest timestamp, all captured under a single lock. UpdateBatch mutates under
+// the same lock, so the returned values and timestamp always belong to the same
+// batch: callers never observe a mix of an old metric value and a new timestamp
+// (or vice versa) while an UpdateBatch is in flight. The map is keyed by the same
+// metric key as GetMetrics.
+func (ds *Datasource) GetCurrentSnapshot() (map[string]float64, int64) {
+       ds.mu.RLock()
+       defer ds.mu.RUnlock()
+
+       values := make(map[string]float64, len(ds.metrics))
+       for k, buffer := range ds.metrics {
+               values[k] = buffer.GetCurrentValue()
+       }
+       var timestamp int64
+       if ds.timestamps != nil {
+               timestamp = ds.timestamps.GetCurrentValue()
+       }
+       return values, timestamp
+}
+
 // GetDescriptions returns a copy of the descriptions map.
 func (ds *Datasource) GetDescriptions() map[string]string {
        ds.mu.RLock()
diff --git a/fodc/agent/internal/flightrecorder/datasource_test.go 
b/fodc/agent/internal/flightrecorder/datasource_test.go
index b4daf4417..c144ebd93 100644
--- a/fodc/agent/internal/flightrecorder/datasource_test.go
+++ b/fodc/agent/internal/flightrecorder/datasource_test.go
@@ -514,9 +514,10 @@ func 
TestDatasource_UpdateBatch_ConcurrentReadsDuringUpdate(t *testing.T) {
                        // Wait for batch update to start
                        <-batchStart
 
-                       // Try to read during batch update
-                       metricsMap := ds.GetMetrics()
-                       timestamps := ds.GetTimestamps()
+                       // Try to read during batch update. GetCurrentSnapshot reads every
+                       // value plus the timestamp under a single lock, so it always returns a
+                       // coherent view of one batch even while UpdateBatch is in flight.
+                       values, ts := ds.GetCurrentSnapshot()
 
                        readResults[idx] = struct {
                                metric1Value float64
@@ -524,10 +525,10 @@ func 
TestDatasource_UpdateBatch_ConcurrentReadsDuringUpdate(t *testing.T) {
                                metric3Value float64
                                timestamp    int64
                        }{
-                               metric1Value: 
metricsMap["metric1"].GetCurrentValue(),
-                               metric2Value: 
metricsMap["metric2"].GetCurrentValue(),
-                               metric3Value: 
metricsMap["metric3"].GetCurrentValue(),
-                               timestamp:    timestamps.GetCurrentValue(),
+                               metric1Value: values["metric1"],
+                               metric2Value: values["metric2"],
+                               metric3Value: values["metric3"],
+                               timestamp:    ts,
                        }
 
                        readComplete <- struct{}{}
diff --git a/test/cases/lifecycle/lifecycle.go 
b/test/cases/lifecycle/lifecycle.go
index 4ef7f2183..f145560de 100644
--- a/test/cases/lifecycle/lifecycle.go
+++ b/test/cases/lifecycle/lifecycle.go
@@ -25,6 +25,8 @@ import (
        "fmt"
        "io"
        "io/fs"
+       "net/http"
+       "net/http/httptest"
        "os"
        "path/filepath"
        "sort"
@@ -43,6 +45,7 @@ import (
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
        "github.com/apache/skywalking-banyandb/banyand/backup/lifecycle"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
@@ -227,6 +230,25 @@ func verifyLifecycleStages(sc helpers.SharedContext, 
verifyFn func(gomega.Gomega
        })
 }
 
+// verifyMigrationMetrics asserts the lifecycle tier-migration publisher emitted
+// the banyandb_lifecycle_migration_* family (the mirror of banyandb_queue_pub_*)
+// during the migration. The registry's Prometheus counters persist after the
+// command stops, so we scrape its handler directly rather than a live HTTP port.
+func verifyMigrationMetrics(reg observability.MetricsRegistry) {
+       provider, ok := reg.(observability.PrometheusHandlerProvider)
+       gomega.Expect(ok).To(gomega.BeTrue(), "lifecycle metrics registry must expose a Prometheus handler")
+       rec := httptest.NewRecorder()
+       provider.PrometheusHandler().ServeHTTP(rec, 
httptest.NewRequest(http.MethodGet, "/metrics", nil))
+       gomega.Expect(rec.Code).To(gomega.Equal(http.StatusOK))
+       body := rec.Body.String()
+       // A successful migration send increments total_finished; the measure/stream/trace
+       // part files are sent via the file-sync operation, so that label must be present.
+       gomega.Expect(body).To(gomega.MatchRegexp(`banyandb_lifecycle_migration_total_finished\{[^}]*\} [1-9]`),
+               "expected a non-zero banyandb_lifecycle_migration_total_finished series, got:\n"+body)
+       gomega.Expect(body).To(gomega.ContainSubstring(`operation="file-sync"`),
+               "file-sync part migration must be metered in the banyandb_lifecycle_migration_* family")
+}
+
 func verifySourceDirectoriesAfterMigration() {
        streamSrcPath := filepath.Join(SharedContext.SrcDir, "stream", "data", 
"default")
        streamEntries, err := os.ReadDir(streamSrcPath)
@@ -509,8 +531,10 @@ func crossSegmentTimestamps() (single, left, right 
time.Time) {
 
 // runLifecycleMigration runs a single hot->warm lifecycle migration, pointing
 // every root path at the shared source dir and writing its report to reportDir.
-func runLifecycleMigration(progressFile, reportDir string) {
-       lifecycleCmd := lifecycle.NewCommand()
+// It returns the command's metrics registry so callers can verify the emitted
+// banyandb_lifecycle_migration_* family.
+func runLifecycleMigration(progressFile, reportDir string) 
observability.MetricsRegistry {
+       lifecycleCmd, reg := lifecycle.NewCommandWithRegistry()
        args := []string{
                "--grpc-addr", SharedContext.DataAddr,
                "--stream-root-path", SharedContext.SrcDir,
@@ -522,6 +546,7 @@ func runLifecycleMigration(progressFile, reportDir string) {
        args = append(args, SharedContext.MetadataFlags...)
        lifecycleCmd.SetArgs(args)
        gomega.Expect(lifecycleCmd.Execute()).To(gomega.Succeed())
+       return reg
 }
 
 // drainWriteAcks closes a client-streaming write and fails the spec on any
@@ -737,7 +762,11 @@ var _ = ginkgo.Describe("Measure cross-segment migration", 
ginkgo.Ordered, func(
                gomega.Expect(err).NotTo(gomega.HaveOccurred())
                defer os.RemoveAll(dir)
                rf := filepath.Join(dir, "report")
-               runLifecycleMigration(filepath.Join(dir, "progress.json"), rf)
+               migrationMetrics := runLifecycleMigration(filepath.Join(dir, 
"progress.json"), rf)
+               // This Describe seeds sw_cross_segment immediately above and migrates it
+               // here, so the tier-migration publisher always emits the
+               // banyandb_lifecycle_migration_* family regardless of spec ordering.
+               verifyMigrationMetrics(migrationMetrics)
 
                // === 4. Verify the destination directory contains multiple 
seg-* folders for
                //        sw_cross_segment. Pre-fix, the cross-segment part 
would copy entirely

Reply via email to