This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new edb123974 feat(lifecycle): expose Prometheus metrics and the migration metric family (#1164)
edb123974 is described below
commit edb123974a7e45c1dc43f3b128ebcd8b1cad08c5
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Jun 9 09:31:51 2026 +0800
feat(lifecycle): expose Prometheus metrics and the migration metric family (#1164)
* feat(observability): support listener-suppressed metric service
Add NewMetricServiceWithoutListener and a listenerDisabled flag so that an
embedding service can serve the Prometheus endpoint from its own HTTP server
instead of opening the :2121 observability listener. In this mode Serve()
does not start an http.Server (it releases the closer slot immediately), and
the registry is exposed through the new optional PrometheusHandlerProvider
interface. Validate() still fails fast on an empty listen address for the
default constructor, so standalone, data, and liaison nodes are unaffected.
Signed-off-by: Hongtao Gao <[email protected]>
---
banyand/backup/lifecycle/lifecycle.go | 44 +++++-
banyand/backup/lifecycle/metrics_test.go | 107 +++++++++++++
banyand/backup/lifecycle/service.go | 173 +++++++++++++++++++--
banyand/backup/lifecycle/steps.go | 4 +-
banyand/backup/lifecycle/steps_test.go | 2 +-
banyand/observability/services/listener_test.go | 97 ++++++++++++
banyand/observability/services/service.go | 77 ++++++++-
banyand/observability/type.go | 11 ++
banyand/queue/pub/batch.go | 35 +++++
banyand/queue/pub/chunked_sync.go | 55 +++++--
banyand/queue/pub/migration_metrics.go | 51 ++++++
banyand/queue/pub/migration_metrics_test.go | 124 +++++++++++++++
banyand/queue/pub/pub.go | 74 +++++----
banyand/queue/pub/role_gate_test.go | 45 ++++++
banyand/queue/test/chunked_sync_common.go | 2 +-
fodc/agent/internal/flightrecorder/datasource.go | 21 +++
.../internal/flightrecorder/datasource_test.go | 15 +-
test/cases/lifecycle/lifecycle.go | 35 ++++-
18 files changed, 891 insertions(+), 81 deletions(-)
diff --git a/banyand/backup/lifecycle/lifecycle.go
b/banyand/backup/lifecycle/lifecycle.go
index 51c992138..c6f74dc37 100644
--- a/banyand/backup/lifecycle/lifecycle.go
+++ b/banyand/backup/lifecycle/lifecycle.go
@@ -24,9 +24,15 @@ import (
"github.com/spf13/cobra"
+ "github.com/apache/skywalking-banyandb/api/data"
+ "github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
"github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/banyand/observability/services"
+ "github.com/apache/skywalking-banyandb/banyand/queue/pub"
"github.com/apache/skywalking-banyandb/pkg/config"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/node"
"github.com/apache/skywalking-banyandb/pkg/panicdiag"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/signal"
@@ -35,15 +41,39 @@ import (
// NewCommand creates a new lifecycle command.
func NewCommand() *cobra.Command {
+ // The metrics registry is discarded here; tests and embedders that need to
+ // inspect the lifecycle metric families should call NewCommandWithRegistry.
+ cmd, _ := NewCommandWithRegistry()
+ return cmd
+}
+
+// NewCommandWithRegistry creates a new lifecycle command and also returns the
+// metrics registry it wires. Tests and embedders use the registry to inspect
the
+// lifecycle and banyandb_lifecycle_migration_* metric families (e.g. via its
+// observability.PrometheusHandlerProvider) without scraping a live HTTP port.
+func NewCommandWithRegistry() (*cobra.Command, observability.MetricsRegistry) {
logging := logger.Logging{}
crashOutputCfg := panicdiag.NewCrashOutputConfig()
metaSvc, err := metadata.NewClient()
if err != nil {
logger.GetLogger().Err(err).Msg("failed to initiate metadata
service")
}
- svc := NewService(metaSvc)
+ // Native metrics pipeline: a queue client that publishes _monitoring
measure
+ // writes to the co-located data node over gRPC. It is created idle
here (no
+ // dial until a node is registered); the lifecycle service registers
the local
+ // data node — with its --grpc-addr, known only after flag parsing —
during its
+ // Serve phase when native mode is enabled.
+ // nil migration registry: this client carries native _monitoring
writes, not
+ // tier-migration traffic, so it must not emit the
banyandb_lifecycle_migration_* family.
+ metricsClient := pub.NewWithoutMetadata(nil)
+ nodeSelector, _ := node.NewPickFirstSelector()
+ nodeRegistry := grpc.NewClusterNodeRegistry(data.TopicMeasureWrite,
metricsClient, nodeSelector)
+ // Listener-suppressed: the lifecycle reuses its own HTTP port for
/metrics
+ // instead of opening the observability :2121 listener.
+ metricSvc := services.NewMetricServiceWithoutListener(metaSvc,
metricsClient, "lifecycle", nodeRegistry)
+ svc := NewService(metaSvc, metricSvc, metricsClient)
group := run.NewGroup("lifecycle")
- group.Register(new(signal.Handler), metaSvc, svc)
+ // metricSvc is registered before svc so its PreRun (building the
providers)
+ // runs first; svc.PreRun then safely derives its proof counter from it.
+ group.Register(new(signal.Handler), metaSvc, metricSvc, svc)
cmd := &cobra.Command{
Short: "Run lifecycle migration",
DisableAutoGenTag: true,
@@ -64,7 +94,13 @@ func NewCommand() *cobra.Command {
os.Exit(-1)
}
}()
- if err := group.Run(context.Background()); err != nil {
+ runCtx := context.Background()
+ if metricSvc.NativeEnabled() {
+ // Native mode stamps the lifecycle node
identity onto every
+ // _monitoring series; the metric service reads
it from the context.
+ runCtx = nativeNodeContext(runCtx)
+ }
+ if err := group.Run(runCtx); err != nil {
logger.GetLogger().Error().Err(err).Stack().Str("name",
group.Name()).Msg("Exit")
os.Exit(-1)
}
@@ -75,5 +111,5 @@ func NewCommand() *cobra.Command {
cmd.Flags().StringVar(&logging.Env, "logging-env", "prod", "the
logging")
cmd.Flags().StringVar(&logging.Level, "logging-level", "info", "the
root level of logging")
crashOutputCfg.RegisterFlags(cmd.Flags())
- return cmd
+ return cmd, metricSvc
}
diff --git a/banyand/backup/lifecycle/metrics_test.go
b/banyand/backup/lifecycle/metrics_test.go
new file mode 100644
index 000000000..5b764184c
--- /dev/null
+++ b/banyand/backup/lifecycle/metrics_test.go
@@ -0,0 +1,107 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lifecycle
+
+import (
+ "context"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+)
+
+// TestBuildLocalNodeMetadataHasDataRole verifies that the co-located data node
+// metadata carries ROLE_DATA. Without it the pub client's role gate silently
+// drops the node and native metrics never reach the data node.
+func TestBuildLocalNodeMetadataHasDataRole(t *testing.T) {
+	const addr = "127.0.0.1:17912"
+	spec, isNode := buildLocalNodeMD(addr).Spec.(*databasev1.Node)
+	require.True(t, isNode)
+	require.Equal(t, metricsLocalNodeName, spec.GetMetadata().GetName())
+	require.Equal(t, addr, spec.GetGrpcAddress())
+	require.Contains(t, spec.GetRoles(), databasev1.Role_ROLE_DATA,
+		"local node must carry ROLE_DATA or the pub role gate silently drops it")
+}
+
+// TestNativeNodeContextSetsIdentity verifies that nativeNodeContext stores a
+// common.Node with a non-empty NodeID under common.ContextNodeKey, so per-pod
+// _monitoring series stay distinct.
+func TestNativeNodeContextSetsIdentity(t *testing.T) {
+	raw := nativeNodeContext(context.Background()).Value(common.ContextNodeKey)
+	require.NotNil(t, raw)
+	identity, isNode := raw.(common.Node)
+	require.True(t, isNode)
+	require.NotEmpty(t, identity.NodeID)
+}
+
+// stubPromRegistry embeds a MetricsRegistry and adds a PrometheusHandler
+// method, so tests can drive buildHTTPRouter's /metrics mounting without a
+// real registry.
+type stubPromRegistry struct {
+	observability.MetricsRegistry
+	handler http.Handler
+}
+
+// PrometheusHandler returns the stubbed handler.
+func (s stubPromRegistry) PrometheusHandler() http.Handler {
+	return s.handler
+}
+
+// TestBuildHTTPRouterServesMetricsAndAPI checks that /metrics is mounted on the
+// same router as /api (so the lifecycle reuses its existing HTTP port) and that
+// the two routes do not collide.
+func TestBuildHTTPRouterServesMetricsAndAPI(t *testing.T) {
+	var metricsHit, apiHit bool
+	// okHandler returns a handler that records the hit in *hit and replies 200.
+	okHandler := func(hit *bool) http.Handler {
+		return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+			*hit = true
+			w.WriteHeader(http.StatusOK)
+		})
+	}
+	svc := &lifecycleService{
+		omr: stubPromRegistry{
+			MetricsRegistry: observability.BypassRegistry,
+			handler:         okHandler(&metricsHit),
+		},
+	}
+	router := svc.buildHTTPRouter(okHandler(&apiHit))
+
+	get := func(path string) *httptest.ResponseRecorder {
+		rec := httptest.NewRecorder()
+		router.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
+		return rec
+	}
+
+	require.Equal(t, http.StatusOK, get("/metrics").Code)
+	require.True(t, metricsHit, "/metrics must be routed to the prometheus handler")
+
+	require.Equal(t, http.StatusOK, get("/api/foo").Code)
+	require.True(t, apiHit, "/api must still route to the gateway handler")
+}
+
+// TestBuildHTTPRouterWithoutPromHandler asserts that when the registry does not
+// implement observability.PrometheusHandlerProvider (e.g. BypassRegistry),
+// /metrics is not routed, does not fall through to the gateway handler, and
+// /api keeps working.
+func TestBuildHTTPRouterWithoutPromHandler(t *testing.T) {
+	apiHit := false
+	l := &lifecycleService{omr: observability.BypassRegistry}
+	router := l.buildHTTPRouter(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+		apiHit = true
+		w.WriteHeader(http.StatusOK)
+	}))
+
+	rec := httptest.NewRecorder()
+	router.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/metrics", nil))
+	require.Equal(t, http.StatusNotFound, rec.Code)
+	require.False(t, apiHit, "/metrics must not fall through to the gateway handler")
+
+	// The doc promises /api keeps working; assert it instead of assuming it.
+	apiRec := httptest.NewRecorder()
+	router.ServeHTTP(apiRec, httptest.NewRequest(http.MethodGet, "/api/foo", nil))
+	require.Equal(t, http.StatusOK, apiRec.Code)
+	require.True(t, apiHit, "/api must still route to the gateway handler")
+}
diff --git a/banyand/backup/lifecycle/service.go
b/banyand/backup/lifecycle/service.go
index c98ec1cfa..84526a6e3 100644
--- a/banyand/backup/lifecycle/service.go
+++ b/banyand/backup/lifecycle/service.go
@@ -52,10 +52,13 @@ import (
"github.com/apache/skywalking-banyandb/banyand/backup/snapshot"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/protector"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/healthcheck"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/meter"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
@@ -69,22 +72,35 @@ type service interface {
var _ service = (*lifecycleService)(nil)
+// metricsLocalNodeName is the connMgr key of the co-located data node that
+// receives the lifecycle's native _monitoring measure writes.
+const metricsLocalNodeName = "lifecycle-local"
+
+// metricsNodeKeeperInterval is the period at which the metrics node keeper
+// checks that the local data node connection is active, re-registering the
+// node when it is not.
+const metricsNodeKeeperInterval = 10 * time.Second
+
type lifecycleService struct {
databasev1.UnimplementedClusterStateServiceServer
databasev1.UnimplementedNodeQueryServiceServer
metadata metadata.Repo
omr observability.MetricsRegistry
pm protector.Memory
- clusterStateMgr *clusterStateManager
- l *logger.Logger
- sch *timestamp.Scheduler
+ cyclesTotal meter.Counter
+ metricsClient queue.Client
grpcServer *grpclib.Server
httpSrv *http.Server
tlsReloader *pkgtls.Reloader
currentNode *databasev1.Node
clientCloser context.CancelFunc
stopCh chan struct{}
- measureRoot string
+ sch *timestamp.Scheduler
+ l *logger.Logger
+ clusterStateMgr *clusterStateManager
+ metricsKeeperStop chan struct{}
+ lifecycleHost string
+ lifecycleHTTPAddr string
streamRoot string
traceRoot string
progressFilePath string
@@ -92,31 +108,62 @@ type lifecycleService struct {
schedule string
cert string
gRPCAddr string
- lifecycleHost string
+ lifecycleKeyFile string
lifecycleGRPCAddr string
- lifecycleHTTPAddr string
+ measureRoot string
lifecycleCertFile string
- lifecycleKeyFile string
+ localNodeMD schema.Metadata
+ maxExecutionTimes int
+ chunkSize run.Bytes
lifecycleGRPCPort uint32
lifecycleHTTPPort uint32
- maxExecutionTimes int
enableTLS bool
insecure bool
lifecycleTLS bool
- chunkSize run.Bytes
}
-// NewService creates a new lifecycle service.
-func NewService(meta metadata.Repo) run.Unit {
+// NewService creates a new lifecycle service.
+//
+// metricsRegistry replaces the previous BypassRegistry, so the protector
+// memory metrics and the lifecycle cycles_total proof counter emit real series.
+// metricsClient is the native-metrics pipeline to the co-located data node: a
+// node is registered on it (and writes flow through it) only when native mode
+// is enabled, while GracefulStop stops it whenever it is non-nil.
+func NewService(
+ meta metadata.Repo,
+ metricsRegistry observability.MetricsRegistry,
+ metricsClient queue.Client,
+) run.Unit {
ls := &lifecycleService{
metadata: meta,
- omr: observability.BypassRegistry,
+ omr: metricsRegistry,
+ metricsClient: metricsClient,
clusterStateMgr: &clusterStateManager{},
}
ls.pm = protector.NewMemory(ls.omr)
return ls
}
+// nativeNodeContext returns a child of ctx that carries, under
+// common.ContextNodeKey, the lifecycle node identity the metric service's
+// native mode stamps onto every _monitoring series. The node type
+// ("lifecycle") is supplied separately through the metric service nodeType.
+//
+// NodeID is the first non-empty value of the POD_NAME environment variable
+// (downward API), the hostname (the pod name in k8s), and the literal
+// "lifecycle", so series from different pods stay distinct. Labels come from
+// common.ParseNodeFlags (presumably the --node-labels flag; verify). The
+// address fields stay empty because the lifecycle's own gRPC/HTTP addresses
+// are not computed until Validate, which runs later and only when scheduled.
+func nativeNodeContext(ctx context.Context) context.Context {
+	return context.WithValue(ctx, common.ContextNodeKey, common.Node{
+		NodeID: resolveNativeNodeID(),
+		Labels: common.ParseNodeFlags(),
+	})
+}
+
+// resolveNativeNodeID picks the first non-empty of POD_NAME, the hostname, and "lifecycle".
+func resolveNativeNodeID() string {
+	if podName := os.Getenv("POD_NAME"); podName != "" {
+		return podName
+	}
+	if hostname, err := os.Hostname(); err == nil && hostname != "" {
+		return hostname
+	}
+	return "lifecycle"
+}
+
func (l *lifecycleService) FlagSet() *run.FlagSet {
flagS := run.NewFlagSet(l.Name())
flagS.StringSliceVar(&common.FlagNodeLabels, "node-labels", nil, "the
node labels. e.g. key1=value1,key2=value2")
@@ -186,6 +233,10 @@ func (l *lifecycleService) Validate() error {
func (l *lifecycleService) PreRun(_ context.Context) error {
l.l = logger.GetLogger("lifecycle")
+ // Safe to call With() here: the metrics registry is registered earlier
in the
+ // group, so its PreRun (which builds the provider) has already run.
+ l.cyclesTotal =
l.omr.With(observability.RootScope.SubScope("lifecycle")).NewCounter("cycles_total")
+
if l.schedule != "" && l.lifecycleTLS {
var err error
l.tlsReloader, err = pkgtls.NewReloader(l.lifecycleCertFile,
l.lifecycleKeyFile, l.l)
@@ -202,6 +253,18 @@ func (l *lifecycleService) GracefulStop() {
l.sch.Close()
}
+ // Stop the native metrics node keeper, then close the local metrics pub
+ // client. The metric service may still attempt a final flush
concurrently
+ // during shutdown; native metrics are best-effort, so a flush that
loses the
+ // race to the closing client simply logs and is dropped.
+ if l.metricsKeeperStop != nil {
+ close(l.metricsKeeperStop)
+ l.metricsKeeperStop = nil
+ }
+ if l.metricsClient != nil {
+ l.metricsClient.GracefulStop()
+ }
+
l.l.Info().Msg("Stopping lifecycle server")
if l.tlsReloader != nil {
@@ -245,10 +308,81 @@ func (l *lifecycleService) Name() string {
return "lifecycle"
}
+// buildLocalNodeMD returns the schema metadata under which the co-located data
+// node is registered on the native metrics pub client, keyed by
+// metricsLocalNodeName and pointing at grpcAddr. The node MUST carry
+// ROLE_DATA: the pub client's role gate (allowedRoles=[ROLE_DATA]) silently
+// drops nodes whose roles do not intersect, which would turn native metrics
+// into a silent no-op.
+func buildLocalNodeMD(grpcAddr string) schema.Metadata {
+	spec := &databasev1.Node{
+		Metadata:    &commonv1.Metadata{Name: metricsLocalNodeName},
+		GrpcAddress: grpcAddr,
+		Roles:       []databasev1.Role{databasev1.Role_ROLE_DATA},
+	}
+	return schema.Metadata{
+		TypeMeta: schema.TypeMeta{Kind: schema.KindNode},
+		Spec:     spec,
+	}
+}
+
+// buildHTTPRouter assembles the lifecycle HTTP routes on one chi router: the
+// gRPC-gateway API mounted under /api (with that prefix stripped) and, only
+// when the metrics registry implements observability.PrometheusHandlerProvider,
+// the Prometheus /metrics endpoint. Sharing the router lets /metrics be served
+// by the lifecycle's existing HTTP server.
+func (l *lifecycleService) buildHTTPRouter(apiHandler http.Handler) *chi.Mux {
+	router := chi.NewRouter()
+	router.Mount("/api", http.StripPrefix("/api", apiHandler))
+	provider, ok := l.omr.(observability.PrometheusHandlerProvider)
+	if !ok {
+		return router
+	}
+	router.Handle("/metrics", provider.PrometheusHandler())
+	return router
+}
+
+// startMetricsNodeKeeper registers the co-located data node on the native
+// metrics pub client and keeps the connection active. The pub connManager runs
+// a synchronous health check at registration and does NOT auto-retry an
+// evicted node (NewWithoutMetadata leaves healthCheckInterval==0). The keeper
+// therefore re-registers whenever the client reports no healthy node. This
+// covers both an initial dial that failed (data node not ready yet) and a
+// later failover eviction. The keeper goroutine exits once the channel stored
+// in metricsKeeperStop is closed, which GracefulStop does.
+func (l *lifecycleService) startMetricsNodeKeeper() {
+	l.localNodeMD = buildLocalNodeMD(l.gRPCAddr)
+	// The goroutine selects on this local copy, never on the struct field.
+	// GracefulStop closes the channel and then sets the field to nil.
+	// Reading the field from the goroutine would be a data race. A loop that
+	// re-read it after the reset would select on a nil channel, which is never
+	// ready, so it would miss the close and re-dial the stopped client on
+	// every tick forever.
+	stop := make(chan struct{})
+	l.metricsKeeperStop = stop
+	// Initial registration dials the local data node so native flushes route there.
+	l.metricsClient.OnAddOrUpdate(l.localNodeMD)
+	run.Go(context.Background(), "backup.lifecycle.metrics-node-keeper", l.l, func(_ context.Context) {
+		ticker := time.NewTicker(metricsNodeKeeperInterval)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-stop:
+				return
+			case <-ticker.C:
+				// HealthyNodes reflects the pub's active-connection state directly.
+				// connMgr.OnAddOrUpdate fans out OnActive synchronously (which seeds
+				// the native selector). An empty set therefore means the local data
+				// node is not connected and native flushes would drop; force a fresh dial.
+				if len(l.metricsClient.HealthyNodes()) == 0 {
+					// connMgr.OnAddOrUpdate short-circuits an evictable node, so drop
+					// it first to force a fresh dial. queue.Client does not expose
+					// OnDelete, hence the type assertion.
+					if deleter, ok := l.metricsClient.(interface{ OnDelete(schema.Metadata) }); ok {
+						deleter.OnDelete(l.localNodeMD)
+					}
+					l.metricsClient.OnAddOrUpdate(l.localNodeMD)
+				}
+			}
+		}
+	})
+}
+
func (l *lifecycleService) Serve() run.StopNotify {
l.l = logger.GetLogger("lifecycle")
l.stopCh = make(chan struct{})
+ // Register the co-located data node for native metrics and keep it
active.
+ // Native works in both scheduled and one-shot modes (it is a gRPC
write that
+ // does not depend on the lifecycle's own HTTP server).
+ if l.omr.NativeEnabled() {
+ l.startMetricsNodeKeeper()
+ }
+
// Start gRPC/HTTP servers when schedule is set
if l.schedule != "" {
l.startServers()
@@ -352,8 +486,10 @@ func (l *lifecycleService) startServers() {
return
}
- mux := chi.NewRouter()
- mux.Mount("/api", http.StripPrefix("/api", gwMux))
+ // Reuse this existing HTTP server for the Prometheus endpoint instead
of
+ // opening a separate observability listener. The fodc-agent already
scrapes
+ // this port, so the lifecycle metrics appear with zero deployment
change.
+ mux := l.buildHTTPRouter(gwMux)
l.httpSrv = &http.Server{
Addr: l.lifecycleHTTPAddr,
@@ -403,6 +539,9 @@ func (l *lifecycleService) startServers() {
}
func (l *lifecycleService) action(ctx context.Context) error {
+ if l.cyclesTotal != nil {
+ l.cyclesTotal.Inc(1)
+ }
progress := LoadProgress(l.progressFilePath, l.l)
progress.ClearErrors()
@@ -917,7 +1056,7 @@ func (l *lifecycleService) getGroupsToProcess(ctx
context.Context, progress *Pro
func (l *lifecycleService) processStreamGroup(ctx context.Context, g
*commonv1.Group,
streamDir string, nodes []*databasev1.Node, labels map[string]string,
progress *Progress,
) {
- group, err := parseGroup(g, labels, nodes, l.l, l.metadata,
l.clusterStateMgr)
+ group, err := parseGroup(g, labels, nodes, l.l, l.metadata,
l.clusterStateMgr, l.omr)
if err != nil {
l.l.Error().Err(err).Msgf("failed to parse group %s",
g.Metadata.Name)
return
@@ -1037,7 +1176,7 @@ func (l *lifecycleService)
deleteExpiredStreamSegments(ctx context.Context, g *c
func (l *lifecycleService) processMeasureGroup(ctx context.Context, g
*commonv1.Group, measureDir string,
nodes []*databasev1.Node, labels map[string]string, progress *Progress,
) {
- group, err := parseGroup(g, labels, nodes, l.l, l.metadata,
l.clusterStateMgr)
+ group, err := parseGroup(g, labels, nodes, l.l, l.metadata,
l.clusterStateMgr, l.omr)
if err != nil {
l.l.Error().Err(err).Msgf("failed to parse group %s",
g.Metadata.Name)
return
@@ -1144,7 +1283,7 @@ func (l *lifecycleService) deleteExpiredTraceSegments(ctx
context.Context, g *co
func (l *lifecycleService) processTraceGroup(ctx context.Context, g
*commonv1.Group, traceDir string,
nodes []*databasev1.Node, labels map[string]string, progress *Progress,
) {
- group, err := parseGroup(g, labels, nodes, l.l, l.metadata,
l.clusterStateMgr)
+ group, err := parseGroup(g, labels, nodes, l.l, l.metadata,
l.clusterStateMgr, l.omr)
if err != nil {
l.l.Error().Err(err).Msgf("failed to parse group %s",
g.Metadata.Name)
return
diff --git a/banyand/backup/lifecycle/steps.go
b/banyand/backup/lifecycle/steps.go
index b4270cc70..66c1ef04e 100644
--- a/banyand/backup/lifecycle/steps.go
+++ b/banyand/backup/lifecycle/steps.go
@@ -32,6 +32,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/queue/pub"
"github.com/apache/skywalking-banyandb/pkg/fs"
@@ -123,6 +124,7 @@ func cloneIntervalRule(ir *commonv1.IntervalRule)
*commonv1.IntervalRule {
func parseGroup(
g *commonv1.Group, nodeLabels map[string]string, nodes
[]*databasev1.Node,
l *logger.Logger, metadata metadata.Repo, clusterStateMgr
*clusterStateManager,
+ omr observability.MetricsRegistry,
) (*GroupConfig, error) {
ro := g.ResourceOpts
if ro == nil {
@@ -190,7 +192,7 @@ func parseGroup(
if ok, _ := nodeSel.OnInit([]schema.Kind{schema.KindGroup}); !ok {
return nil, fmt.Errorf("failed to initialize node selector for
group %s", g.Metadata.Name)
}
- client := pub.NewWithoutMetadata() //nolint:contextcheck // health
check goroutine uses context.Background()
+ client := pub.NewWithoutMetadata(omr) //nolint:contextcheck // health
check goroutine uses context.Background()
switch g.Catalog {
case commonv1.Catalog_CATALOG_STREAM:
_ = grpc.NewClusterNodeRegistry(data.TopicStreamWrite, client,
nodeSel)
diff --git a/banyand/backup/lifecycle/steps_test.go
b/banyand/backup/lifecycle/steps_test.go
index c1728b5f2..bc14ab353 100644
--- a/banyand/backup/lifecycle/steps_test.go
+++ b/banyand/backup/lifecycle/steps_test.go
@@ -85,7 +85,7 @@ func TestParseGroup_RejectsMissingIntervals(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
g := makeGroup(c.mutate)
- _, err := parseGroup(g, map[string]string{"type":
"warm"}, nil, nil, nil, nil)
+ _, err := parseGroup(g, map[string]string{"type":
"warm"}, nil, nil, nil, nil, nil)
require.Error(t, err)
assert.Contains(t, err.Error(), c.errFrag)
})
diff --git a/banyand/observability/services/listener_test.go
b/banyand/observability/services/listener_test.go
new file mode 100644
index 000000000..3824694be
--- /dev/null
+++ b/banyand/observability/services/listener_test.go
@@ -0,0 +1,97 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package services
+
+import (
+ "context"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+)
+
+// newTestPromService builds a prometheus-mode *metricService, with or without
+// its own listener. FlagSet is not parsed in unit scope, so it directly sets
+// the fields that Validate and Serve depend on.
+func newTestPromService(t *testing.T, listenerDisabled bool) *metricService {
+	t.Helper()
+	var reg observability.MetricsRegistry
+	switch {
+	case listenerDisabled:
+		reg = NewMetricServiceWithoutListener(nil, nil, "test", nil)
+	default:
+		reg = NewMetricService(nil, nil, "test", nil)
+	}
+	svc, isMetricService := reg.(*metricService)
+	require.True(t, isMetricService)
+	svc.modes = []string{flagPromethusMode}
+	svc.metricsInterval, svc.nativeFlushInterval = time.Second, time.Second
+	return svc
+}
+
+// TestMetricServiceValidateFailFast asserts the default constructor still
rejects
+// an empty listener address (M2: fail-fast preserved for
standalone/data/liaison),
+// while the listener-disabled constructor permits it.
+func TestMetricServiceValidateFailFast(t *testing.T) {
+ normal := newTestPromService(t, false)
+ require.ErrorIs(t, normal.Validate(), errNoAddr)
+
+ disabled := newTestPromService(t, true)
+ require.NoError(t, disabled.Validate(), "listener-disabled service must
not require an addr")
+}
+
+// TestPrometheusHandlerServesRegisteredSeries checks that PrometheusHandler
+// exposes a counter registered through With(scope). This is the path the
+// lifecycle uses for its banyandb_lifecycle_cycles_total proof series.
+func TestPrometheusHandlerServesRegisteredSeries(t *testing.T) {
+	svc := newTestPromService(t, true)
+	require.NoError(t, svc.PreRun(context.Background()))
+
+	scope := observability.RootScope.SubScope("lifecycle")
+	svc.With(scope).NewCounter("cycles_total").Inc(1)
+
+	req := httptest.NewRequest(http.MethodGet, "/metrics", nil)
+	rec := httptest.NewRecorder()
+	svc.PrometheusHandler().ServeHTTP(rec, req)
+	require.Equal(t, http.StatusOK, rec.Code)
+	require.Contains(t, rec.Body.String(), "banyandb_lifecycle_cycles_total")
+}
+
+// TestListenerDisabledServeBindsNoSocket asserts the listener-disabled service
+// neither creates its own http.Server nor blocks on GracefulStop (the closer
slot
+// must be released even though the listener goroutine never starts).
+func TestListenerDisabledServeBindsNoSocket(t *testing.T) {
+ svc := newTestPromService(t, true)
+ require.NoError(t, svc.PreRun(context.Background()))
+
+ require.NotNil(t, svc.Serve())
+ require.Nil(t, svc.svr, "listener-disabled service must not create its
own http.Server")
+
+ done := make(chan struct{})
+ go func() {
+ svc.GracefulStop()
+ close(done)
+ }()
+ select {
+ case <-done:
+ case <-time.After(5 * time.Second):
+ t.Fatal("GracefulStop blocked for a listener-disabled service")
+ }
+}
diff --git a/banyand/observability/services/service.go
b/banyand/observability/services/service.go
index b87dffd5f..f051bfd7d 100644
--- a/banyand/observability/services/service.go
+++ b/banyand/observability/services/service.go
@@ -27,6 +27,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/robfig/cron/v3"
"github.com/apache/skywalking-banyandb/api/common"
@@ -55,8 +56,9 @@ var (
)
var (
+ // Compile-time assertions that *metricService implements run.Service,
+ // run.Config, and the optional observability.PrometheusHandlerProvider.
- _ run.Service = (*metricService)(nil)
- _ run.Config = (*metricService)(nil)
+ _ run.Service = (*metricService)(nil)
+ _ run.Config = (*metricService)(nil)
+ _ observability.PrometheusHandlerProvider = (*metricService)(nil)
obScope = observability.RootScope.SubScope("observability")
)
@@ -78,6 +80,19 @@ func NewMetricService(metadata metadata.Repo, pipeline
queue.Client, nodeType st
}
}
+// NewMetricServiceWithoutListener returns a metric service that never opens
+// its own HTTP listener: Validate accepts an empty listen address and Serve
+// starts no http.Server. The embedding service is responsible for exposing the
+// metrics, e.g. by mounting PrometheusHandler on its own router. This lets the
+// lifecycle command serve /metrics from its existing HTTP port.
+func NewMetricServiceWithoutListener(metadata metadata.Repo, pipeline queue.Client, nodeType string, nodeSelector native.NodeSelector) observability.MetricsRegistry {
+	reg := NewMetricService(metadata, pipeline, nodeType, nodeSelector)
+	// Unchecked assertion on purpose: NewMetricService always returns a
+	// *metricService, so a mismatch should panic here rather than surface
+	// later as a nil dereference.
+	svc := reg.(*metricService)
+	svc.listenerDisabled = true
+	return svc
+}
+
type metricService struct {
metadata metadata.Repo
nodeSelector native.NodeSelector
@@ -97,6 +112,7 @@ type metricService struct {
metricsInterval time.Duration
nativeFlushInterval time.Duration
panicMaxArtifacts int
+ listenerDisabled bool
mutex sync.Mutex
}
@@ -114,7 +130,7 @@ func (p *metricService) FlagSet() *run.FlagSet {
}
func (p *metricService) Validate() error {
- if p.listenAddr == "" {
+ if !p.listenerDisabled && p.listenAddr == "" {
return errNoAddr
}
if len(p.modes) == 0 {
@@ -192,6 +208,14 @@ func (p *metricService) Name() string {
func (p *metricService) Serve() run.StopNotify {
p.mutex.Lock()
defer p.mutex.Unlock()
+ if p.listenerDisabled {
+ // A listener-suppressed service shares its process with a
node's metric
+ // service (e.g. the lifecycle sidecar alongside a data node).
The host
+ // gauges and the MetricsCollector singleton are
process-global, so it must
+ // not (re)initialize or drive them — that races the node's
running
+ // collector. It only flushes its own lifecycle/migration
families.
+ return p.serveWithoutListener()
+ }
p.initMetrics()
startupCtx, cancelStartup := context.WithTimeout(context.Background(),
30*time.Second)
defer cancelStartup()
@@ -248,6 +272,37 @@ func (p *metricService) Serve() run.StopNotify {
return p.closer.CloseNotify()
}
+// serveWithoutListener is the Serve path taken when listenerDisabled is set.
+// It opens no HTTP listener. It also deliberately leaves the process-global host
+// gauges (cpu/mem/net/disk) and the shared MetricsCollector singleton alone: in
+// single-process deployments a co-located node's metric service drives them,
+// and touching them here would race that collector. The embedding service
+// (e.g. the lifecycle command) serves /metrics through PrometheusHandler. The
+// only work done here is native-mode flushing of the lifecycle and migration
+// metric families.
+func (p *metricService) serveWithoutListener() run.StopNotify {
+	initCtx, cancelInit := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancelInit()
+	if containsMode(p.modes, flagNativeMode) {
+		// Native mode: initialize schemas and flush once up front, then schedule
+		// a flush every nativeFlushInterval.
+		p.npf.setServeStarted()
+		p.npf.initAllSchemas(initCtx)
+		p.nCollection.FlushMetrics(initCtx)
+		clock, _ := timestamp.GetClock(context.TODO())
+		p.scheduler = timestamp.NewScheduler(p.l, clock)
+		flushEvery := fmt.Sprintf("@every %s", p.nativeFlushInterval)
+		flushTask := func(ctx context.Context, _ time.Time, _ *logger.Logger) bool {
+			p.nCollection.FlushMetrics(ctx)
+			return true
+		}
+		if err := p.scheduler.Register(initCtx, "native-metric-collection", cron.Descriptor, flushEvery, flushTask); err != nil {
+			p.l.Fatal().Err(err).Msg("Failed to register native metric collection")
+		}
+	}
+	p.l.Info().Msg("metric server listener disabled; metrics served by the embedding service")
+	// This path starts no listener goroutine that could release the closer slot.
+	// Release it here so GracefulStop's CloseThenWait does not block.
+	p.closer.Done()
+	return p.closer.CloseNotify()
+}
+
func (p *metricService) GracefulStop() {
p.mutex.Lock()
defer p.mutex.Unlock()
@@ -266,10 +321,26 @@ func (p *metricService) NativeEnabled() bool {
func (p *metricService) routeTableHandler(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json")
+	// Without a node selector there is no route table to report. http.Error
+	// replaces the JSON Content-Type set above with text/plain and writes an
+	// explanatory body, matching the 404 served by PrometheusHandler, instead of
+	// an empty 404 that claims to be JSON.
+	if p.nodeSelector == nil {
+		http.Error(w, "route table not available: no node selector configured", http.StatusNotFound)
+		return
+	}
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(p.nodeSelector.String()))
}
+// PrometheusHandler satisfies observability.PrometheusHandlerProvider. It
+// returns an http.Handler that renders this service's Prometheus registry, so
+// an external router can mount it instead of relying on the service's own
+// listener. When prometheus mode is not enabled (no registry was created),
+// every request receives a 404.
+func (p *metricService) PrometheusHandler() http.Handler {
+	if p.promReg != nil {
+		return promhttp.HandlerFor(p.promReg, promhttp.HandlerOpts{})
+	}
+	var disabled http.HandlerFunc = func(w http.ResponseWriter, _ *http.Request) {
+		http.Error(w, "prometheus mode not enabled", http.StatusNotFound)
+	}
+	return disabled
+}
+
func containsMode(modes []string, mode string) bool {
for _, item := range modes {
if item == mode {
diff --git a/banyand/observability/type.go b/banyand/observability/type.go
index 85b07f89c..49ca88256 100644
--- a/banyand/observability/type.go
+++ b/banyand/observability/type.go
@@ -19,6 +19,8 @@
package observability
import (
+ "net/http"
+
"github.com/apache/skywalking-banyandb/pkg/meter"
"github.com/apache/skywalking-banyandb/pkg/run"
)
@@ -46,3 +48,12 @@ type MetricsRegistry interface {
// NativeEnabled returns whether the native mode is enabled.
NativeEnabled() bool
}
+
+// PrometheusHandlerProvider is an optional capability of a MetricsRegistry.
+// An implementation hands out its Prometheus exposition handler so it can be
+// mounted on an external HTTP router rather than on the registry's own
+// listener. Callers discover it with a type assertion on the registry. The
+// lifecycle command relies on it to serve /metrics on its existing HTTP port
+// instead of opening a separate observability listener.
+type PrometheusHandlerProvider interface {
+	// PrometheusHandler returns the handler that serves the registry in the
+	// Prometheus exposition format.
+	PrometheusHandler() http.Handler
+}
diff --git a/banyand/queue/pub/batch.go b/banyand/queue/pub/batch.go
index 7ca830b92..47eaebfda 100644
--- a/banyand/queue/pub/batch.go
+++ b/banyand/queue/pub/batch.go
@@ -209,6 +209,10 @@ func (bp *batchPublisher) hasMetrics() bool {
return bp.pub != nil && bp.pub.metrics != nil
}
+// hasMigrationMetrics reports whether the owning publisher carries the
+// banyandb_lifecycle_migration_* family. It is the migration counterpart of
+// hasMetrics, which guards the banyandb_queue_pub_* family.
+func (bp *batchPublisher) hasMigrationMetrics() bool {
+	if bp.pub == nil {
+		return false
+	}
+	return bp.pub.migrationMetrics != nil
+}
+
// listenBatchResponse receives the server response and records failover
events and end-to-end failure metrics.
func (bp *batchPublisher) listenBatchResponse(ctx context.Context, s
clusterv1.Service_SendClient, deferFn func(), bc chan batchEvent, curNode
string) {
defer func() {
@@ -236,6 +240,9 @@ func (bp *batchPublisher) listenBatchResponse(ctx
context.Context, s clusterv1.S
if bp.hasMetrics() {
bp.pub.metrics.totalErr.Inc(1, operation, "", curNode,
info.role, info.tier, sendErrReasonRecvError)
}
+ if bp.hasMigrationMetrics() {
+ bp.pub.migrationMetrics.totalErr.Inc(1, operation, "",
curNode, info.role, info.tier, sendErrReasonRecvError)
+ }
if grpchelper.IsFailoverError(errRecv) {
// Record circuit breaker failure before creating
failover event
bp.pub.connMgr.RecordFailure(curNode, errRecv)
@@ -252,6 +259,9 @@ func (bp *batchPublisher) listenBatchResponse(ctx
context.Context, s clusterv1.S
if bp.hasMetrics() {
bp.pub.metrics.totalErr.Inc(1, operation, "", curNode,
info.role, info.tier, sendErrReasonServerRejected)
}
+ if bp.hasMigrationMetrics() {
+ bp.pub.migrationMetrics.totalErr.Inc(1, operation, "", curNode,
info.role, info.tier, sendErrReasonServerRejected)
+ }
ce := common.NewErrorWithStatus(resp.Status, resp.Error)
// Only failover statuses trigger circuit-breaker accounting; other
server-side
// rejections (e.g. invalid argument) are surfaced to the caller but do
not count
@@ -366,6 +376,9 @@ func (bp *batchPublisher) retrySend(ctx context.Context,
stream clusterv1.Servic
if bp.hasMetrics() {
bp.pub.metrics.totalLatency.Observe(time.Since(start).Seconds(), operation,
group, node, info.role, info.tier)
}
+ if bp.hasMigrationMetrics() {
+
bp.pub.migrationMetrics.totalLatency.Observe(time.Since(start).Seconds(),
operation, group, node, info.role, info.tier)
+ }
}
for attempt := 0; attempt <= defaultMaxRetries; attempt++ {
@@ -379,6 +392,9 @@ func (bp *batchPublisher) retrySend(ctx context.Context,
stream clusterv1.Servic
if bp.hasMetrics() {
bp.pub.metrics.totalErr.Inc(1, operation,
group, node, info.role, info.tier, sendErrReasonCanceled)
}
+ if bp.hasMigrationMetrics() {
+ bp.pub.migrationMetrics.totalErr.Inc(1,
operation, group, node, info.role, info.tier, sendErrReasonCanceled)
+ }
observeLatency()
return ctx.Err()
case <-stream.Context().Done():
@@ -386,6 +402,9 @@ func (bp *batchPublisher) retrySend(ctx context.Context,
stream clusterv1.Servic
if bp.hasMetrics() {
bp.pub.metrics.totalErr.Inc(1, operation,
group, node, info.role, info.tier, sendErrReasonStreamCanceled)
}
+ if bp.hasMigrationMetrics() {
+ bp.pub.migrationMetrics.totalErr.Inc(1,
operation, group, node, info.role, info.tier, sendErrReasonStreamCanceled)
+ }
observeLatency()
return stream.Context().Err()
case <-attemptCtx.Done():
@@ -407,6 +426,10 @@ func (bp *batchPublisher) retrySend(ctx context.Context,
stream clusterv1.Servic
bp.pub.metrics.totalStarted.Inc(1, operation,
group, node, info.role, info.tier)
bp.pub.metrics.totalFinished.Inc(1, operation,
group, node, info.role, info.tier)
}
+ if bp.hasMigrationMetrics() {
+ bp.pub.migrationMetrics.totalStarted.Inc(1,
operation, group, node, info.role, info.tier)
+ bp.pub.migrationMetrics.totalFinished.Inc(1,
operation, group, node, info.role, info.tier)
+ }
// Success writing to the local stream; end-to-end ack
is observed in listenBatchResponse.
observeLatency()
return nil
@@ -420,6 +443,9 @@ func (bp *batchPublisher) retrySend(ctx context.Context,
stream clusterv1.Servic
if bp.hasMetrics() {
bp.pub.metrics.totalErr.Inc(1, operation,
group, node, info.role, info.tier, sendErrReasonNonTransient)
}
+ if bp.hasMigrationMetrics() {
+ bp.pub.migrationMetrics.totalErr.Inc(1,
operation, group, node, info.role, info.tier, sendErrReasonNonTransient)
+ }
observeLatency()
return sendErr
}
@@ -440,12 +466,18 @@ func (bp *batchPublisher) retrySend(ctx context.Context,
stream clusterv1.Servic
if bp.hasMetrics() {
bp.pub.metrics.totalErr.Inc(1, operation,
group, node, info.role, info.tier, sendErrReasonCanceled)
}
+ if bp.hasMigrationMetrics() {
+ bp.pub.migrationMetrics.totalErr.Inc(1,
operation, group, node, info.role, info.tier, sendErrReasonCanceled)
+ }
observeLatency()
return ctx.Err()
case <-stream.Context().Done():
if bp.hasMetrics() {
bp.pub.metrics.totalErr.Inc(1, operation,
group, node, info.role, info.tier, sendErrReasonStreamCanceled)
}
+ if bp.hasMigrationMetrics() {
+ bp.pub.migrationMetrics.totalErr.Inc(1,
operation, group, node, info.role, info.tier, sendErrReasonStreamCanceled)
+ }
observeLatency()
return stream.Context().Err()
}
@@ -455,6 +487,9 @@ func (bp *batchPublisher) retrySend(ctx context.Context,
stream clusterv1.Servic
if bp.hasMetrics() {
bp.pub.metrics.totalErr.Inc(1, operation, group, node,
info.role, info.tier, sendErrReasonRetryExhausted)
}
+ if bp.hasMigrationMetrics() {
+ bp.pub.migrationMetrics.totalErr.Inc(1, operation, group, node,
info.role, info.tier, sendErrReasonRetryExhausted)
+ }
observeLatency()
return fmt.Errorf("retry exhausted for node %s after %d attempts, last
error: %w", node, defaultMaxRetries+1, lastErr)
}
diff --git a/banyand/queue/pub/chunked_sync.go
b/banyand/queue/pub/chunked_sync.go
index f0da29073..6c30fef0b 100644
--- a/banyand/queue/pub/chunked_sync.go
+++ b/banyand/queue/pub/chunked_sync.go
@@ -44,18 +44,43 @@ const (
)
+// chunkedSyncClient streams parts to the single remote node named by node; see
+// SyncStreamingParts.
type chunkedSyncClient struct {
- client clusterv1.ServiceClient
- conn *grpc.ClientConn
- log *logger.Logger
- metrics *pubMetrics
- config *ChunkedSyncClientConfig
- selfNode string
- selfRole string
- selfTier string
- node string
- remoteRole string
- remoteTier string
- chunkSize uint32
+ client clusterv1.ServiceClient
+ conn *grpc.ClientConn
+ log *logger.Logger
+ metrics *pubMetrics
+ migrationMetrics *pubMigrationMetrics // copied from the publisher; nil makes migStarted/migErr/migFinished no-ops
+ config *ChunkedSyncClientConfig
+ selfNode string
+ selfRole string
+ selfTier string
+ node string
+ remoteRole string
+ remoteTier string
+ chunkSize uint32
+}
+
+// migStarted, migErr and migFinished record the banyandb_lifecycle_migration_*
+// mirror of the file-sync metrics. They live outside SyncStreamingParts so the
+// parallel guards do not push that function over the gocyclo budget. The
+// banyandb_queue_pub_* emissions stay inline there, unchanged. Each helper is a
+// no-op when migrationMetrics is nil.
+func (c *chunkedSyncClient) migStarted(operation, group string) {
+	m := c.migrationMetrics
+	if m == nil {
+		return
+	}
+	m.totalStarted.Inc(1, operation, group, c.node, c.remoteRole, c.remoteTier)
+}
+
+func (c *chunkedSyncClient) migErr(operation, group, errType string) {
+	m := c.migrationMetrics
+	if m == nil {
+		return
+	}
+	m.totalErr.Inc(1, operation, group, c.node, c.remoteRole, c.remoteTier, errType)
+}
+
+func (c *chunkedSyncClient) migFinished(operation, group string, duration time.Duration, bytesSent uint64) {
+	m := c.migrationMetrics
+	if m == nil {
+		return
+	}
+	m.totalFinished.Inc(1, operation, group, c.node, c.remoteRole, c.remoteTier)
+	m.totalLatency.Observe(duration.Seconds(), operation, group, c.node, c.remoteRole, c.remoteTier)
+	m.sentBytes.Inc(float64(bytesSent), operation, group, c.node, c.remoteRole, c.remoteTier)
}
// SyncStreamingParts implements queue.ChunkedSyncClient with streaming
support.
@@ -114,6 +139,7 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx
context.Context, parts []queu
if c.metrics != nil {
c.metrics.totalStarted.Inc(1, operation, group, c.node,
c.remoteRole, c.remoteTier)
}
+ c.migStarted(operation, group)
metadata := &clusterv1.SyncMetadata{
Group: group,
@@ -134,6 +160,7 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx
context.Context, parts []queu
if c.metrics != nil {
c.metrics.totalErr.Inc(1, operation, group, c.node,
c.remoteRole, c.remoteTier, errType)
}
+ c.migErr(operation, group, errType)
return nil, fmt.Errorf("failed to stream parts: %w", streamErr)
}
if totalChunks == 0 && len(failedParts) == 0 {
@@ -143,6 +170,7 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx
context.Context, parts []queu
c.metrics.totalLatency.Observe(duration.Seconds(),
operation, group, c.node, c.remoteRole, c.remoteTier)
c.metrics.sentBytes.Inc(float64(totalBytesSent),
operation, group, c.node, c.remoteRole, c.remoteTier)
}
+ c.migFinished(operation, group, duration, totalBytesSent)
return &queue.SyncResult{
Success: true,
SessionID: sessionID,
@@ -160,6 +188,7 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx
context.Context, parts []queu
if c.metrics != nil {
c.metrics.totalErr.Inc(1, operation, group,
c.node, c.remoteRole, c.remoteTier, "recv_error")
}
+ c.migErr(operation, group, "recv_error")
return nil, fmt.Errorf("failed to receive final
response: %w", recvErr)
}
finalResp = resp
@@ -185,10 +214,12 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx
context.Context, parts []queu
c.metrics.totalLatency.Observe(duration.Seconds(),
operation, group, c.node, c.remoteRole, c.remoteTier)
c.metrics.sentBytes.Inc(float64(totalBytesSent),
operation, group, c.node, c.remoteRole, c.remoteTier)
}
+ c.migFinished(operation, group, duration, totalBytesSent)
} else {
if c.metrics != nil {
c.metrics.totalErr.Inc(1, operation, group, c.node,
c.remoteRole, c.remoteTier, "completion_error")
}
+ c.migErr(operation, group, "completion_error")
}
return &queue.SyncResult{
diff --git a/banyand/queue/pub/migration_metrics.go
b/banyand/queue/pub/migration_metrics.go
new file mode 100644
index 000000000..0be8e4320
--- /dev/null
+++ b/banyand/queue/pub/migration_metrics.go
@@ -0,0 +1,51 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pub
+
+import (
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+// lifecycleMigrationScope roots the banyandb_lifecycle_migration_* metric
+// family. It is the tier-migration counterpart of the banyandb_queue_pub_*
+// family produced under queuePubScope.
+var lifecycleMigrationScope = observability.RootScope.SubScope("lifecycle_migration")
+
+// pubMigrationMetrics is the lifecycle tier-migration counterpart of
+// pubMetrics. The migration publisher is created through NewWithoutMetadata,
+// which has no metadata, so its regular pubMetrics stay nil and record nothing.
+// This family is registered instead on an explicitly supplied MetricsRegistry,
+// under its own scope. That lets the migration layer be queried independently
+// of the write-pipeline pub family.
+type pubMigrationMetrics struct {
+	totalStarted  meter.Counter   // operations begun
+	totalFinished meter.Counter   // operations completed successfully
+	totalLatency  meter.Histogram // operation latency, observed in seconds
+	totalErr      meter.Counter   // failures; carries an extra error_type label
+	sentBytes     meter.Counter   // bytes sent by completed chunked syncs
+}
+
+// newPubMigrationMetrics registers the migration metric family on factory.
+// Every series is labelled by operation, group and the remote node's name, role
+// and tier. total_err additionally carries an error_type label.
+func newPubMigrationMetrics(factory observability.Factory) *pubMigrationMetrics {
+	labels := []string{"operation", "group", "remote_node", "remote_role", "remote_tier"}
+	// Capping capacity at len forces append to copy, so the error label set never
+	// shares a backing array with labels.
+	errLabels := append(labels[:len(labels):len(labels)], "error_type")
+	started := factory.NewCounter("total_started", labels...)
+	finished := factory.NewCounter("total_finished", labels...)
+	latency := factory.NewHistogram("total_latency", meter.DefBuckets, labels...)
+	failures := factory.NewCounter("total_err", errLabels...)
+	bytesSent := factory.NewCounter("sent_bytes", labels...)
+	return &pubMigrationMetrics{
+		totalStarted:  started,
+		totalFinished: finished,
+		totalLatency:  latency,
+		totalErr:      failures,
+		sentBytes:     bytesSent,
+	}
+}
diff --git a/banyand/queue/pub/migration_metrics_test.go
b/banyand/queue/pub/migration_metrics_test.go
new file mode 100644
index 000000000..7c600bf81
--- /dev/null
+++ b/banyand/queue/pub/migration_metrics_test.go
@@ -0,0 +1,124 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pub
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+
+ "github.com/apache/skywalking-banyandb/api/data"
+ clusterv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+)
+
+// newPubMigrationMetricsWithErrCapture builds a pubMigrationMetrics whose
+// total_err counter is the supplied capturer, so a test can read back which
+// error reasons were recorded. Every other instrument is a test stub.
+func newPubMigrationMetricsWithErrCapture(totalErr *errReasonCapturerImpl) *pubMigrationMetrics { //nolint:exhaustruct
+	m := &pubMigrationMetrics{
+		totalErr:      totalErr,
+		totalStarted:  &countingCounter{},
+		totalFinished: &countingCounter{},
+		totalLatency:  &noopHistogram{},
+		sentBytes:     &countingCounter{},
+	}
+	return m
+}
+
+// TestNewWithoutMetadataNilLeavesMigrationMetricsNil checks that a publisher
+// built with a nil registry records neither the banyandb_queue_pub_* nor the
+// banyandb_lifecycle_migration_* family.
+func TestNewWithoutMetadataNilLeavesMigrationMetricsNil(t *testing.T) {
+	client := NewWithoutMetadata(nil)
+	pp, ok := client.(*pub)
+	require.True(t, ok)
+	require.Nil(t, pp.migrationMetrics, "migration metrics must be nil without a registry")
+	require.Nil(t, pp.metrics, "queue_pub metrics stay nil for the metadata-less publisher")
+}
+
+// TestNewWithoutMetadataWithRegistryRegistersMigrationMetrics checks that
+// supplying a registry registers the banyandb_lifecycle_migration_* family,
+// while the banyandb_queue_pub_* family stays disabled because there is no
+// metadata.
+func TestNewWithoutMetadataWithRegistryRegistersMigrationMetrics(t *testing.T) {
+	client := NewWithoutMetadata(observability.BypassRegistry)
+	pp, ok := client.(*pub)
+	require.True(t, ok)
+	require.NotNil(t, pp.migrationMetrics, "migration metrics must be registered when a registry is supplied")
+	require.Nil(t, pp.metrics, "queue_pub family must remain disabled (metadata is nil)")
+}
+
+// TestRetrySendMirrorsErrorToMigrationMetrics checks that, on the batch-write
+// path, an exhausted retry loop records the same error reason in both metric
+// families when both are configured.
+func TestRetrySendMirrorsErrorToMigrationMetrics(t *testing.T) {
+	pubErrs, migErrs := newErrReasonCapturer(), newErrReasonCapturer()
+	publisher := &pub{ //nolint:exhaustruct
+		metrics:          newPubMetricsWithErrCapture(pubErrs),
+		migrationMetrics: newPubMigrationMetricsWithErrCapture(migErrs),
+		nodeCache:        make(map[string]nodeInfo),
+	}
+	bp := &batchPublisher{pub: publisher, topic: topicPtr(data.TopicMeasureWrite)}
+
+	// Every send fails with Unavailable, so the retries run out.
+	stream := NewMockSendClient(context.Background())
+	stream.SetSendFunc(func(*clusterv1.SendRequest) error {
+		return status.Error(codes.Unavailable, "transient")
+	})
+
+	err := bp.retrySend(context.Background(), stream, &clusterv1.SendRequest{}, "n1")
+	require.Error(t, err)
+	require.Equal(t, float64(1), pubErrs.sum(sendErrReasonRetryExhausted))
+	require.Equal(t, float64(1), migErrs.sum(sendErrReasonRetryExhausted), "migration family must mirror the pub family")
+}
+
+// TestPublishMirrorsStartedFinishedToMigrationMetrics checks that a successful
+// batch-write publish increments total_started and total_finished in both
+// metric families.
+func TestPublishMirrorsStartedFinishedToMigrationMetrics(t *testing.T) {
+	const nodeName = "node-a"
+	var (
+		pubStarted, pubFinished = &countingCounter{}, &countingCounter{}
+		migStarted, migFinished = &countingCounter{}, &countingCounter{}
+	)
+	pubFamily := &pubMetrics{ //nolint:exhaustruct
+		totalStarted:  pubStarted,
+		totalFinished: pubFinished,
+		totalLatency:  &noopHistogram{},
+		totalErr:      newErrReasonCapturer(),
+		sentBytes:     &countingCounter{},
+	}
+	p := newPubWithConnMgrForMetrics(t, pubFamily)
+	p.migrationMetrics = &pubMigrationMetrics{ //nolint:exhaustruct
+		totalStarted:  migStarted,
+		totalFinished: migFinished,
+		totalLatency:  &noopHistogram{},
+		totalErr:      newErrReasonCapturer(),
+		sentBytes:     &countingCounter{},
+	}
+
+	okStream := NewMockSendClient(context.Background())
+	okStream.SetSendFunc(func(*clusterv1.SendRequest) error { return nil })
+	// An already-closed channel stands in for the stream's ctxDoneCh.
+	closedDone := make(chan struct{})
+	close(closedDone)
+
+	bp := p.NewBatchPublisher(10 * time.Second).(*batchPublisher)
+	bp.streams[nodeName] = writeStream{client: okStream, ctxDoneCh: closedDone}
+
+	msg := bus.NewMessageWithNode(1, nodeName, []byte("payload"))
+	_, err := bp.Publish(context.Background(), data.TopicMeasureWrite, msg)
+	require.NoError(t, err)
+
+	require.Equal(t, float64(1), pubStarted.count)
+	require.Equal(t, float64(1), migStarted.count, "migration started must mirror pub started")
+	require.Equal(t, float64(1), pubFinished.count)
+	require.Equal(t, float64(1), migFinished.count, "migration finished must mirror pub finished")
+}
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index 83be1e3b8..e7d656183 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -72,25 +72,26 @@ var (
+// pub is the queue.Client implementation returned by New and NewWithoutMetadata.
type pub struct {
schema.UnimplementedOnInitHandler
- metadata metadata.Repo
- handlers map[bus.Topic]schema.EventHandler
- log *logger.Logger
- metrics *pubMetrics
- connMgr *grpchelper.ConnManager[*client]
- closer *run.Closer
- writableProbe map[string]map[string]struct{}
- nodeCache map[string]nodeInfo
- caCertPath string
- caCertReloader *pkgtls.Reloader
- prefix string
- retryPolicy string
- selfNode string
- selfRole string
- selfTier string
- allowedRoles []databasev1.Role
- writableProbeMu sync.Mutex
- nodeCacheMu sync.RWMutex
- tlsEnabled bool
+ metadata metadata.Repo
+ handlers map[bus.Topic]schema.EventHandler
+ log *logger.Logger
+ metrics *pubMetrics
+ migrationMetrics *pubMigrationMetrics // set by NewWithoutMetadata when a registry is supplied; nil disables the banyandb_lifecycle_migration_* mirror
+ connMgr *grpchelper.ConnManager[*client]
+ closer *run.Closer
+ writableProbe map[string]map[string]struct{}
+ nodeCache map[string]nodeInfo
+ caCertPath string
+ caCertReloader *pkgtls.Reloader
+ prefix string
+ retryPolicy string
+ selfNode string
+ selfRole string
+ selfTier string
+ allowedRoles []databasev1.Role
+ writableProbeMu sync.Mutex
+ nodeCacheMu sync.RWMutex
+ tlsEnabled bool
}
// nodeInfo caches the resolved role and tier for a remote node.
@@ -455,7 +456,12 @@ func New(metadata metadata.Repo, roles ...databasev1.Role)
queue.Client {
// NewWithoutMetadata returns a new queue client without metadata, defaulting
to data nodes.
// sender_* fields are left empty for this lifecycle-tool publisher.
-func NewWithoutMetadata() queue.Client {
+//
+// If omr is non-nil, a parallel banyandb_lifecycle_migration_* metric family
is
+// registered on it. The regular banyandb_queue_pub_* family stays disabled
because
+// metadata is nil (PreRun gates it on metadata != nil). Pass nil to leave the
+// migration metrics disabled (e.g. tests, or non-migration clients).
+func NewWithoutMetadata(omr observability.MetricsRegistry) queue.Client {
p := New(nil, databasev1.Role_ROLE_DATA)
pp := p.(*pub)
pp.log = logger.GetLogger("queue-client")
@@ -465,6 +471,9 @@ func NewWithoutMetadata() queue.Client {
RetryPolicy: pp.retryPolicy,
MaxRecvMsgSize: maxReceiveMessageSize,
})
+ if omr != nil {
+ pp.migrationMetrics =
newPubMigrationMetrics(omr.With(lifecycleMigrationScope))
+ }
return p
}
@@ -662,18 +671,19 @@ func (p *pub) NewChunkedSyncClientWithConfig(node string,
config *ChunkedSyncCli
info = resolveNodeInfo(n)
}
return &chunkedSyncClient{
- client: c.client,
- conn: c.conn,
- node: node,
- log: p.log,
- metrics: p.metrics,
- selfNode: p.selfNode,
- selfRole: p.selfRole,
- selfTier: p.selfTier,
- remoteRole: info.role,
- remoteTier: info.tier,
- chunkSize: config.ChunkSize,
- config: config,
+ client: c.client,
+ conn: c.conn,
+ node: node,
+ log: p.log,
+ metrics: p.metrics,
+ migrationMetrics: p.migrationMetrics,
+ selfNode: p.selfNode,
+ selfRole: p.selfRole,
+ selfTier: p.selfTier,
+ remoteRole: info.role,
+ remoteTier: info.tier,
+ chunkSize: config.ChunkSize,
+ config: config,
}, nil
}
diff --git a/banyand/queue/pub/role_gate_test.go
b/banyand/queue/pub/role_gate_test.go
new file mode 100644
index 000000000..94ec77447
--- /dev/null
+++ b/banyand/queue/pub/role_gate_test.go
@@ -0,0 +1,45 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pub
+
+import (
+ "testing"
+
+ "github.com/onsi/gomega"
+
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+// TestRoleLessNodeRejectedByGate checks that the publisher's role gate drops a
+// node whose roles do not intersect allowedRoles before it is ever dialed.
+// This is exactly the failure the lifecycle native-metrics registration must
+// avoid. The co-located data node has to be registered with ROLE_DATA;
+// otherwise OnAddOrUpdate silently returns and the node never becomes active.
+// The positive path (ROLE_DATA -> active) is covered by the register/unregister
+// specs in client_test.go.
+func TestRoleLessNodeRejectedByGate(t *testing.T) {
+	g := gomega.NewWithT(t)
+	// newPub leaves allowedRoles at its default of ROLE_DATA.
+	p := newPub()
+	defer p.GracefulStop()
+
+	roleless := getDataNode("lifecycle-local", "127.0.0.1:0")
+	spec := roleless.Spec.(*databasev1.Node)
+	spec.Roles = nil
+	p.OnAddOrUpdate(roleless)
+
+	g.Expect(p.connMgr.ActiveCount()).To(gomega.Equal(0), "role-less node must not become active")
+	g.Expect(p.connMgr.EvictableCount()).To(gomega.Equal(0), "role-less node must not even be dialed")
+}
diff --git a/banyand/queue/test/chunked_sync_common.go
b/banyand/queue/test/chunked_sync_common.go
index 566f36e97..a1fecd94f 100644
--- a/banyand/queue/test/chunked_sync_common.go
+++ b/banyand/queue/test/chunked_sync_common.go
@@ -108,7 +108,7 @@ func setupChunkedSyncTestWithChunkSize(t *testing.T,
testName string, chunkSize
return errInternal == nil
}, flags.EventuallyTimeout, 100*time.Millisecond)
- client := pub.NewWithoutMetadata()
+ client := pub.NewWithoutMetadata(nil)
nodeAddr := fmt.Sprintf("localhost:%d", grpcPort)
nodeName := testName + "-node"
diff --git a/fodc/agent/internal/flightrecorder/datasource.go
b/fodc/agent/internal/flightrecorder/datasource.go
index 747b467b0..b6ee00559 100644
--- a/fodc/agent/internal/flightrecorder/datasource.go
+++ b/fodc/agent/internal/flightrecorder/datasource.go
@@ -256,6 +256,27 @@ func (ds *Datasource) GetTimestamps() *TimestampRingBuffer
{
return ds.timestamps
}
+// GetCurrentSnapshot returns the latest value of every metric, keyed as in
+// GetMetrics, together with the latest timestamp (0 when there is no timestamp
+// buffer). Everything is read under a single read lock, and UpdateBatch mutates
+// under that same lock. The values and the timestamp therefore always belong to
+// one batch: a reader racing an in-flight UpdateBatch never pairs an old metric
+// value with a new timestamp, or vice versa.
+func (ds *Datasource) GetCurrentSnapshot() (map[string]float64, int64) {
+	ds.mu.RLock()
+	defer ds.mu.RUnlock()
+
+	snapshot := make(map[string]float64, len(ds.metrics))
+	for key, ring := range ds.metrics {
+		snapshot[key] = ring.GetCurrentValue()
+	}
+	if ds.timestamps == nil {
+		return snapshot, 0
+	}
+	return snapshot, ds.timestamps.GetCurrentValue()
+}
+
// GetDescriptions returns a copy of the descriptions map.
func (ds *Datasource) GetDescriptions() map[string]string {
ds.mu.RLock()
diff --git a/fodc/agent/internal/flightrecorder/datasource_test.go
b/fodc/agent/internal/flightrecorder/datasource_test.go
index b4daf4417..c144ebd93 100644
--- a/fodc/agent/internal/flightrecorder/datasource_test.go
+++ b/fodc/agent/internal/flightrecorder/datasource_test.go
@@ -514,9 +514,10 @@ func
TestDatasource_UpdateBatch_ConcurrentReadsDuringUpdate(t *testing.T) {
// Wait for batch update to start
<-batchStart
- // Try to read during batch update
- metricsMap := ds.GetMetrics()
- timestamps := ds.GetTimestamps()
+ // Try to read during batch update. GetCurrentSnapshot
reads every
+ // value plus the timestamp under a single lock, so it
always returns a
+ // coherent view of one batch even while UpdateBatch is
in flight.
+ values, ts := ds.GetCurrentSnapshot()
readResults[idx] = struct {
metric1Value float64
@@ -524,10 +525,10 @@ func
TestDatasource_UpdateBatch_ConcurrentReadsDuringUpdate(t *testing.T) {
metric3Value float64
timestamp int64
}{
- metric1Value:
metricsMap["metric1"].GetCurrentValue(),
- metric2Value:
metricsMap["metric2"].GetCurrentValue(),
- metric3Value:
metricsMap["metric3"].GetCurrentValue(),
- timestamp: timestamps.GetCurrentValue(),
+ metric1Value: values["metric1"],
+ metric2Value: values["metric2"],
+ metric3Value: values["metric3"],
+ timestamp: ts,
}
readComplete <- struct{}{}
diff --git a/test/cases/lifecycle/lifecycle.go
b/test/cases/lifecycle/lifecycle.go
index 4ef7f2183..f145560de 100644
--- a/test/cases/lifecycle/lifecycle.go
+++ b/test/cases/lifecycle/lifecycle.go
@@ -25,6 +25,8 @@ import (
"fmt"
"io"
"io/fs"
+ "net/http"
+ "net/http/httptest"
"os"
"path/filepath"
"sort"
@@ -43,6 +45,7 @@ import (
streamv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
tracev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
"github.com/apache/skywalking-banyandb/banyand/backup/lifecycle"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/grpchelper"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
@@ -227,6 +230,25 @@ func verifyLifecycleStages(sc helpers.SharedContext,
verifyFn func(gomega.Gomega
})
}
+// verifyMigrationMetrics asserts that the lifecycle tier-migration publisher
+// emitted the banyandb_lifecycle_migration_* family (the mirror of
+// banyandb_queue_pub_*), and specifically that at least one file-sync part
+// transfer completed. The registry's Prometheus counters persist after the
+// command stops, so its handler is scraped in-process rather than over a live
+// HTTP port.
+func verifyMigrationMetrics(reg observability.MetricsRegistry) {
+	provider, ok := reg.(observability.PrometheusHandlerProvider)
+	gomega.Expect(ok).To(gomega.BeTrue(), "lifecycle metrics registry must expose a Prometheus handler")
+	rec := httptest.NewRecorder()
+	provider.PrometheusHandler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/metrics", nil))
+	gomega.Expect(rec.Code).To(gomega.Equal(http.StatusOK))
+	body := rec.Body.String()
+	// Measure/stream/trace part files travel through the file-sync operation,
+	// and a successful sync increments total_finished. Both facts must hold for
+	// the SAME series. A bare operation="file-sync" substring check would also
+	// accept a file-sync total_started or total_err series, i.e. a sync that
+	// never finished.
+	gomega.Expect(body).To(gomega.MatchRegexp(`banyandb_lifecycle_migration_total_finished\{[^}]*operation="file-sync"[^}]*\} [1-9]`),
+		"expected a non-zero banyandb_lifecycle_migration_total_finished series for operation=\"file-sync\", got:\n"+body)
+}
+
func verifySourceDirectoriesAfterMigration() {
streamSrcPath := filepath.Join(SharedContext.SrcDir, "stream", "data",
"default")
streamEntries, err := os.ReadDir(streamSrcPath)
@@ -509,8 +531,10 @@ func crossSegmentTimestamps() (single, left, right
time.Time) {
// runLifecycleMigration runs a single hot->warm lifecycle migration, pointing
// every root path at the shared source dir and writing its report to
reportDir.
-func runLifecycleMigration(progressFile, reportDir string) {
- lifecycleCmd := lifecycle.NewCommand()
+// It returns the command's metrics registry so callers can verify the emitted
+// banyandb_lifecycle_migration_* family.
+func runLifecycleMigration(progressFile, reportDir string)
observability.MetricsRegistry {
+ lifecycleCmd, reg := lifecycle.NewCommandWithRegistry()
args := []string{
"--grpc-addr", SharedContext.DataAddr,
"--stream-root-path", SharedContext.SrcDir,
@@ -522,6 +546,7 @@ func runLifecycleMigration(progressFile, reportDir string) {
args = append(args, SharedContext.MetadataFlags...)
lifecycleCmd.SetArgs(args)
gomega.Expect(lifecycleCmd.Execute()).To(gomega.Succeed())
+ return reg
}
// drainWriteAcks closes a client-streaming write and fails the spec on any
@@ -737,7 +762,11 @@ var _ = ginkgo.Describe("Measure cross-segment migration",
ginkgo.Ordered, func(
gomega.Expect(err).NotTo(gomega.HaveOccurred())
defer os.RemoveAll(dir)
rf := filepath.Join(dir, "report")
- runLifecycleMigration(filepath.Join(dir, "progress.json"), rf)
+ migrationMetrics := runLifecycleMigration(filepath.Join(dir,
"progress.json"), rf)
+ // This Describe seeds sw_cross_segment immediately above and
migrates it
+ // here, so the tier-migration publisher always emits the
+ // banyandb_lifecycle_migration_* family regardless of spec
ordering.
+ verifyMigrationMetrics(migrationMetrics)
// === 4. Verify the destination directory contains multiple
seg-* folders for
// sw_cross_segment. Pre-fix, the cross-segment part
would copy entirely