This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch feat/fodc-grafana-dashboard
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit bb3c6a8a9421b2ca28cf9702218401d85acac8bf
Author: Hongtao Gao <[email protected]>
AuthorDate: Wed Jun 10 00:45:38 2026 +0000

    docs(fodc): align observability docs with lifecycle migration metrics
    
    Reconcile the FODC observability docs against the metric changes merged
    to main since this branch was created (#1164, #1166, #1167):
    
    - metrics.md: document the banyandb_lifecycle_migration_* family (the
      tier-migration mirror of queue_pub, served by the lifecycle sidecar at
      --lifecycle-http-port/:17915) and the lifecycle run-health series
      (cycles_total, last_run_timestamp_seconds, last_run_success); add
      remote_role="lifecycle" to queue_sub, drop the stale "no queue metrics"
      claim, and complete the publish-path error_type list (send_error,
      decode_error, invalid_topic).
    - topology.md + render_topology.py: weight the lifecycle migration layer
      from banyandb_lifecycle_migration_* (path still from /cluster/topology
      calls), exclude remote_role="lifecycle" sub series from the request
      pipeline fallback (else migration file-sync renders as a bogus data->data
      edge), reframe query/control as publisher-recorded by default, and fix the
      calls edge-kind count (three: migration, liaison route-table, gossip).
    - overview.md: add Key Signal #11 (lifecycle migration health) without
      renumbering the existing signals.
    - providers.md: note the lifecycle sidecar serves /metrics on :17915.
    
    Also restore banyand/queue/test/chunked_sync_common.go, dropped by accident
    in the previous commit, so banyand/queue/test compiles again.
---
 banyand/queue/test/chunked_sync_common.go | 539 ++++++++++++++++++++++++++++++
 docs/operation/fodc/render_topology.py    |  75 +++--
 docs/operation/fodc/topology.md           |  48 ++-
 docs/operation/observability/metrics.md   |  30 +-
 docs/operation/observability/overview.md  |   3 +-
 docs/operation/observability/providers.md |   2 +-
 6 files changed, 654 insertions(+), 43 deletions(-)

diff --git a/banyand/queue/test/chunked_sync_common.go 
b/banyand/queue/test/chunked_sync_common.go
new file mode 100644
index 000000000..a1fecd94f
--- /dev/null
+++ b/banyand/queue/test/chunked_sync_common.go
@@ -0,0 +1,539 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package test
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "testing"
+       "time"
+
+       "github.com/spf13/cobra"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/api/data"
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/banyand/queue/pub"
+       "github.com/apache/skywalking-banyandb/banyand/queue/sub"
+       "github.com/apache/skywalking-banyandb/pkg/bytes"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+)
+
+// Setup holds the test environment configuration and components.
+type Setup struct {
+       Server        queue.Server // server created by sub.NewServerWithPorts
+       Client        queue.Client // publisher created by pub.NewWithoutMetadata
+       Closer        run.Unit // first value returned by run.NewTester; registered in TestGroup
+       ChunkedClient queue.ChunkedSyncClient // chunked sync client opened against NodeName; closed by cleanupTestSetup
+       MockHandler   *MockChunkedSyncHandler // handler registered for data.TopicStreamPartSync
+       DeferFn       func() // second value returned by run.NewTester; invoked by cleanupTestSetup
+       NodeName      string // testName + "-node"
+       NodeAddr      string // "localhost:<GRPCPort>"
+       TestGroup     run.Group // run group holding Closer and Server
+       GRPCPort      uint32 // first allocated free port
+       HTTPPort      uint32 // second allocated free port
+}
+
+// setupChunkedSyncTest creates a complete test environment for chunked sync testing; it delegates to setupChunkedSyncTestWithChunkSize with a chunk size of 1024 (presumably bytes; confirm against NewChunkedSyncClient).
+func setupChunkedSyncTest(t *testing.T, testName string) *Setup {
+       return setupChunkedSyncTestWithChunkSize(t, testName, 1024)
+}
+
+// setupChunkedSyncTestWithChunkSize creates a complete test environment for chunked sync testing with custom chunk size.
+//
+// It allocates two free ports, starts a sub server (with a MockChunkedSyncHandler
+// registered for data.TopicStreamPartSync) inside a run group, waits for the gRPC
+// health check to pass, announces the server to a metadata-less pub client as a
+// data node, and finally opens a chunked sync client to that node using chunkSize.
+// The returned Setup is released with cleanupTestSetup.
+func setupChunkedSyncTestWithChunkSize(t *testing.T, testName string, chunkSize uint32) *Setup {
+       ports, err := test.AllocateFreePorts(2)
+       require.NoError(t, err, "Failed to allocate free ports")
+
+       grpcPort := uint32(ports[0])
+       httpPort := uint32(ports[1])
+
+       omr := observability.BypassRegistry
+
+       testGroup := run.NewGroup(testName)
+       closer, deferFn := run.NewTester(testName + "-closer")
+
+       server := sub.NewServerWithPorts(omr, testName, grpcPort, httpPort)
+
+       mockHandler := NewMockChunkedSyncHandler()
+
+       server.RegisterChunkedSyncHandler(data.TopicStreamPartSync, mockHandler)
+
+       testGroup.Register(closer, server)
+
+       cmd := &cobra.Command{
+               Use: testName,
+               FParseErrWhitelist: cobra.FParseErrWhitelist{
+                       UnknownFlags: true, // Ignore unknown flags
+               },
+               Run: func(_ *cobra.Command, _ []string) {
+                       // Keep the error local: this closure runs on the goroutine started
+                       // below, so assigning the outer err would race with the test goroutine.
+                       if runErr := testGroup.Run(context.Background()); runErr != nil {
+                               t.Logf("Server group failed: %v", runErr)
+                       }
+               },
+       }
+       cmd.Flags().AddFlagSet(testGroup.RegisterFlags().FlagSet)
+       go func() {
+               // assert, not require: require calls t.FailNow, which must only be
+               // invoked from the goroutine running the test function.
+               assert.NoError(t, cmd.Execute())
+       }()
+
+       assert.Eventually(t, func() bool {
+               errInternal := helpers.HealthCheck(fmt.Sprintf("localhost:%d", grpcPort), 10*time.Second, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()))()
+               return errInternal == nil
+       }, flags.EventuallyTimeout, 100*time.Millisecond)
+
+       client := pub.NewWithoutMetadata(nil)
+
+       nodeAddr := fmt.Sprintf("localhost:%d", grpcPort)
+       nodeName := testName + "-node"
+       node := schema.Metadata{
+               TypeMeta: schema.TypeMeta{
+                       Name: nodeName,
+                       Kind: schema.KindNode,
+               },
+               Spec: &databasev1.Node{
+                       Metadata: &commonv1.Metadata{
+                               Name: nodeName,
+                       },
+                       Roles:       []databasev1.Role{databasev1.Role_ROLE_DATA},
+                       GrpcAddress: nodeAddr,
+               },
+       }
+
+       client.OnAddOrUpdate(node)
+
+       var chunkedClient queue.ChunkedSyncClient
+       assert.Eventually(t, func() bool {
+               var errInternal error
+               chunkedClient, errInternal = client.NewChunkedSyncClient(nodeName, chunkSize)
+               // Log the error of this attempt (errInternal), not the outer err,
+               // which only ever holds the port-allocation result.
+               if errInternal != nil {
+                       t.Logf("NewChunkedSyncClient error: %v", errInternal)
+               }
+               return errInternal == nil
+       }, flags.EventuallyTimeout, 100*time.Millisecond)
+
+       return &Setup{
+               Server:        server,
+               Client:        client,
+               MockHandler:   mockHandler,
+               TestGroup:     testGroup,
+               Closer:        closer,
+               DeferFn:       deferFn,
+               GRPCPort:      grpcPort,
+               HTTPPort:      httpPort,
+               NodeName:      nodeName,
+               NodeAddr:      nodeAddr,
+               ChunkedClient: chunkedClient,
+       }
+}
+
+// cleanupTestSetup releases a Setup: it closes the chunked sync client first
+// and then runs the tester's deferred function, skipping whichever is unset.
+func cleanupTestSetup(setup *Setup) {
+       if chunked := setup.ChunkedClient; chunked != nil {
+               chunked.Close()
+       }
+       if release := setup.DeferFn; release != nil {
+               release()
+       }
+}
+
+// createTestDataPart builds a StreamingPartData for tests from in-memory files.
+// Every file becomes a FileInfo backed by a sequential reader over its content;
+// both size fields are the summed content length, TotalCount is the number of
+// files, and the part always carries ID 1 and a single block.
+func createTestDataPart(files map[string][]byte, group string, shardID uint32) queue.StreamingPartData {
+       var (
+               infos []queue.FileInfo
+               size  uint64
+       )
+
+       for name, payload := range files {
+               var b bytes.Buffer
+               _, writeErr := b.Write(payload)
+               if writeErr != nil {
+                       panic(fmt.Sprintf("failed to write content to buffer: %v", writeErr))
+               }
+               info := queue.FileInfo{
+                       Name:   name,
+                       Reader: b.SequentialRead(),
+               }
+               infos = append(infos, info)
+               size += uint64(len(payload))
+       }
+
+       part := queue.StreamingPartData{
+               ID:                    1,
+               Files:                 infos,
+               Group:                 group,
+               ShardID:               shardID,
+               Topic:                 data.TopicStreamPartSync.String(),
+               CompressedSizeBytes:   size,
+               UncompressedSizeBytes: size,
+               TotalCount:            uint64(len(files)),
+               BlocksCount:           1,
+               MinTimestamp:          time.Now().UnixMilli(),
+               MaxTimestamp:          time.Now().UnixMilli(),
+       }
+       return part
+}
+
+// createTestDataPartWithMetadata builds a StreamingPartData for tests whose
+// sizes, counts, timestamps and part ID are supplied by the caller verbatim
+// instead of being derived from the file contents.
+func createTestDataPartWithMetadata(files map[string][]byte, group string, shardID uint32,
+       compressedSize, uncompressedSize, totalCount, blocksCount uint64,
+       minTimestamp, maxTimestamp int64, partID uint64,
+) queue.StreamingPartData {
+       var infos []queue.FileInfo
+
+       for name, payload := range files {
+               var b bytes.Buffer
+               _, writeErr := b.Write(payload)
+               if writeErr != nil {
+                       panic(fmt.Sprintf("failed to write content to buffer: %v", writeErr))
+               }
+               info := queue.FileInfo{
+                       Name:   name,
+                       Reader: b.SequentialRead(),
+               }
+               infos = append(infos, info)
+       }
+
+       part := queue.StreamingPartData{
+               ID:                    partID,
+               Files:                 infos,
+               Group:                 group,
+               ShardID:               shardID,
+               Topic:                 data.TopicStreamPartSync.String(),
+               CompressedSizeBytes:   compressedSize,
+               UncompressedSizeBytes: uncompressedSize,
+               TotalCount:            totalCount,
+               BlocksCount:           blocksCount,
+               MinTimestamp:          minTimestamp,
+               MaxTimestamp:          maxTimestamp,
+       }
+       return part
+}
+
+// MockPartHandler implements queue.PartHandler for testing.
+type MockPartHandler struct {
+       receivedFiles    map[string][]byte // file name -> bytes accumulated by HandleFileChunk
+       syncHandler      *MockChunkedSyncHandler // handler that created this part; FinishSync appends the part to its completedParts
+       trackingHandler  *MockChunkedSyncHandlerWithTracking // optional; when set, NewPartType forwards to TrackNewPartType
+       receivedChunks   [][]byte // private copy of every chunk, in arrival order
+       finishSyncCalled bool // set by FinishSync
+       closeCalled      bool // set by Close
+}
+
+// NewPartType forwards the context to the tracking handler when one is
+// attached; without a tracking handler it is a no-op. It always returns nil.
+func (m *MockPartHandler) NewPartType(ctx *queue.ChunkedSyncPartContext) error {
+       tracker := m.trackingHandler
+       if tracker == nil {
+               return nil
+       }
+       tracker.TrackNewPartType(ctx)
+       return nil
+}
+
+// FinishSync records that the sync finished, appends this part to the owning
+// handler's completed parts (under the handler's mutex) when an owner is set,
+// and then closes the part, returning Close's result.
+func (m *MockPartHandler) FinishSync() error {
+       m.finishSyncCalled = true
+
+       if owner := m.syncHandler; owner != nil {
+               owner.mu.Lock()
+               owner.completedParts = append(owner.completedParts, m)
+               owner.mu.Unlock()
+       }
+       return m.Close()
+}
+
+// Close marks the handler as closed by setting closeCalled; it always returns nil.
+func (m *MockPartHandler) Close() error {
+       m.closeCalled = true
+       return nil
+}
+
+// GetReceivedFiles returns a deep copy of the received files: a new map whose
+// values are freshly allocated slices, so callers cannot mutate the handler's data.
+func (m *MockPartHandler) GetReceivedFiles() map[string][]byte {
+       snapshot := make(map[string][]byte)
+       for name, content := range m.receivedFiles {
+               dup := make([]byte, len(content))
+               copy(dup, content)
+               snapshot[name] = dup
+       }
+       return snapshot
+}
+
+// GetReceivedChunksCount returns the number of received chunks, i.e. the length of receivedChunks.
+func (m *MockPartHandler) GetReceivedChunksCount() int {
+       return len(m.receivedChunks)
+}
+
+// MockChunkedSyncHandler implements queue.ChunkedSyncHandler for testing.
+type MockChunkedSyncHandler struct {
+       completedParts   []*MockPartHandler // parts appended by MockPartHandler.FinishSync
+       receivedContexts []*queue.ChunkedSyncPartContext // copies of the contexts passed to CreatePartHandler
+       mu               sync.Mutex // held while completedParts or receivedContexts is read or appended to
+}
+
+// NewMockChunkedSyncHandler returns a mock chunked sync handler whose
+// completed-part and received-context lists start out empty but non-nil.
+func NewMockChunkedSyncHandler() *MockChunkedSyncHandler {
+       handler := new(MockChunkedSyncHandler)
+       handler.completedParts = []*MockPartHandler{}
+       handler.receivedContexts = []*queue.ChunkedSyncPartContext{}
+       return handler
+}
+
+// HandleFileChunk processes incoming file chunks for testing.
+//
+// The chunk is copied before it is stored, so the caller may reuse its buffer.
+// The copy is recorded in the part's receivedChunks and appended to the bytes
+// accumulated for ctx.FileName. An error is returned when ctx.Handler is not a
+// *MockPartHandler (including a nil handler); a typed nil handler is ignored.
+func (m *MockChunkedSyncHandler) HandleFileChunk(ctx *queue.ChunkedSyncPartContext, chunk []byte) error {
+       // Comma-ok form: a bare type assertion would panic on a nil or foreign
+       // handler before any nil check could run.
+       currentPart, ok := ctx.Handler.(*MockPartHandler)
+       if !ok {
+               return fmt.Errorf("unexpected part handler type %T", ctx.Handler)
+       }
+       if currentPart == nil {
+               return nil
+       }
+
+       chunkCopy := make([]byte, len(chunk))
+       copy(chunkCopy, chunk)
+       currentPart.receivedChunks = append(currentPart.receivedChunks, chunkCopy)
+
+       // Keep the entry non-nil even for an empty first chunk, so that an empty
+       // file is still recorded with an empty (not nil) content slice.
+       if currentPart.receivedFiles[ctx.FileName] == nil {
+               currentPart.receivedFiles[ctx.FileName] = make([]byte, 0)
+       }
+       currentPart.receivedFiles[ctx.FileName] = append(currentPart.receivedFiles[ctx.FileName], chunkCopy...)
+
+       return nil
+}
+
+// CreatePartHandler records a copy of the incoming context and returns a fresh
+// MockPartHandler bound to this handler. The recorded copy also gets its
+// Handler field pointed at the new part handler. The error is always nil.
+func (m *MockChunkedSyncHandler) CreatePartHandler(ctx *queue.ChunkedSyncPartContext) (queue.PartHandler, error) {
+       m.mu.Lock()
+       recorded := new(queue.ChunkedSyncPartContext)
+       recorded.ID = ctx.ID
+       recorded.Group = ctx.Group
+       recorded.ShardID = ctx.ShardID
+       recorded.PartType = ctx.PartType
+       recorded.CompressedSizeBytes = ctx.CompressedSizeBytes
+       recorded.UncompressedSizeBytes = ctx.UncompressedSizeBytes
+       recorded.TotalCount = ctx.TotalCount
+       recorded.BlocksCount = ctx.BlocksCount
+       recorded.MinTimestamp = ctx.MinTimestamp
+       recorded.MaxTimestamp = ctx.MaxTimestamp
+       recorded.MinKey = ctx.MinKey
+       recorded.MaxKey = ctx.MaxKey
+       m.receivedContexts = append(m.receivedContexts, recorded)
+       m.mu.Unlock()
+
+       part := &MockPartHandler{
+               receivedFiles:  map[string][]byte{},
+               syncHandler:    m,
+               receivedChunks: [][]byte{},
+       }
+       recorded.Handler = part
+       return part, nil
+}
+
+// GetCompletedParts returns a snapshot of the completed parts taken under the
+// mutex. The slice is new; the part handlers it points to are shared.
+func (m *MockChunkedSyncHandler) GetCompletedParts() []*MockPartHandler {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+       count := len(m.completedParts)
+       snapshot := make([]*MockPartHandler, count)
+       copy(snapshot, m.completedParts)
+       return snapshot
+}
+
+// GetReceivedFiles merges the received files of every completed part into one
+// map. When several parts carry the same file name, the later part wins.
+func (m *MockChunkedSyncHandler) GetReceivedFiles() map[string][]byte {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+       merged := make(map[string][]byte)
+       for i := range m.completedParts {
+               partFiles := m.completedParts[i].GetReceivedFiles()
+               for name, payload := range partFiles {
+                       merged[name] = payload
+               }
+       }
+       return merged
+}
+
+// GetReceivedChunksCount returns the total number of received chunks.
+func (m *MockChunkedSyncHandler) GetReceivedChunksCount() int {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+       total := 0
+       for _, part := range m.completedParts {
+               total += part.GetReceivedChunksCount()
+       }
+       return total
+}
+
+// GetReceivedContexts returns all received contexts.
+//
+// Each element is a fresh copy of a context recorded by CreatePartHandler, so
+// callers cannot mutate the handler's records. The Handler reference is
+// deliberately not copied. PartType is included so callers can inspect the
+// part type that CreatePartHandler recorded.
+func (m *MockChunkedSyncHandler) GetReceivedContexts() []*queue.ChunkedSyncPartContext {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+
+       result := make([]*queue.ChunkedSyncPartContext, len(m.receivedContexts))
+       for i, ctx := range m.receivedContexts {
+               contextCopy := &queue.ChunkedSyncPartContext{
+                       ID:                    ctx.ID,
+                       Group:                 ctx.Group,
+                       ShardID:               ctx.ShardID,
+                       PartType:              ctx.PartType,
+                       FileName:              ctx.FileName,
+                       CompressedSizeBytes:   ctx.CompressedSizeBytes,
+                       UncompressedSizeBytes: ctx.UncompressedSizeBytes,
+                       TotalCount:            ctx.TotalCount,
+                       BlocksCount:           ctx.BlocksCount,
+                       MinTimestamp:          ctx.MinTimestamp,
+                       MaxTimestamp:          ctx.MaxTimestamp,
+                       MinKey:                ctx.MinKey,
+                       MaxKey:                ctx.MaxKey,
+               }
+               result[i] = contextCopy
+       }
+       return result
+}
+
+// MockChunkedSyncHandlerWithTracking extends MockChunkedSyncHandler with NewPartType call tracking.
+type MockChunkedSyncHandlerWithTracking struct {
+       *MockChunkedSyncHandler
+       newPartTypeContexts []*queue.ChunkedSyncPartContext // copies of the contexts passed to TrackNewPartType
+       trackingMu          sync.Mutex // held while newPartTypeContexts or newPartTypeCalled is accessed
+       newPartTypeCalled   bool // set by TrackNewPartType, cleared by ResetNewPartTypeCalls
+}
+
+// NewMockChunkedSyncHandlerWithTracking returns a tracking handler that wraps
+// a fresh MockChunkedSyncHandler and starts with an empty, non-nil context list.
+func NewMockChunkedSyncHandlerWithTracking() *MockChunkedSyncHandlerWithTracking {
+       tracker := new(MockChunkedSyncHandlerWithTracking)
+       tracker.MockChunkedSyncHandler = NewMockChunkedSyncHandler()
+       tracker.newPartTypeContexts = []*queue.ChunkedSyncPartContext{}
+       return tracker
+}
+
+// TrackNewPartType records one NewPartType call: it sets the called flag and
+// stores a copy of the context (without its FileName or Handler) under trackingMu.
+func (m *MockChunkedSyncHandlerWithTracking) TrackNewPartType(ctx *queue.ChunkedSyncPartContext) {
+       m.trackingMu.Lock()
+       defer m.trackingMu.Unlock()
+       m.newPartTypeCalled = true
+       recorded := new(queue.ChunkedSyncPartContext)
+       recorded.ID = ctx.ID
+       recorded.Group = ctx.Group
+       recorded.ShardID = ctx.ShardID
+       recorded.PartType = ctx.PartType
+       recorded.CompressedSizeBytes = ctx.CompressedSizeBytes
+       recorded.UncompressedSizeBytes = ctx.UncompressedSizeBytes
+       recorded.TotalCount = ctx.TotalCount
+       recorded.BlocksCount = ctx.BlocksCount
+       recorded.MinTimestamp = ctx.MinTimestamp
+       recorded.MaxTimestamp = ctx.MaxTimestamp
+       recorded.MinKey = ctx.MinKey
+       recorded.MaxKey = ctx.MaxKey
+       m.newPartTypeContexts = append(m.newPartTypeContexts, recorded)
+}
+
+// NewPartTypeCalled returns whether NewPartType has been called since creation or the last ResetNewPartTypeCalls; the flag is read under trackingMu.
+func (m *MockChunkedSyncHandlerWithTracking) NewPartTypeCalled() bool {
+       m.trackingMu.Lock()
+       defer m.trackingMu.Unlock()
+       return m.newPartTypeCalled
+}
+
+// GetNewPartTypeContexts returns a snapshot of the contexts recorded by
+// TrackNewPartType. The slice is new; the context pointers are shared.
+func (m *MockChunkedSyncHandlerWithTracking) GetNewPartTypeContexts() []*queue.ChunkedSyncPartContext {
+       m.trackingMu.Lock()
+       defer m.trackingMu.Unlock()
+       count := len(m.newPartTypeContexts)
+       snapshot := make([]*queue.ChunkedSyncPartContext, count)
+       copy(snapshot, m.newPartTypeContexts)
+       return snapshot
+}
+
+// ResetNewPartTypeCalls resets the tracking state: under trackingMu it clears the called flag and drops the recorded contexts.
+func (m *MockChunkedSyncHandlerWithTracking) ResetNewPartTypeCalls() {
+       m.trackingMu.Lock()
+       defer m.trackingMu.Unlock()
+       m.newPartTypeCalled = false
+       m.newPartTypeContexts = nil
+}
+
+// CreatePartHandler creates a new part handler with tracking capabilities.
+func (m *MockChunkedSyncHandlerWithTracking) CreatePartHandler(ctx 
*queue.ChunkedSyncPartContext) (queue.PartHandler, error) {
+       partHandler, err := m.MockChunkedSyncHandler.CreatePartHandler(ctx)
+       if err != nil {
+               return nil, err
+       }
+
+       // Cast to our enhanced handler and set tracking reference
+       if mockHandler, ok := partHandler.(*MockPartHandler); ok {
+               mockHandler.trackingHandler = m
+       }
+
+       return partHandler, nil
+}
+
+// createTestDataPartWithPartType builds a StreamingPartData for tests with the
+// given part ID and PartType. Sizes and counts are fixed placeholder values
+// (1024/2048 bytes, 100 items, 5 blocks); timestamps are the current time.
+func createTestDataPartWithPartType(
+       files map[string][]byte,
+       group string,
+       shardID uint32,
+       partID uint64,
+       partType string,
+) queue.StreamingPartData {
+       var infos []queue.FileInfo
+
+       for name, payload := range files {
+               var b bytes.Buffer
+               _, writeErr := b.Write(payload)
+               if writeErr != nil {
+                       panic(fmt.Sprintf("failed to write content to buffer: %v", writeErr))
+               }
+               info := queue.FileInfo{
+                       Name:   name,
+                       Reader: b.SequentialRead(),
+               }
+               infos = append(infos, info)
+       }
+
+       part := queue.StreamingPartData{
+               ID:                    partID,
+               Files:                 infos,
+               Group:                 group,
+               ShardID:               shardID,
+               PartType:              partType,
+               Topic:                 data.TopicStreamPartSync.String(),
+               CompressedSizeBytes:   1024,
+               UncompressedSizeBytes: 2048,
+               TotalCount:            100,
+               BlocksCount:           5,
+               MinTimestamp:          time.Now().UnixMilli(),
+               MaxTimestamp:          time.Now().UnixMilli(),
+       }
+       return part
+}
+
+// TrackingSetup extends Setup with tracking capabilities.
+type TrackingSetup struct {
+       *Setup
+       TrackingHandler *MockChunkedSyncHandlerWithTracking // handler registered on the server by setupChunkedSyncTestWithTracking
+}
+
+// setupChunkedSyncTestWithTracking builds the default chunked sync environment
+// and then registers a tracking handler for data.TopicStreamPartSync on its server.
+func setupChunkedSyncTestWithTracking(t *testing.T, testName string) *TrackingSetup {
+       base := setupChunkedSyncTest(t, testName)
+       tracker := NewMockChunkedSyncHandlerWithTracking()
+
+       // Register the tracking handler for the same topic on the running server.
+       base.Server.RegisterChunkedSyncHandler(data.TopicStreamPartSync, tracker)
+
+       // Expose the tracker's embedded mock through the base setup.
+       base.MockHandler = tracker.MockChunkedSyncHandler
+
+       result := new(TrackingSetup)
+       result.Setup = base
+       result.TrackingHandler = tracker
+       return result
+}
diff --git a/docs/operation/fodc/render_topology.py 
b/docs/operation/fodc/render_topology.py
index cc31f5ab3..1f453e4b2 100755
--- a/docs/operation/fodc/render_topology.py
+++ b/docs/operation/fodc/render_topology.py
@@ -18,10 +18,13 @@
 
 """Render a BanyanDB cluster topology by joining the FODC proxy's
 /cluster/topology (node inventory) with queue Prometheus metrics (live edges).
-Edges come from the publisher metrics (queue_pub) by default; for edges the
-publisher does not record -- chiefly liaison->warm/cold query fan-out -- the
-subscriber metrics (queue_sub) fill in, read from the receiver's side and
-flipped back to the true sender->receiver direction.
+Request edges come from the publisher metrics (queue_pub) by default; for
+edges the publisher does not record -- chiefly liaison->warm/cold query
+fan-out on older servers -- the subscriber metrics (queue_sub) fill in, read
+from the receiver's side and flipped back to the true sender->receiver
+direction. Lifecycle tier-migration edges take their path from the topology
+`calls` and their weight from the banyandb_lifecycle_migration_* family (the
+tier-migration mirror of queue_pub) whenever a migration has run.
 Output is Graphviz DOT and/or Mermaid. Stdlib only.
 If Prometheus is behind Grafana's datasource proxy with basic auth,
 set PROM_USER / PROM_PASS in the environment."""
@@ -72,7 +75,7 @@ def main():
 
     # 1) Node inventory from /cluster/topology.
     topo = http_get_json(args.proxy.rstrip("/") + "/cluster/topology")
-    nodes, podname2name = {}, {}
+    nodes, podname2name, podname2lifecycle = {}, {}, {}
     for n in topo.get("nodes", []):
         name = (n.get("metadata") or {}).get("name", "")
         if not name:
@@ -94,6 +97,14 @@ def main():
         host = name.split(".")[0].split(":")[0]
         if host and not is_lifecycle and host not in podname2name:
             podname2name[host] = name
+        # The banyandb_lifecycle_migration_* series are scraped from the 
lifecycle
+        # sidecar (container_name="lifecycle") but share the data pod's 
pod_name,
+        # so they resolve through a separate map to the pod's :17914 identity.
+        if is_lifecycle:
+            if labels.get("pod_name"):
+                podname2lifecycle.setdefault(labels["pod_name"], name)
+            if host:
+                podname2lifecycle.setdefault(host, name)
 
     def local_name(pod):
         return podname2name.get(pod, pod)
@@ -117,15 +128,21 @@ def main():
     #    flow is pod_name -> remote_node. The subscriber (queue_sub) scrape
     #    target is the receiver and remote_node is the sender, so its flow is
     #    remote_node -> pod_name -- the edge is flipped. The sub view is what
-    #    surfaces liaison->warm/cold query fan-out, which the publisher side
-    #    does not record on servers whose publish path is not yet instrumented.
-    def build_edges(thr, p99, err, byt, invert):
+    #    surfaces liaison->warm/cold query fan-out on older servers whose
+    #    publish path predates the query/control instrumentation. Sub series
+    #    with remote_role="lifecycle" are inbound tier-migration traffic, not
+    #    request pipeline, and are skipped (the migration layer is weighted
+    #    from the publisher-side banyandb_lifecycle_migration_* family).
+    def build_edges(thr, p99, err, byt, invert, resolve_local=None, 
skip_remote_role=None):
+        resolve = resolve_local or local_name
         es = {}
         for s in thr:
             m = s["metric"]
-            pod_node, peer = local_name(m.get("pod_name", "")), 
m.get("remote_node", "")
+            pod_node, peer = resolve(m.get("pod_name", "")), 
m.get("remote_node", "")
             if not pod_node or not peer:
                 continue
+            if skip_remote_role and m.get("remote_role", "") == 
skip_remote_role:
+                continue
             pod_role, pod_tier = primary_role([m.get("node_role", "")]), 
m.get("node_type", "")
             peer_role, peer_tier = m.get("remote_role", ""), 
m.get("remote_tier", "")
             if invert:
@@ -142,7 +159,7 @@ def main():
 
         def add_scalar(rows, field):
             for s in rows:
-                pod_node = local_name(s["metric"].get("pod_name", ""))
+                pod_node = resolve(s["metric"].get("pod_name", ""))
                 peer = s["metric"].get("remote_node", "")
                 key = (peer, pod_node) if invert else (pod_node, peer)
                 if key in es:
@@ -169,23 +186,37 @@ def main():
         p99_by("banyandb_queue_sub_total_latency_bucket"),
         rate_by("banyandb_queue_sub_total_err", "pod_name, remote_node"),
         None,  # sub records received_bytes, not sent_bytes; query/control 
carry no bytes
-        invert=True)
+        invert=True, skip_remote_role="lifecycle")
     edges = dict(pub_edges)
     for key, sub_edge in sub_edges.items():
         edges.setdefault(key, sub_edge)  # default pub; fall back to sub for 
edges pub lacks
 
-    # 3) Lifecycle migration edges from /cluster/topology `calls`: keep only 
edges
-    #    whose source is a lifecycle node (name ends with the lifecycle port) 
and
-    #    drop the data<->data property-gossip mesh. These are structural (the
-    #    lifecycle publisher is metadata-less, so it emits no queue metrics).
+    # 3) Lifecycle migration edges. The PATH comes from /cluster/topology 
`calls`
+    #    (structural; present even between scheduled runs): keep only edges 
whose
+    #    source is a lifecycle node (name ends with the lifecycle port), drop 
the
+    #    liaison route-table copies and the data<->data property-gossip mesh. 
The
+    #    WEIGHT comes from the banyandb_lifecycle_migration_* family -- the
+    #    tier-migration mirror of queue_pub, scraped from the lifecycle sidecar
+    #    (container_name="lifecycle"), whose pod_name resolves to the pod's
+    #    :17914 lifecycle identity rather than its data node.
     lc_suffix = ":" + str(args.lifecycle_port)
-    migrations = []
+    migrations = {}
     for c in topo.get("calls", []):
         src, dst = c.get("source", ""), c.get("target", "")
         if src.endswith(lc_suffix) and dst:
             ensure_node(src, role="lifecycle")
             ensure_node(dst)
-            migrations.append((src, dst))
+            migrations.setdefault((src, dst), None)
+    mig_weights = build_edges(
+        rate_by("banyandb_lifecycle_migration_total_finished", edge_by),
+        p99_by("banyandb_lifecycle_migration_total_latency_bucket"),
+        rate_by("banyandb_lifecycle_migration_total_err", "pod_name, 
remote_node"),
+        rate_by("banyandb_lifecycle_migration_sent_bytes", "pod_name, 
remote_node"),
+        invert=False, resolve_local=lambda pod: podname2lifecycle.get(pod, 
pod))
+    for key, e in mig_weights.items():
+        ensure_node(key[0], role="lifecycle")
+        ensure_node(key[1])
+        migrations[key] = e  # weighted when a migration has run; else 
structural
 
     if args.format in ("dot", "both"):
         print(render_dot(nodes, edges, migrations, args.p99_warn))
@@ -216,8 +247,9 @@ def render_dot(nodes, edges, migrations, p99_warn):
         pw = 1.0 + min(4.0, sum(e["ops"].values()) ** 0.25)
         out.append('  "%s" -> "%s" [label="%s", color="%s", penwidth=%.1f, 
fontsize=10];'
                    % (local, remote, edge_label(e), "#c62828" if red else 
"#607d8b", pw))
-    for src, dst in sorted(migrations):  # lifecycle migration (structural)
-        out.append('  "%s" -> "%s" [label="migrate", style=dashed, 
color="#8e24aa", fontsize=9];' % (src, dst))
+    for (src, dst), e in sorted(migrations.items()):  # lifecycle migration 
(dashed)
+        label = "migrate" if e is None else "migrate\\n" + edge_label(e)
+        out.append('  "%s" -> "%s" [label="%s", style=dashed, color="#8e24aa", 
fontsize=9];' % (src, dst, label))
     out.append("}")
     return "\n".join(out)
 
@@ -235,8 +267,9 @@ def render_mermaid(nodes, edges, migrations, p99_warn):
         if e["err"] > 0 or e["p99"] > p99_warn:
             red_idx.append(i)
         i += 1
-    for src, dst in sorted(migrations):  # lifecycle migration (structural, 
dashed)
-        out.append('  %s -. "migrate" .-> %s' % (nid(src), nid(dst)))
+    for (src, dst), e in sorted(migrations.items()):  # lifecycle migration 
(dashed)
+        label = "migrate" if e is None else "migrate · " + 
edge_label(e).replace("\\n", " · ")
+        out.append('  %s -. "%s" .-> %s' % (nid(src), label, nid(dst)))
     for name, a in sorted(nodes.items()):
         out.append("  style %s fill:%s" % (nid(name), 
TIER_COLOR.get(a["tier"], "#cfd8dc")))
     for idx in red_idx:
diff --git a/docs/operation/fodc/topology.md b/docs/operation/fodc/topology.md
index c5132043a..b96589ebd 100644
--- a/docs/operation/fodc/topology.md
+++ b/docs/operation/fodc/topology.md
@@ -2,13 +2,13 @@
 
 BanyanDB gives you two partial views of a running cluster, and neither is a 
usable topology on its own:
 
-- The **FODC proxy `/cluster/topology`** endpoint knows the **nodes** — names, 
roles, storage tier, health — and a `calls` graph that carries the **lifecycle 
tier-migration** edges (hot→warm→cold) mixed with a noisy data↔data 
property-repair gossip mesh.
-- The **queue metrics** (`banyandb_queue_pub_*` / `banyandb_queue_sub_*`) know 
the **request pipeline edges** — which node sends what to which, for what 
operation/group, at what throughput, latency, and error rate — but carry only 
the peer's node name and the local scrape target's identity, with no node 
inventory or health.
+- The **FODC proxy `/cluster/topology`** endpoint knows the **nodes** — names, 
roles, storage tier, health — and a `calls` graph that carries the **lifecycle 
tier-migration** edges (hot→warm→cold) mixed with the liaison's route-table 
edges and a noisy data↔data property-repair gossip mesh.
+- The **queue metrics** (`banyandb_queue_pub_*` / `banyandb_queue_sub_*`, plus 
the tier-migration mirror `banyandb_lifecycle_migration_*`) know the **flow 
edges** — which node sends what to which, for what operation/group, at what 
throughput, latency, and error rate — but carry only the peer's node name and 
the local scrape target's identity, with no node inventory or health.
 
 This page shows how to **join them on node name** into one directed topology 
with **two edge layers**:
 
-- **Request pipeline** (solid, weighted) — liaison→data, from the queue 
metrics: `batch-write`, `file-sync`, `query`, and `control`, with throughput, 
p99 latency, errors, and file-sync bytes. The publisher metrics (`queue_pub`) 
are the primary source; the subscriber metrics (`queue_sub`) fill any edge the 
publisher does not record — chiefly the liaison→warm/cold `query` fan-out on 
servers whose publish path is not yet instrumented for it.
-- **Lifecycle migration** (dashed, structural) — hot→warm→cold tier movement, 
from the **lifecycle** service's entries in `/cluster/topology` `calls`.
+- **Request pipeline** (solid, weighted) — liaison→data, from the queue 
metrics: `batch-write`, `file-sync`, `query`, and `control`, with throughput, 
p99 latency, errors, and file-sync bytes. The publisher metrics (`queue_pub`) 
are the primary source and record all four operations on current servers; the 
subscriber metrics (`queue_sub`) fill any edge the publisher does not record — 
chiefly the liaison→warm/cold `query` fan-out on older servers whose publish 
path predates the `query`/`con [...]
+- **Lifecycle migration** (dashed) — hot→warm→cold tier movement. The **path** 
comes from the **lifecycle** service's entries in `/cluster/topology` `calls` 
(structural — present even between scheduled runs); the **weight** comes from 
the `banyandb_lifecycle_migration_*` family, the tier-migration mirror of 
`queue_pub`.
 
 Nodes (and their health) come from `/cluster/topology`. The result answers 
"who sends what to whom, how fast, and is it healthy?" — useful for dashboards 
and incident triage.
 
@@ -34,7 +34,7 @@ Returns `{ "nodes": [...], "calls": [...] }`. Each node 
carries the identity and
 - `labels.type` is the storage tier (`hot` / `warm` / `cold`; empty for 
liaison).
 - `labels.container_name` is the k8s container the node runs as (`liaison` / 
`data` / `lifecycle`), stamped by the agent from its `--container-names` config 
— so even the role-less lifecycle sidecar is classified correctly.
 - `labels.pod_name`, `status`, and `last_heartbeat` are enrichment the 
proxy/agent fill in for every node matched to a live agent — see 
[Caveats](#caveats).
-- `calls` mixes two things: the **lifecycle service's** tier-migration targets 
(hot→warm→cold — we render these, see [Lifecycle migration 
layer](#lifecycle-migration-layer)) and a data↔data property-repair gossip mesh 
(we drop these as noise).
+- `calls` mixes three things: the **lifecycle service's** tier-migration 
targets (hot→warm→cold — we render these, see [Lifecycle migration 
layer](#lifecycle-migration-layer)), the **liaison's** route-table edges 
(dropped — the request layer is rendered from the weighted queue metrics 
instead), and a data↔data property-repair gossip mesh (dropped as noise).
 
 ### Edges — queue metrics on the FODC proxy scrape
 
@@ -48,14 +48,16 @@ Every `banyandb_queue_pub_*` series carries **both 
endpoints** of one directed f
 
 `remote_node` is exactly a topology `metadata.name`, so a publisher series is 
a ready-made directed edge **`pod_name` → `remote_node`**.
 
-`banyandb_queue_sub_*` on the receiver carries the **same five labels**, but 
recorded from the other end: `pod_name` is the **receiver** and `remote_node` 
is the **sender** (the publisher stamps its identity onto the first frame of 
each stream, and the receiver reads it back). So a subscriber series is the 
directed edge **`remote_node` → `pod_name`** — the inverse mapping. We use the 
publisher as the primary source and the subscriber **only to fill edges the 
publisher does not record**,  [...]
+`banyandb_queue_sub_*` on the receiver carries the **same five labels**, but 
recorded from the other end: `pod_name` is the **receiver** and `remote_node` 
is the **sender** (the publisher stamps its identity onto the first frame of 
each stream, and the receiver reads it back). So a subscriber series is the 
directed edge **`remote_node` → `pod_name`** — the inverse mapping. We use the 
publisher as the primary source and the subscriber **only to fill edges the 
publisher does not record**,  [...]
+
+One class of subscriber series belongs to a different layer entirely: series 
with **`remote_role="lifecycle"`** are inbound **tier-migration** traffic from 
the lifecycle service (which stamps its co-located data node's name and tier as 
its identity). Keep them **out of the request-pipeline fallback** — otherwise 
migration file-sync shows up as a bogus data→data request edge — and use them, 
if at all, only as a receiver-side cross-check of the migration layer below.
 
 ## The join
 
 1. **Remote endpoint** — `remote_node` **==** topology `metadata.name`. 
Direct, exact.
 2. **Local endpoint** — resolve the metric's `pod_name` to a topology node via 
`labels.pod_name` when present; otherwise keep the `pod_name` as the node id 
and take its role/tier from the metric's own `node_role` / `node_type`. Both 
liaisons (publishers) and data nodes (subscribers) carry `labels.pod_name`, so 
both ends resolve cleanly. **Lifecycle sidecars share a pod — and thus a 
`pod_name` — with their co-located data node**, so they are excluded from the 
`pod_name` → node map; otherw [...]
 3. **Direction follows the recording side.** A publisher series (`queue_pub`) 
is `local → remote_node` (the scrape target is the sender). A subscriber series 
(`queue_sub`) is the **inverse** — `remote_node → local` (the scrape target is 
the receiver; `remote_node` is the sender). After flipping the subscriber edges 
they share the same node-name keyspace as the publisher edges.
-4. **Publisher first, subscriber as fallback — per edge.** Build the edge set 
from `queue_pub`; then add an edge from `queue_sub` **only if the publisher did 
not already record it**. Never sum both sides for one edge. On servers that 
don't yet record `query`/`control` on the publish path, this is what surfaces 
the liaison→warm/cold `query` edges; once the publisher records them too, those 
edges come from `queue_pub` and the subscriber contributes nothing for them (so 
there is still no do [...]
+4. **Publisher first, subscriber as fallback — per edge.** Build the edge set 
from `queue_pub`; then add an edge from `queue_sub` **only if the publisher did 
not already record it**, and **skip subscriber series with 
`remote_role="lifecycle"`** (migration traffic — it belongs to the migration 
layer, not the request pipeline). Never sum both sides for one edge. On older 
servers that don't record `query`/`control` on the publish path, the fallback 
is what surfaces the liaison→warm/cold `qu [...]
 5. **Node attributes come from `/cluster/topology`** (authoritative, 
consistent), falling back to the metric's `node_*` / `remote_*` labels for 
endpoints the topology did not enrich.
 
 The node set is the **union** of topology nodes and the endpoints seen in the 
metrics, so idle nodes (no current traffic) still appear, and a live edge to a 
node missing from topology still renders.
@@ -98,7 +100,20 @@ sum by (pod_name, remote_node)
 
 Add `, group` to any `by (...)` clause to break an edge down by business group.
 
-Run the **same four queries** against `banyandb_queue_sub_*` to get the 
subscriber view (e.g. 
`rate(banyandb_queue_sub_total_finished{...}[$__rate_interval])`). Remember the 
subscriber edge is inverted — `remote_node` is the **source** and `pod_name` is 
the **target** — and that `banyandb_queue_sub_*` records `received_bytes`, not 
`sent_bytes`. Merge the two sets per edge, preferring the publisher, so the 
liaison→warm/cold `query` edges fill in from the subscriber while every 
publisher e [...]
+Run the **same four queries** against `banyandb_queue_sub_*` to get the 
subscriber view (e.g. 
`rate(banyandb_queue_sub_total_finished{...}[$__rate_interval])`), adding 
`remote_role!="lifecycle"` to the selector so inbound migration traffic stays 
out of the request layer. Remember the subscriber edge is inverted — 
`remote_node` is the **source** and `pod_name` is the **target** — and that 
`banyandb_queue_sub_*` records `received_bytes`, not `sent_bytes`. Merge the 
two sets per edge, prefe [...]
+
+The **migration layer** runs the same query shapes against 
`banyandb_lifecycle_migration_*` — the tier-migration mirror of `queue_pub`, 
emitted by the lifecycle sidecar (`container_name="lifecycle"`, `pod_name` 
shared with its co-located data node):
+
+```promql
+# Migration throughput (messages/s), per edge and operation
+sum by (pod_name, remote_node, remote_role, remote_tier, operation)
+  
(rate(banyandb_lifecycle_migration_total_finished{job=~"$job"}[$__rate_interval]))
+
+# Migration p99 / errors / bytes — same shapes as above, on
+# banyandb_lifecycle_migration_total_latency_bucket / _total_err / _sent_bytes
+```
+
+These are publisher-side series, so the edge is `pod_name → remote_node` — but 
resolve the local `pod_name` to the pod's **`:17914` lifecycle node** (the 
series carry `container_name="lifecycle"`), not its data node. `operation` is 
`file-sync` for part shipping and `batch-write` for row replay.
 
 ## Lifecycle migration layer
 
@@ -106,7 +121,7 @@ The tiered-storage **lifecycle** service migrates data 
hot→warm→cold. It is
 
 - **Source = the lifecycle service's route table**, recomputed continuously 
from each group's lifecycle **stage node-selectors** — not only while the 
scheduled (e.g. `@daily`) migration runs. Each lifecycle instance runs on a 
data pod and publishes the **next-tier data nodes it migrates to**; the proxy 
exposes these as `calls` whose `source` is the lifecycle node — a hot-tier 
instance to warm nodes, a warm-tier instance to cold nodes, i.e. the 
hot→warm→cold path.
 - **Identify lifecycle nodes by their port.** Lifecycle nodes advertise the 
lifecycle gRPC port (default **`17914`**) and carry no role. The script treats 
any `calls` edge whose `source` ends in `:17914` as a migration edge and drops 
the rest of the `calls` mesh (property-repair gossip).
-- **Structural only — no metrics.** The migration publisher is built without a 
metadata service (`pub.NewWithoutMetadata()`), so it emits **no `queue_pub` 
metrics**. Migration edges have no throughput/latency/bytes — they show *that* 
a tier path exists, not how busy it is. (The request pipeline is the weighted 
layer.)
+- **Structural path + metric weight.** The migration publisher is built 
without a metadata service (`pub.NewWithoutMetadata()`), so it emits no 
`queue_pub` metrics — instead the same five instruments are registered under 
**`banyandb_lifecycle_migration_*`** (same labels; `operation` is `file-sync` 
for part shipping, `batch-write` for row replay; see 
[Metrics](../observability/metrics.md#lifecycle_migration--the-tier-migration-mirror-of-queue_pub)).
 The `calls` route table remains the **s [...]
 - **Distinct identity, all-interfaces bind.** Each lifecycle instance 
advertises `<pod-host>:17914` as its node name (resolved from the node host 
when `--lifecycle-grpc-host` is empty), so the per-tier instances stay distinct 
instead of collapsing into a single `:17914` node under the proxy's 
dedup-by-name. The gRPC server still **binds** to `:17914` (all interfaces) so 
the co-located FODC agent reaches it on `127.0.0.1`; only the advertised 
identity carries the host.
 
 ## Rendering recipe (offline join script)
@@ -114,9 +129,9 @@ The tiered-storage **lifecycle** service migrates data 
hot→warm→cold. It is
 The script [`render_topology.py`](./render_topology.py) does the whole join 
with only the Python standard library:
 
 - GETs `{proxy}/cluster/topology` and builds the node inventory 
(`metadata.name` → role / tier / pod / status, plus a `pod_name` → name map 
that excludes lifecycle sidecars so they don't shadow their co-located data 
node).
-- Runs the four per-edge queries against Prometheus for **both** 
`banyandb_queue_pub_*` and `banyandb_queue_sub_*`.
-- Joins them — `remote_node` maps to a topology node directly, the local 
`pod_name` maps to a node via `labels.pod_name` (falling back to the metric's 
own `node_*` labels). Publisher edges are `pod_name → remote_node`; subscriber 
edges are flipped to `remote_node → pod_name`. It accumulates per-edge 
throughput, p99, errors, and bytes from the publisher, then adds any edge the 
publisher lacks from the subscriber (so liaison→warm/cold `query` edges appear 
without double-counting).
-- Adds the **lifecycle migration layer**: `calls` edges whose `source` is a 
lifecycle node (`--lifecycle-port`, default `17914`) become dashed `migrate` 
edges (hot→warm→cold); the rest of the `calls` gossip mesh is dropped.
+- Runs the four per-edge queries against Prometheus for **both** 
`banyandb_queue_pub_*` and `banyandb_queue_sub_*`, plus the migration family 
`banyandb_lifecycle_migration_*`.
+- Joins them — `remote_node` maps to a topology node directly, the local 
`pod_name` maps to a node via `labels.pod_name` (falling back to the metric's 
own `node_*` labels). Publisher edges are `pod_name → remote_node`; subscriber 
edges are flipped to `remote_node → pod_name`. It accumulates per-edge 
throughput, p99, errors, and bytes from the publisher, then adds any edge the 
publisher lacks from the subscriber (so on older servers the liaison→warm/cold 
`query` edges appear without doubl [...]
+- Adds the **lifecycle migration layer**: `calls` edges whose `source` is a 
lifecycle node (`--lifecycle-port`, default `17914`) become dashed `migrate` 
edges (hot→warm→cold); the rest of the `calls` mesh is dropped. Each migration 
edge is **weighted from `banyandb_lifecycle_migration_*`** when those series 
exist (the local `pod_name` resolves to the pod's `:17914` lifecycle node), and 
stays a bare structural `migrate` edge otherwise — e.g. between scheduled runs 
on a fresh cluster, or o [...]
 - Prints **Graphviz DOT** and/or **Mermaid**: nodes shaped by role (liaison = 
box, data = cylinder, lifecycle = ellipse), colored by tier, dashed-red when 
unhealthy. Write edges are labeled with operation / throughput / p99 / bytes 
and turn red on errors or when p99 exceeds `--p99-warn`; migration edges are 
dashed.
 
 If Prometheus sits behind Grafana's datasource proxy with basic auth, set 
`PROM_USER` / `PROM_PASS` in the environment.
@@ -168,7 +183,7 @@ Two things to notice, both of which drive the design:
 
 The liaison→warm/cold `query` edges are now plain publisher series. The 
**subscriber** side (`queue_sub`) mirrors them but contributes nothing here — 
per-edge priority keeps every edge publisher-sourced, so there is no 
double-counting. On a cluster whose publisher predates the `query`/`control` 
instrumentation, the script would instead fill these edges from `queue_sub`, 
**inverted** (`remote_node`→`pod_name`).
 
-Lifecycle migration (from `calls`, structural — no metrics):
+Lifecycle migration (path from `calls`; this capture predates the 
`banyandb_lifecycle_migration_*` family, so the edges render as structural, 
unweighted edges):
 
 | lifecycle source (`…:17914`) | → targets (next tier) |
 | --- | --- |
@@ -249,11 +264,12 @@ graph LR
   style lc_warm_1 fill:#ffb74d
 ```
 
-**Two layers.** Solid edges are the **request pipeline** — `batch-write`, 
`file-sync`, and `query`/`control` (including the liaison→warm/cold query 
fan-out), all from the publisher metrics, weighted by operation, throughput, 
p99, and file-sync bytes; red on errors or when p99 exceeds `--p99-warn`. 
Dashed `migrate` edges are the **lifecycle tier migration** (hot→warm→cold, 
from `calls`; structural, no metrics). Liaisons are boxes, data nodes cylinders 
colored by tier, lifecycle sidecars e [...]
+**Two layers.** Solid edges are the **request pipeline** — `batch-write`, 
`file-sync`, and `query`/`control` (including the liaison→warm/cold query 
fan-out), all from the publisher metrics, weighted by operation, throughput, 
p99, and file-sync bytes; red on errors or when p99 exceeds `--p99-warn`. 
Dashed `migrate` edges are the **lifecycle tier migration** (hot→warm→cold, 
path from `calls`; on a current build they additionally carry the 
`banyandb_lifecycle_migration_*` rates after a migr [...]
 
 ## Caveats
 
-- **Publisher first, subscriber fallback — per edge.** `queue_pub` is the 
primary source. `queue_sub` mirrors each edge from the receiver (inverted: 
`remote_node` is the sender); add it **only** for edges the publisher does not 
record, and never sum both sides for one edge, or you double-count throughput. 
The fallback exists because `query`/`control` are recorded on the receiver 
until the publish path is instrumented for them.
+- **Publisher first, subscriber fallback — per edge.** `queue_pub` is the 
primary source. `queue_sub` mirrors each edge from the receiver (inverted: 
`remote_node` is the sender); add it **only** for edges the publisher does not 
record, and never sum both sides for one edge, or you double-count throughput. 
The fallback exists for older servers that record `query`/`control` only on the 
receiver; current publishers record all four operations.
+- **Migration traffic appears in `queue_sub` with `remote_role="lifecycle"`.** 
The lifecycle publisher stamps its co-located data node's name and tier as its 
identity, so on the receiving node those series name a *data* node as the 
sender. Exclude them from the request-pipeline fallback 
(`remote_role!="lifecycle"`), or migration file-sync renders as a bogus 
data→data request edge; the migration layer itself is weighted from the 
publisher-side `banyandb_lifecycle_migration_*` family.
 - **Subscriber edges are inverted.** On a `queue_sub` series the scrape target 
(`pod_name`) is the **receiver** and `remote_node` is the **sender**, so the 
edge is `remote_node → pod_name` — the opposite of the publisher mapping. Flip 
it before joining.
 - **Lifecycle sidecars share a `pod_name`** with their co-located data node; 
exclude them from the `pod_name` → node map, or subscriber edges to the data 
node get misattributed to the `:17914` sidecar.
 - **Node name is a full DNS address** (`pod.headless.namespace:port`), and 
liaison/data use different ports — match on the whole string, not a prefix.
@@ -262,7 +278,7 @@ graph LR
 - **`*_total_err` is lazily registered** — absent means "no errors yet", not 
zero; treat a missing series as healthy.
 - **Use `rate(...[window])`, never raw counters.** After a restart, old 
counter series linger in Prometheus until they age out; a rate window ignores 
them.
 - **`control` / `query` edges carry little or no bytes** (`sent_bytes` is 
file-sync only; `queue_sub` records `received_bytes`), so those edges show 
throughput/latency without a byte figure.
-- **`calls` carries two edge kinds.** Lifecycle-source edges (`…:17914`) are 
the tier migration we render (dashed); data-source edges (`…:17912`) are the 
property-repair gossip mesh we drop. The liaison→data request pipeline is 
**not** in `calls` — that comes from the queue metrics. Each lifecycle instance 
advertises a distinct `<pod-host>:17914`, so the per-tier migration path 
renders without collapsing; see [Lifecycle migration 
layer](#lifecycle-migration-layer).
+- **`calls` carries three edge kinds.** Lifecycle-source edges (`…:17914`) are 
the tier migration we render (dashed); liaison-source edges (`…:18912`) are the 
liaison's route tables, dropped in favor of the weighted queue metrics; 
data-source edges (`…:17912`) are the property-repair gossip mesh, dropped as 
noise. Each lifecycle instance advertises a distinct `<pod-host>:17914`, so the 
per-tier migration path renders without collapsing; see [Lifecycle migration 
layer](#lifecycle-migratio [...]
 
 ## Related
 
diff --git a/docs/operation/observability/metrics.md 
b/docs/operation/observability/metrics.md
index 26f07a3d6..e298d7b7c 100644
--- a/docs/operation/observability/metrics.md
+++ b/docs/operation/observability/metrics.md
@@ -340,7 +340,7 @@ Each panel uses the same three expressions, with 
`operation` pinned to the panel
 
 Liaison nodes run an internal gRPC **queue server** (`server-queue-sub`, wired 
via `sub.NewServerWithPorts` in `pkg/cmdsetup/liaison.go`) and **queue 
clients** (`server-queue-pub`) for the tier-1/tier-2 pipelines. Prometheus 
metrics use the namespaces `banyandb_queue_sub_*` and `banyandb_queue_pub_*` 
(built from `observability.RootScope` + `queue_sub` / `queue_pub` sub-scopes). 
Data nodes expose the same families where the corresponding services run.
 
-Both namespaces share one model: the base metrics `total_started`, 
`total_finished`, `total_latency` (a histogram), and `total_err`, labeled by 
`operation` (`batch-write` / `file-sync` / `query` / `control`) and `group`, 
plus the **remote endpoint** of the flow — `remote_node` (the peer's BanyanDB 
node name, equal to its `/cluster/topology` `metadata.name`), `remote_role` 
(`liaison` / `data`), and `remote_tier` (`hot` / `warm` / `cold`, data only). 
`total_err` adds an `error_type` label. [...]
+Both namespaces share one model: the base metrics `total_started`, 
`total_finished`, `total_latency` (a histogram), and `total_err`, labeled by 
`operation` (`batch-write` / `file-sync` / `query` / `control`) and `group`, 
plus the **remote endpoint** of the flow — `remote_node` (the peer's BanyanDB 
node name, equal to its `/cluster/topology` `metadata.name`), `remote_role` 
(`liaison` / `data`, plus `lifecycle` on `queue_sub` for inbound tier-migration 
traffic — see the [lifecycle migratio [...]
 
 ### `queue_sub` — inbound server
 
@@ -351,7 +351,7 @@ Both namespaces share one model: the base metrics 
`total_started`, `total_finish
 | `total_err` | Counter | …, `error_type` | Errors by type. Lazily registered, 
so absent (not zero) on a healthy cluster. |
 | `received_bytes` | Counter | `operation`, `group`, `remote_node`, 
`remote_role`, `remote_tier` | Bytes received, **file-sync only** 
(`operation="file-sync"`). |
 
-**Troubleshooting:** a growing `total_started − total_finished` gap (or rising 
`total_latency` p99) for a `group`/`operation` points at slow or stuck 
consumers. `total_err` broken down by `error_type` distinguishes transport 
issues (`stream_error`, `recv_error`, `checksum_mismatch`, `out_of_order`) from 
completion issues (`finish_sync_err`, `part_failed`). For file-sync, 
`received_bytes` together with `total_latency` separates partial completion 
from healthy throughput.
+**Troubleshooting:** a growing `total_started − total_finished` gap (or rising 
`total_latency` p99) for a `group`/`operation` points at slow or stuck 
consumers. `total_err` broken down by `error_type` distinguishes transport 
issues (`stream_error`, `recv_error`, `checksum_mismatch`, `out_of_order`) from 
completion issues (`finish_sync_err`, `part_failed`). For file-sync, 
`received_bytes` together with `total_latency` separates partial completion 
from healthy throughput. On data nodes, se [...]
 
 ### `queue_pub` — outbound batch client
 
@@ -359,12 +359,34 @@ Both namespaces share one model: the base metrics 
`total_started`, `total_finish
 | --- | --- | --- | --- |
 | `total_started`, `total_finished` | Counter | `operation`, `group`, 
`remote_node`, `remote_role`, `remote_tier` | Sends started / finished per 
operation and target node; `total_finished` is the success rate. |
 | `total_latency` | Histogram | `operation`, `group`, `remote_node`, 
`remote_role`, `remote_tier` | Send latency (`_bucket` / `_sum` / `_count`); 
use `histogram_quantile` for p99. |
-| `total_err` | Counter | …, `error_type` | Send errors by type. `error_type` 
is one of `non_transient`, `canceled`, `stream_canceled`, `retry_exhausted`, 
`recv_error`, `server_rejected` (Send path) or `stream_error`, `recv_error`, 
`checksum_mismatch`, `out_of_order`, `session_not_found`, `completion_error` 
(file-sync). Lazily registered. |
+| `total_err` | Counter | …, `error_type` | Send errors by type. `error_type` 
is one of `non_transient`, `canceled`, `stream_canceled`, `retry_exhausted`, 
`recv_error`, `server_rejected`, `send_error`, `decode_error`, `invalid_topic` 
(Send/publish path) or `stream_error`, `recv_error`, `checksum_mismatch`, 
`out_of_order`, `session_not_found`, `completion_error` (file-sync). Lazily 
registered. |
 | `sent_bytes` | Counter | `operation`, `group`, `remote_node`, `remote_role`, 
`remote_tier` | Bytes sent, **file-sync only**. |
 
 **Troubleshooting:** `total_err` by `error_type` separates transport failures 
(`recv_error`) from application-level `SendResponse` errors (`server_rejected`) 
and exhausted retries (`retry_exhausted`). A persistent `total_started − 
total_finished` gap, or rising `total_latency` p99 for a given `remote_node` / 
`remote_tier`, indicates slow or unavailable data nodes.
 
-Metrics are only registered when `metadata` implements `metadata.Service` and 
`MetricsRegistry()` is non-nil (e.g. after `SetMetricsRegistry` in bootstrap). 
`NewWithoutMetadata()` leaves `queue_pub` metrics disabled and logs a warning 
(`queue_pub metrics disabled: ...`). The `total_err` counters above are 
registered lazily on first occurrence, so they are simply absent (not zero) on 
a healthy cluster.
+Metrics are only registered when `metadata` implements `metadata.Service` and 
`MetricsRegistry()` is non-nil (e.g. after `SetMetricsRegistry` in bootstrap). 
`NewWithoutMetadata(omr)` — used by the lifecycle service's tier-migration 
publisher — leaves the regular `queue_pub` family disabled and registers the 
same instrument set under `banyandb_lifecycle_migration_*` instead (see 
[below](#lifecycle_migration--the-tier-migration-mirror-of-queue_pub)). The 
`total_err` counters above are regi [...]
+
+### `lifecycle_migration` — the tier-migration mirror of `queue_pub`
+
+The **lifecycle** service migrates data hot→warm→cold through a queue client 
built without a metadata service (`pub.NewWithoutMetadata`), so its traffic 
does **not** appear under `banyandb_queue_pub_*`. Instead the same five 
instruments — `total_started`, `total_finished`, `total_latency`, `total_err`, 
`sent_bytes` — are registered under **`banyandb_lifecycle_migration_*`**, with 
the same labels (`operation`, `group`, `remote_node`, `remote_role`, 
`remote_tier`, plus `error_type` on `tot [...]
+
+The lifecycle sidecar serves these series (plus the run-health gauges below) 
at `/metrics` on its own HTTP port (`--lifecycle-http-port`, default `17915`) 
instead of the `2121` observability listener; the co-located FODC agent polls 
that port, so the series arrive through the proxy scrape with 
`container_name="lifecycle"` and the `pod_name` shared with the co-located data 
node. On the **receiving** data node the same flows are mirrored as 
`banyandb_queue_sub_*` series with `remote_role=" [...]
+
+**Example:** migration throughput per operation and target node — 
`sum(rate(banyandb_lifecycle_migration_total_finished{job=~"$job"}[$__rate_interval]))
 by (operation, remote_node)`; errors — 
`sum(rate(banyandb_lifecycle_migration_total_err{job=~"$job"}[$__rate_interval]))
 by (error_type)`.
+
+For the worked recipe that turns this family into weighted hot→warm→cold edges 
in a topology graph, see [Cluster Topology 
Rendering](../fodc/topology.md#lifecycle-migration-layer).
+
+### Lifecycle run health (`banyandb_lifecycle_*`)
+
+The lifecycle scheduler also exposes three service-level series:
+
+| Metric | Type | Meaning |
+| --- | --- | --- |
+| `banyandb_lifecycle_cycles_total` | Counter | Migration cycles executed by 
the scheduler. |
+| `banyandb_lifecycle_last_run_timestamp_seconds` | Gauge | Unix time at which 
the most recent migration run **started**; the gauge is stamped on every return path 
(success, error, recovered panic), so it advances even when a run fails. |
+| `banyandb_lifecycle_last_run_success` | Gauge | Outcome of the most recent 
run: `1` success, `0` failure. |
+
+**Suggested alerts:** `banyandb_lifecycle_last_run_success == 0`, and `time() 
- banyandb_lifecycle_last_run_timestamp_seconds` exceeding the migration 
schedule interval (a missed run).
 
 ### Example PromQL snippets
 
diff --git a/docs/operation/observability/overview.md 
b/docs/operation/observability/overview.md
index a0635f83d..aafc4af0d 100644
--- a/docs/operation/observability/overview.md
+++ b/docs/operation/observability/overview.md
@@ -26,7 +26,8 @@ If you watch nothing else, watch these. They are organized 
with the two canonica
 | 8 | **Merge / compaction health** (LSM) | Merge File Rate/Latency/Partitions 
(`banyandb_*_total_merge_loop_started`, `_merge_latency`, `_merged_parts`), 
part counts (`*_total_file_parts`) | merge-latency spikes correlated with 
write-latency spikes; steadily growing part/partition counts | a compaction 
storm or backlog → write-latency spikes, query slowdown, oversized/broken parts 
[6] |
 | 9 | **Cardinality / series growth** | Total Series 
(`banyandb_*_inverted_index_total_doc_count`), `_total_updates` (churn), 
`_total_term_searchers_started` | rapid `doc_count` growth, high churn, or high 
term-search rate | a cardinality explosion → memory pressure, inverted-index 
bloat, and slow queries |
 | 10 | **Node liveness / membership** | Active Instances 
(`count(banyandb_system_up_time)` by `container_name`), per-node 
`banyandb_system_up_time` | reporting-node count below expected; a node's 
uptime dropping to ~0 (restart) or its series disappearing (gone) | lost 
capacity, under-replication, and query gaps |
+| 11 | **Lifecycle migration health** (tiered storage) | `banyandb_lifecycle_last_run_success` (1/0), `banyandb_lifecycle_last_run_timestamp_seconds`, migration errors `banyandb_lifecycle_migration_total_err` | `last_run_success == 0`, or `time() - last_run_timestamp_seconds` exceeding the migration schedule interval | failed or missed hot→warm→cold migration — data piling up on the hot tier until its disk fills (signal #4) |
 
-**Priority order:** 1–3 (RED) tell you whether users are affected *right now*; 
4–7 (USE/backpressure) are the **leading indicators** that catch trouble before 
it becomes user-visible; 8–10 catch the classic slow-burn database failures 
(compaction backlog, cardinality blow-up, node loss). A practical first alert 
set: query p99 latency, error rate, disk > 85%, memory near 
`--allowed-percent`, and sustained wqueue/`queue_pub` backlog.
+**Priority order:** 1–3 (RED) tell you whether users are affected *right now*; 
4–7 (USE/backpressure) are the **leading indicators** that catch trouble before 
it becomes user-visible; 8–11 catch the classic slow-burn database failures 
(compaction backlog, cardinality blow-up, node loss, stalled tier migration). A 
practical first alert set: query p99 latency, error rate, disk > 85%, memory 
near `--allowed-percent`, and sustained wqueue/`queue_pub` backlog.
 
 > Sources: [1] Google SRE — *Monitoring Distributed Systems* (Four Golden 
 > Signals) <https://sre.google/sre-book/monitoring-distributed-systems/> · [2] 
 > B. Gregg — *The USE Method* <https://www.brendangregg.com/usemethod.html> · 
 > [3] *The RED Method* (Grafana / T. Wilkie) 
 > <https://grafana.com/blog/the-red-method-how-to-instrument-your-services/> · 
 > [4] Elasticsearch disk watermarks 85/90/95% 
 > <https://www.elastic.co/docs/troubleshoot/elasticsearch/fix-watermark-errors>
 >  · [5] Prometheus remote [...]
diff --git a/docs/operation/observability/providers.md 
b/docs/operation/observability/providers.md
index 523056c11..df1d71031 100644
--- a/docs/operation/observability/providers.md
+++ b/docs/operation/observability/providers.md
@@ -6,7 +6,7 @@ BanyanDB has built-in support for metrics collection. 
Currently, there are two s
 
 Prometheus is auto enabled at run time, if no flag is passed or if 
`prometheus` is set in `observability-modes` flag.
 
-When the Prometheus metrics provider is enabled, each BanyanDB process exposes 
its own metrics on port `2121`.
+When the Prometheus metrics provider is enabled, each BanyanDB process exposes 
its own metrics on port `2121`. (The lifecycle sidecar is the exception: it 
reuses its HTTP API port — `--lifecycle-http-port`, default `17915` — and 
serves the same Prometheus payload at `/metrics` there instead of opening 
`2121`.)
 
 In a cluster, the recommended setup is to let the **FODC proxy** aggregate 
every node's metrics and scrape the proxy as the single target. Configure the 
following Prometheus job (replace `BANYANDB_NAMESPACE` with the namespace where 
BanyanDB is deployed, and adjust the keep-regex to match your FODC proxy pod 
label):
 

Reply via email to