This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch feat/fodc-grafana-dashboard
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit bb3c6a8a9421b2ca28cf9702218401d85acac8bf
Author: Hongtao Gao <[email protected]>
AuthorDate: Wed Jun 10 00:45:38 2026 +0000

    docs(fodc): align observability docs with lifecycle migration metrics

    Reconcile the FODC observability docs against the metric changes merged
    to main since this branch was created (#1164, #1166, #1167):

    - metrics.md: document the banyandb_lifecycle_migration_* family (the
      tier-migration mirror of queue_pub, served by the lifecycle sidecar at
      --lifecycle-http-port/:17915) and the lifecycle run-health series
      (cycles_total, last_run_timestamp_seconds, last_run_success); add
      remote_role="lifecycle" to queue_sub, drop the stale "no queue metrics"
      claim, and complete the publish-path error_type list (send_error,
      decode_error, invalid_topic).
    - topology.md + render_topology.py: weight the lifecycle migration layer
      from banyandb_lifecycle_migration_* (path still from /cluster/topology
      calls), exclude remote_role="lifecycle" sub series from the request
      pipeline fallback (else migration file-sync renders as a bogus
      data->data edge), reframe query/control as publisher-recorded by
      default, and fix the calls edge-kind count (three: migration, liaison
      route-table, gossip).
    - overview.md: add Key Signal #11 (lifecycle migration health) without
      renumbering the existing signals.
    - providers.md: note the lifecycle sidecar serves /metrics on :17915.

    Also restore banyand/queue/test/chunked_sync_common.go, dropped by
    accident in the previous commit, so banyand/queue/test compiles again.
--- banyand/queue/test/chunked_sync_common.go | 539 ++++++++++++++++++++++++++++++ docs/operation/fodc/render_topology.py | 75 +++-- docs/operation/fodc/topology.md | 48 ++- docs/operation/observability/metrics.md | 30 +- docs/operation/observability/overview.md | 3 +- docs/operation/observability/providers.md | 2 +- 6 files changed, 654 insertions(+), 43 deletions(-) diff --git a/banyand/queue/test/chunked_sync_common.go b/banyand/queue/test/chunked_sync_common.go new file mode 100644 index 000000000..a1fecd94f --- /dev/null +++ b/banyand/queue/test/chunked_sync_common.go @@ -0,0 +1,539 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package test + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/spf13/cobra" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/apache/skywalking-banyandb/api/data" + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/banyand/queue" + "github.com/apache/skywalking-banyandb/banyand/queue/pub" + "github.com/apache/skywalking-banyandb/banyand/queue/sub" + "github.com/apache/skywalking-banyandb/pkg/bytes" + "github.com/apache/skywalking-banyandb/pkg/run" + "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/helpers" +) + +// Setup holds the test environment configuration and components. +type Setup struct { + Server queue.Server + Client queue.Client + Closer run.Unit + ChunkedClient queue.ChunkedSyncClient + MockHandler *MockChunkedSyncHandler + DeferFn func() + NodeName string + NodeAddr string + TestGroup run.Group + GRPCPort uint32 + HTTPPort uint32 +} + +// setupChunkedSyncTest creates a complete test environment for chunked sync testing. +func setupChunkedSyncTest(t *testing.T, testName string) *Setup { + return setupChunkedSyncTestWithChunkSize(t, testName, 1024) +} + +// setupChunkedSyncTestWithChunkSize creates a complete test environment for chunked sync testing with custom chunk size. 
+func setupChunkedSyncTestWithChunkSize(t *testing.T, testName string, chunkSize uint32) *Setup { + ports, err := test.AllocateFreePorts(2) + require.NoError(t, err, "Failed to allocate free ports") + + grpcPort := uint32(ports[0]) + httpPort := uint32(ports[1]) + + omr := observability.BypassRegistry + + testGroup := run.NewGroup(testName) + closer, deferFn := run.NewTester(testName + "-closer") + + server := sub.NewServerWithPorts(omr, testName, grpcPort, httpPort) + + mockHandler := NewMockChunkedSyncHandler() + + server.RegisterChunkedSyncHandler(data.TopicStreamPartSync, mockHandler) + + testGroup.Register(closer, server) + + cmd := &cobra.Command{ + Use: testName, + FParseErrWhitelist: cobra.FParseErrWhitelist{ + UnknownFlags: true, // Ignore unknown flags + }, + Run: func(_ *cobra.Command, _ []string) { + err = testGroup.Run(context.Background()) + if err != nil { + t.Logf("Server group failed: %v", err) + } + }, + } + cmd.Flags().AddFlagSet(testGroup.RegisterFlags().FlagSet) + go func() { + require.NoError(t, cmd.Execute()) + }() + + assert.Eventually(t, func() bool { + errInternal := helpers.HealthCheck(fmt.Sprintf("localhost:%d", grpcPort), 10*time.Second, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()))() + return errInternal == nil + }, flags.EventuallyTimeout, 100*time.Millisecond) + + client := pub.NewWithoutMetadata(nil) + + nodeAddr := fmt.Sprintf("localhost:%d", grpcPort) + nodeName := testName + "-node" + node := schema.Metadata{ + TypeMeta: schema.TypeMeta{ + Name: nodeName, + Kind: schema.KindNode, + }, + Spec: &databasev1.Node{ + Metadata: &commonv1.Metadata{ + Name: nodeName, + }, + Roles: []databasev1.Role{databasev1.Role_ROLE_DATA}, + GrpcAddress: nodeAddr, + }, + } + + client.OnAddOrUpdate(node) + + var chunkedClient queue.ChunkedSyncClient + assert.Eventually(t, func() bool { + var errInternal error + chunkedClient, errInternal = client.NewChunkedSyncClient(nodeName, chunkSize) + // Log the error from this attempt, not the outer err from port allocation. + if errInternal != nil { +
t.Logf("NewChunkedSyncClient error: %v", errInternal) + } + return errInternal == nil + }, flags.EventuallyTimeout, 100*time.Millisecond) + + return &Setup{ + Server: server, + Client: client, + MockHandler: mockHandler, + TestGroup: testGroup, + Closer: closer, + DeferFn: deferFn, + GRPCPort: grpcPort, + HTTPPort: httpPort, + NodeName: nodeName, + NodeAddr: nodeAddr, + ChunkedClient: chunkedClient, + } +} + +// cleanupTestSetup cleans up the test environment. +func cleanupTestSetup(setup *Setup) { + if setup.ChunkedClient != nil { + setup.ChunkedClient.Close() + } + if setup.DeferFn != nil { + setup.DeferFn() + } +} + +// createTestDataPart creates a StreamingPartData for testing. +func createTestDataPart(files map[string][]byte, group string, shardID uint32) queue.StreamingPartData { + var fileInfos []queue.FileInfo + var totalSize uint64 + + for fileName, content := range files { + var buf bytes.Buffer + if _, err := buf.Write(content); err != nil { + panic(fmt.Sprintf("failed to write content to buffer: %v", err)) + } + fileInfos = append(fileInfos, queue.FileInfo{ + Name: fileName, + Reader: buf.SequentialRead(), + }) + totalSize += uint64(len(content)) + } + + return queue.StreamingPartData{ + ID: 1, + Files: fileInfos, + Group: group, + ShardID: shardID, + Topic: data.TopicStreamPartSync.String(), + CompressedSizeBytes: totalSize, + UncompressedSizeBytes: totalSize, + TotalCount: uint64(len(files)), + BlocksCount: 1, + MinTimestamp: time.Now().UnixMilli(), + MaxTimestamp: time.Now().UnixMilli(), + } +} + +// createTestDataPartWithMetadata creates a StreamingPartData for testing with specific metadata values. 
+func createTestDataPartWithMetadata(files map[string][]byte, group string, shardID uint32, + compressedSize, uncompressedSize, totalCount, blocksCount uint64, + minTimestamp, maxTimestamp int64, partID uint64, +) queue.StreamingPartData { + var fileInfos []queue.FileInfo + + for fileName, content := range files { + var buf bytes.Buffer + if _, err := buf.Write(content); err != nil { + panic(fmt.Sprintf("failed to write content to buffer: %v", err)) + } + fileInfos = append(fileInfos, queue.FileInfo{ + Name: fileName, + Reader: buf.SequentialRead(), + }) + } + + return queue.StreamingPartData{ + ID: partID, + Files: fileInfos, + Group: group, + ShardID: shardID, + Topic: data.TopicStreamPartSync.String(), + CompressedSizeBytes: compressedSize, + UncompressedSizeBytes: uncompressedSize, + TotalCount: totalCount, + BlocksCount: blocksCount, + MinTimestamp: minTimestamp, + MaxTimestamp: maxTimestamp, + } +} + +// MockPartHandler implements queue.PartHandler for testing. +type MockPartHandler struct { + receivedFiles map[string][]byte + syncHandler *MockChunkedSyncHandler + trackingHandler *MockChunkedSyncHandlerWithTracking + receivedChunks [][]byte + finishSyncCalled bool + closeCalled bool +} + +// NewPartType marks the part as new and tracks the call. +func (m *MockPartHandler) NewPartType(ctx *queue.ChunkedSyncPartContext) error { + if m.trackingHandler != nil { + m.trackingHandler.TrackNewPartType(ctx) + } + return nil +} + +// FinishSync marks the sync as finished and closes the handler. +func (m *MockPartHandler) FinishSync() error { + m.finishSyncCalled = true + + if m.syncHandler != nil { + m.syncHandler.mu.Lock() + m.syncHandler.completedParts = append(m.syncHandler.completedParts, m) + m.syncHandler.mu.Unlock() + } + return m.Close() +} + +// Close marks the handler as closed. +func (m *MockPartHandler) Close() error { + m.closeCalled = true + return nil +} + +// GetReceivedFiles returns a copy of the received files. 
+func (m *MockPartHandler) GetReceivedFiles() map[string][]byte { + result := make(map[string][]byte) + for k, v := range m.receivedFiles { + result[k] = make([]byte, len(v)) + copy(result[k], v) + } + return result +} + +// GetReceivedChunksCount returns the number of received chunks. +func (m *MockPartHandler) GetReceivedChunksCount() int { + return len(m.receivedChunks) +} + +// MockChunkedSyncHandler implements queue.ChunkedSyncHandler for testing. +type MockChunkedSyncHandler struct { + completedParts []*MockPartHandler + receivedContexts []*queue.ChunkedSyncPartContext + mu sync.Mutex +} + +// NewMockChunkedSyncHandler creates a new mock chunked sync handler. +func NewMockChunkedSyncHandler() *MockChunkedSyncHandler { + return &MockChunkedSyncHandler{ + completedParts: make([]*MockPartHandler, 0), + receivedContexts: make([]*queue.ChunkedSyncPartContext, 0), + } +} + +// HandleFileChunk processes incoming file chunks for testing. +func (m *MockChunkedSyncHandler) HandleFileChunk(ctx *queue.ChunkedSyncPartContext, chunk []byte) error { + currentPart := ctx.Handler.(*MockPartHandler) + + if currentPart != nil { + chunkCopy := make([]byte, len(chunk)) + copy(chunkCopy, chunk) + currentPart.receivedChunks = append(currentPart.receivedChunks, chunkCopy) + + if currentPart.receivedFiles[ctx.FileName] == nil { + currentPart.receivedFiles[ctx.FileName] = make([]byte, 0) + } + currentPart.receivedFiles[ctx.FileName] = append(currentPart.receivedFiles[ctx.FileName], chunkCopy...) + } + + return nil +} + +// CreatePartHandler creates a new part handler for testing. 
+func (m *MockChunkedSyncHandler) CreatePartHandler(ctx *queue.ChunkedSyncPartContext) (queue.PartHandler, error) { + m.mu.Lock() + contextCopy := &queue.ChunkedSyncPartContext{ + ID: ctx.ID, + Group: ctx.Group, + ShardID: ctx.ShardID, + PartType: ctx.PartType, + CompressedSizeBytes: ctx.CompressedSizeBytes, + UncompressedSizeBytes: ctx.UncompressedSizeBytes, + TotalCount: ctx.TotalCount, + BlocksCount: ctx.BlocksCount, + MinTimestamp: ctx.MinTimestamp, + MaxTimestamp: ctx.MaxTimestamp, + MinKey: ctx.MinKey, + MaxKey: ctx.MaxKey, + } + m.receivedContexts = append(m.receivedContexts, contextCopy) + m.mu.Unlock() + + partHandler := &MockPartHandler{ + receivedFiles: make(map[string][]byte), + syncHandler: m, + receivedChunks: make([][]byte, 0), + } + contextCopy.Handler = partHandler + return partHandler, nil +} + +// GetCompletedParts returns all completed parts. +func (m *MockChunkedSyncHandler) GetCompletedParts() []*MockPartHandler { + m.mu.Lock() + defer m.mu.Unlock() + result := make([]*MockPartHandler, len(m.completedParts)) + copy(result, m.completedParts) + return result +} + +// GetReceivedFiles returns all received files across all parts. +func (m *MockChunkedSyncHandler) GetReceivedFiles() map[string][]byte { + m.mu.Lock() + defer m.mu.Unlock() + result := make(map[string][]byte) + for _, part := range m.completedParts { + for fileName, content := range part.GetReceivedFiles() { + result[fileName] = content + } + } + return result +} + +// GetReceivedChunksCount returns the total number of received chunks. +func (m *MockChunkedSyncHandler) GetReceivedChunksCount() int { + m.mu.Lock() + defer m.mu.Unlock() + total := 0 + for _, part := range m.completedParts { + total += part.GetReceivedChunksCount() + } + return total +} + +// GetReceivedContexts returns all received contexts. 
+func (m *MockChunkedSyncHandler) GetReceivedContexts() []*queue.ChunkedSyncPartContext { + m.mu.Lock() + defer m.mu.Unlock() + + result := make([]*queue.ChunkedSyncPartContext, len(m.receivedContexts)) + for i, ctx := range m.receivedContexts { + contextCopy := &queue.ChunkedSyncPartContext{ + ID: ctx.ID, + Group: ctx.Group, + ShardID: ctx.ShardID, + FileName: ctx.FileName, + CompressedSizeBytes: ctx.CompressedSizeBytes, + UncompressedSizeBytes: ctx.UncompressedSizeBytes, + TotalCount: ctx.TotalCount, + BlocksCount: ctx.BlocksCount, + MinTimestamp: ctx.MinTimestamp, + MaxTimestamp: ctx.MaxTimestamp, + MinKey: ctx.MinKey, + MaxKey: ctx.MaxKey, + } + result[i] = contextCopy + } + return result +} + +// MockChunkedSyncHandlerWithTracking extends MockChunkedSyncHandler with NewPartType call tracking. +type MockChunkedSyncHandlerWithTracking struct { + *MockChunkedSyncHandler + newPartTypeContexts []*queue.ChunkedSyncPartContext + trackingMu sync.Mutex + newPartTypeCalled bool +} + +// NewMockChunkedSyncHandlerWithTracking creates a new mock handler with tracking capabilities. +func NewMockChunkedSyncHandlerWithTracking() *MockChunkedSyncHandlerWithTracking { + return &MockChunkedSyncHandlerWithTracking{ + MockChunkedSyncHandler: NewMockChunkedSyncHandler(), + newPartTypeContexts: make([]*queue.ChunkedSyncPartContext, 0), + } +} + +// TrackNewPartType tracks a NewPartType call. 
+func (m *MockChunkedSyncHandlerWithTracking) TrackNewPartType(ctx *queue.ChunkedSyncPartContext) { + m.trackingMu.Lock() + defer m.trackingMu.Unlock() + m.newPartTypeCalled = true + contextCopy := &queue.ChunkedSyncPartContext{ + ID: ctx.ID, + Group: ctx.Group, + ShardID: ctx.ShardID, + PartType: ctx.PartType, + CompressedSizeBytes: ctx.CompressedSizeBytes, + UncompressedSizeBytes: ctx.UncompressedSizeBytes, + TotalCount: ctx.TotalCount, + BlocksCount: ctx.BlocksCount, + MinTimestamp: ctx.MinTimestamp, + MaxTimestamp: ctx.MaxTimestamp, + MinKey: ctx.MinKey, + MaxKey: ctx.MaxKey, + } + m.newPartTypeContexts = append(m.newPartTypeContexts, contextCopy) +} + +// NewPartTypeCalled returns whether NewPartType has been called. +func (m *MockChunkedSyncHandlerWithTracking) NewPartTypeCalled() bool { + m.trackingMu.Lock() + defer m.trackingMu.Unlock() + return m.newPartTypeCalled +} + +// GetNewPartTypeContexts returns all contexts passed to NewPartType. +func (m *MockChunkedSyncHandlerWithTracking) GetNewPartTypeContexts() []*queue.ChunkedSyncPartContext { + m.trackingMu.Lock() + defer m.trackingMu.Unlock() + result := make([]*queue.ChunkedSyncPartContext, len(m.newPartTypeContexts)) + copy(result, m.newPartTypeContexts) + return result +} + +// ResetNewPartTypeCalls resets the tracking state. +func (m *MockChunkedSyncHandlerWithTracking) ResetNewPartTypeCalls() { + m.trackingMu.Lock() + defer m.trackingMu.Unlock() + m.newPartTypeCalled = false + m.newPartTypeContexts = nil +} + +// CreatePartHandler creates a new part handler with tracking capabilities. 
+func (m *MockChunkedSyncHandlerWithTracking) CreatePartHandler(ctx *queue.ChunkedSyncPartContext) (queue.PartHandler, error) { + partHandler, err := m.MockChunkedSyncHandler.CreatePartHandler(ctx) + if err != nil { + return nil, err + } + + // Cast to our enhanced handler and set tracking reference + if mockHandler, ok := partHandler.(*MockPartHandler); ok { + mockHandler.trackingHandler = m + } + + return partHandler, nil +} + +// createTestDataPartWithPartType creates test data part with specified PartType. +func createTestDataPartWithPartType( + files map[string][]byte, + group string, + shardID uint32, + partID uint64, + partType string, +) queue.StreamingPartData { + var fileInfos []queue.FileInfo + + for fileName, content := range files { + var buf bytes.Buffer + if _, err := buf.Write(content); err != nil { + panic(fmt.Sprintf("failed to write content to buffer: %v", err)) + } + fileInfos = append(fileInfos, queue.FileInfo{ + Name: fileName, + Reader: buf.SequentialRead(), + }) + } + + return queue.StreamingPartData{ + ID: partID, + Files: fileInfos, + Group: group, + ShardID: shardID, + PartType: partType, + Topic: data.TopicStreamPartSync.String(), + CompressedSizeBytes: 1024, + UncompressedSizeBytes: 2048, + TotalCount: 100, + BlocksCount: 5, + MinTimestamp: time.Now().UnixMilli(), + MaxTimestamp: time.Now().UnixMilli(), + } +} + +// TrackingSetup extends Setup with tracking capabilities. +type TrackingSetup struct { + *Setup + TrackingHandler *MockChunkedSyncHandlerWithTracking +} + +// setupChunkedSyncTestWithTracking creates a test environment with tracking capabilities. 
+func setupChunkedSyncTestWithTracking(t *testing.T, testName string) *TrackingSetup { + setup := setupChunkedSyncTest(t, testName) + + trackingHandler := NewMockChunkedSyncHandlerWithTracking() + + // Replace the handler in the server + setup.Server.RegisterChunkedSyncHandler(data.TopicStreamPartSync, trackingHandler) + + // Update the MockHandler reference in the setup to point to our tracking handler + setup.MockHandler = trackingHandler.MockChunkedSyncHandler + + return &TrackingSetup{ + Setup: setup, + TrackingHandler: trackingHandler, + } +} diff --git a/docs/operation/fodc/render_topology.py b/docs/operation/fodc/render_topology.py index cc31f5ab3..1f453e4b2 100755 --- a/docs/operation/fodc/render_topology.py +++ b/docs/operation/fodc/render_topology.py @@ -18,10 +18,13 @@ """Render a BanyanDB cluster topology by joining the FODC proxy's /cluster/topology (node inventory) with queue Prometheus metrics (live edges). -Edges come from the publisher metrics (queue_pub) by default; for edges the -publisher does not record -- chiefly liaison->warm/cold query fan-out -- the -subscriber metrics (queue_sub) fill in, read from the receiver's side and -flipped back to the true sender->receiver direction. +Request edges come from the publisher metrics (queue_pub) by default; for +edges the publisher does not record -- chiefly liaison->warm/cold query +fan-out on older servers -- the subscriber metrics (queue_sub) fill in, read +from the receiver's side and flipped back to the true sender->receiver +direction. Lifecycle tier-migration edges take their path from the topology +`calls` and their weight from the banyandb_lifecycle_migration_* family (the +tier-migration mirror of queue_pub) whenever a migration has run. Output is Graphviz DOT and/or Mermaid. Stdlib only. If Prometheus is behind Grafana's datasource proxy with basic auth, set PROM_USER / PROM_PASS in the environment.""" @@ -72,7 +75,7 @@ def main(): # 1) Node inventory from /cluster/topology. 
topo = http_get_json(args.proxy.rstrip("/") + "/cluster/topology") - nodes, podname2name = {}, {} + nodes, podname2name, podname2lifecycle = {}, {}, {} for n in topo.get("nodes", []): name = (n.get("metadata") or {}).get("name", "") if not name: @@ -94,6 +97,14 @@ def main(): host = name.split(".")[0].split(":")[0] if host and not is_lifecycle and host not in podname2name: podname2name[host] = name + # The banyandb_lifecycle_migration_* series are scraped from the lifecycle + # sidecar (container_name="lifecycle") but share the data pod's pod_name, + # so they resolve through a separate map to the pod's :17914 identity. + if is_lifecycle: + if labels.get("pod_name"): + podname2lifecycle.setdefault(labels["pod_name"], name) + if host: + podname2lifecycle.setdefault(host, name) def local_name(pod): return podname2name.get(pod, pod) @@ -117,15 +128,21 @@ def main(): # flow is pod_name -> remote_node. The subscriber (queue_sub) scrape # target is the receiver and remote_node is the sender, so its flow is # remote_node -> pod_name -- the edge is flipped. The sub view is what - # surfaces liaison->warm/cold query fan-out, which the publisher side - # does not record on servers whose publish path is not yet instrumented. - def build_edges(thr, p99, err, byt, invert): + # surfaces liaison->warm/cold query fan-out on older servers whose + # publish path predates the query/control instrumentation. Sub series + # with remote_role="lifecycle" are inbound tier-migration traffic, not + # request pipeline, and are skipped (the migration layer is weighted + # from the publisher-side banyandb_lifecycle_migration_* family). 
+ def build_edges(thr, p99, err, byt, invert, resolve_local=None, skip_remote_role=None): + resolve = resolve_local or local_name es = {} for s in thr: m = s["metric"] - pod_node, peer = local_name(m.get("pod_name", "")), m.get("remote_node", "") + pod_node, peer = resolve(m.get("pod_name", "")), m.get("remote_node", "") if not pod_node or not peer: continue + if skip_remote_role and m.get("remote_role", "") == skip_remote_role: + continue pod_role, pod_tier = primary_role([m.get("node_role", "")]), m.get("node_type", "") peer_role, peer_tier = m.get("remote_role", ""), m.get("remote_tier", "") if invert: @@ -142,7 +159,7 @@ def main(): def add_scalar(rows, field): for s in rows: - pod_node = local_name(s["metric"].get("pod_name", "")) + pod_node = resolve(s["metric"].get("pod_name", "")) peer = s["metric"].get("remote_node", "") key = (peer, pod_node) if invert else (pod_node, peer) if key in es: @@ -169,23 +186,37 @@ def main(): p99_by("banyandb_queue_sub_total_latency_bucket"), rate_by("banyandb_queue_sub_total_err", "pod_name, remote_node"), None, # sub records received_bytes, not sent_bytes; query/control carry no bytes - invert=True) + invert=True, skip_remote_role="lifecycle") edges = dict(pub_edges) for key, sub_edge in sub_edges.items(): edges.setdefault(key, sub_edge) # default pub; fall back to sub for edges pub lacks - # 3) Lifecycle migration edges from /cluster/topology `calls`: keep only edges - # whose source is a lifecycle node (name ends with the lifecycle port) and - # drop the data<->data property-gossip mesh. These are structural (the - # lifecycle publisher is metadata-less, so it emits no queue metrics). + # 3) Lifecycle migration edges. The PATH comes from /cluster/topology `calls` + # (structural; present even between scheduled runs): keep only edges whose + # source is a lifecycle node (name ends with the lifecycle port), drop the + # liaison route-table copies and the data<->data property-gossip mesh. 
The + # WEIGHT comes from the banyandb_lifecycle_migration_* family -- the + # tier-migration mirror of queue_pub, scraped from the lifecycle sidecar + # (container_name="lifecycle"), whose pod_name resolves to the pod's + # :17914 lifecycle identity rather than its data node. lc_suffix = ":" + str(args.lifecycle_port) - migrations = [] + migrations = {} for c in topo.get("calls", []): src, dst = c.get("source", ""), c.get("target", "") if src.endswith(lc_suffix) and dst: ensure_node(src, role="lifecycle") ensure_node(dst) - migrations.append((src, dst)) + migrations.setdefault((src, dst), None) + mig_weights = build_edges( + rate_by("banyandb_lifecycle_migration_total_finished", edge_by), + p99_by("banyandb_lifecycle_migration_total_latency_bucket"), + rate_by("banyandb_lifecycle_migration_total_err", "pod_name, remote_node"), + rate_by("banyandb_lifecycle_migration_sent_bytes", "pod_name, remote_node"), + invert=False, resolve_local=lambda pod: podname2lifecycle.get(pod, pod)) + for key, e in mig_weights.items(): + ensure_node(key[0], role="lifecycle") + ensure_node(key[1]) + migrations[key] = e # weighted when a migration has run; else structural if args.format in ("dot", "both"): print(render_dot(nodes, edges, migrations, args.p99_warn)) @@ -216,8 +247,9 @@ def render_dot(nodes, edges, migrations, p99_warn): pw = 1.0 + min(4.0, sum(e["ops"].values()) ** 0.25) out.append(' "%s" -> "%s" [label="%s", color="%s", penwidth=%.1f, fontsize=10];' % (local, remote, edge_label(e), "#c62828" if red else "#607d8b", pw)) - for src, dst in sorted(migrations): # lifecycle migration (structural) - out.append(' "%s" -> "%s" [label="migrate", style=dashed, color="#8e24aa", fontsize=9];' % (src, dst)) + for (src, dst), e in sorted(migrations.items()): # lifecycle migration (dashed) + label = "migrate" if e is None else "migrate\\n" + edge_label(e) + out.append(' "%s" -> "%s" [label="%s", style=dashed, color="#8e24aa", fontsize=9];' % (src, dst, label)) out.append("}") return 
"\n".join(out) @@ -235,8 +267,9 @@ def render_mermaid(nodes, edges, migrations, p99_warn): if e["err"] > 0 or e["p99"] > p99_warn: red_idx.append(i) i += 1 - for src, dst in sorted(migrations): # lifecycle migration (structural, dashed) - out.append(' %s -. "migrate" .-> %s' % (nid(src), nid(dst))) + for (src, dst), e in sorted(migrations.items()): # lifecycle migration (dashed) + label = "migrate" if e is None else "migrate · " + edge_label(e).replace("\\n", " · ") + out.append(' %s -. "%s" .-> %s' % (nid(src), label, nid(dst))) for name, a in sorted(nodes.items()): out.append(" style %s fill:%s" % (nid(name), TIER_COLOR.get(a["tier"], "#cfd8dc"))) for idx in red_idx: diff --git a/docs/operation/fodc/topology.md b/docs/operation/fodc/topology.md index c5132043a..b96589ebd 100644 --- a/docs/operation/fodc/topology.md +++ b/docs/operation/fodc/topology.md @@ -2,13 +2,13 @@ BanyanDB gives you two partial views of a running cluster, and neither is a usable topology on its own: -- The **FODC proxy `/cluster/topology`** endpoint knows the **nodes** — names, roles, storage tier, health — and a `calls` graph that carries the **lifecycle tier-migration** edges (hot→warm→cold) mixed with a noisy data↔data property-repair gossip mesh. -- The **queue metrics** (`banyandb_queue_pub_*` / `banyandb_queue_sub_*`) know the **request pipeline edges** — which node sends what to which, for what operation/group, at what throughput, latency, and error rate — but carry only the peer's node name and the local scrape target's identity, with no node inventory or health. +- The **FODC proxy `/cluster/topology`** endpoint knows the **nodes** — names, roles, storage tier, health — and a `calls` graph that carries the **lifecycle tier-migration** edges (hot→warm→cold) mixed with the liaison's route-table edges and a noisy data↔data property-repair gossip mesh. 
+- The **queue metrics** (`banyandb_queue_pub_*` / `banyandb_queue_sub_*`, plus the tier-migration mirror `banyandb_lifecycle_migration_*`) know the **flow edges** — which node sends what to which, for what operation/group, at what throughput, latency, and error rate — but carry only the peer's node name and the local scrape target's identity, with no node inventory or health. This page shows how to **join them on node name** into one directed topology with **two edge layers**: -- **Request pipeline** (solid, weighted) — liaison→data, from the queue metrics: `batch-write`, `file-sync`, `query`, and `control`, with throughput, p99 latency, errors, and file-sync bytes. The publisher metrics (`queue_pub`) are the primary source; the subscriber metrics (`queue_sub`) fill any edge the publisher does not record — chiefly the liaison→warm/cold `query` fan-out on servers whose publish path is not yet instrumented for it. -- **Lifecycle migration** (dashed, structural) — hot→warm→cold tier movement, from the **lifecycle** service's entries in `/cluster/topology` `calls`. +- **Request pipeline** (solid, weighted) — liaison→data, from the queue metrics: `batch-write`, `file-sync`, `query`, and `control`, with throughput, p99 latency, errors, and file-sync bytes. The publisher metrics (`queue_pub`) are the primary source and record all four operations on current servers; the subscriber metrics (`queue_sub`) fill any edge the publisher does not record — chiefly the liaison→warm/cold `query` fan-out on older servers whose publish path predates the `query`/`con [...] +- **Lifecycle migration** (dashed) — hot→warm→cold tier movement. The **path** comes from the **lifecycle** service's entries in `/cluster/topology` `calls` (structural — present even between scheduled runs); the **weight** comes from the `banyandb_lifecycle_migration_*` family, the tier-migration mirror of `queue_pub`. Nodes (and their health) come from `/cluster/topology`. 
The result answers "who sends what to whom, how fast, and is it healthy?" — useful for dashboards and incident triage. @@ -34,7 +34,7 @@ Returns `{ "nodes": [...], "calls": [...] }`. Each node carries the identity and - `labels.type` is the storage tier (`hot` / `warm` / `cold`; empty for liaison). - `labels.container_name` is the k8s container the node runs as (`liaison` / `data` / `lifecycle`), stamped by the agent from its `--container-names` config — so even the role-less lifecycle sidecar is classified correctly. - `labels.pod_name`, `status`, and `last_heartbeat` are enrichment the proxy/agent fill in for every node matched to a live agent — see [Caveats](#caveats). -- `calls` mixes two things: the **lifecycle service's** tier-migration targets (hot→warm→cold — we render these, see [Lifecycle migration layer](#lifecycle-migration-layer)) and a data↔data property-repair gossip mesh (we drop these as noise). +- `calls` mixes three things: the **lifecycle service's** tier-migration targets (hot→warm→cold — we render these, see [Lifecycle migration layer](#lifecycle-migration-layer)), the **liaison's** route-table edges (dropped — the request layer is rendered from the weighted queue metrics instead), and a data↔data property-repair gossip mesh (dropped as noise). ### Edges — queue metrics on the FODC proxy scrape @@ -48,14 +48,16 @@ Every `banyandb_queue_pub_*` series carries **both endpoints** of one directed f `remote_node` is exactly a topology `metadata.name`, so a publisher series is a ready-made directed edge **`pod_name` → `remote_node`**. -`banyandb_queue_sub_*` on the receiver carries the **same five labels**, but recorded from the other end: `pod_name` is the **receiver** and `remote_node` is the **sender** (the publisher stamps its identity onto the first frame of each stream, and the receiver reads it back). So a subscriber series is the directed edge **`remote_node` → `pod_name`** — the inverse mapping. 
We use the publisher as the primary source and the subscriber **only to fill edges the publisher does not record**, [...] +`banyandb_queue_sub_*` on the receiver carries the **same five labels**, but recorded from the other end: `pod_name` is the **receiver** and `remote_node` is the **sender** (the publisher stamps its identity onto the first frame of each stream, and the receiver reads it back). So a subscriber series is the directed edge **`remote_node` → `pod_name`** — the inverse mapping. We use the publisher as the primary source and the subscriber **only to fill edges the publisher does not record**, [...] + +One class of subscriber series belongs to a different layer entirely: series with **`remote_role="lifecycle"`** are inbound **tier-migration** traffic from the lifecycle service (which stamps its co-located data node's name and tier as its identity). Keep them **out of the request-pipeline fallback** — otherwise migration file-sync shows up as a bogus data→data request edge — and use them, if at all, only as a receiver-side cross-check of the migration layer below. ## The join 1. **Remote endpoint** — `remote_node` **==** topology `metadata.name`. Direct, exact. 2. **Local endpoint** — resolve the metric's `pod_name` to a topology node via `labels.pod_name` when present; otherwise keep the `pod_name` as the node id and take its role/tier from the metric's own `node_role` / `node_type`. Both liaisons (publishers) and data nodes (subscribers) carry `labels.pod_name`, so both ends resolve cleanly. **Lifecycle sidecars share a pod — and thus a `pod_name` — with their co-located data node**, so they are excluded from the `pod_name` → node map; otherw [...] 3. **Direction follows the recording side.** A publisher series (`queue_pub`) is `local → remote_node` (the scrape target is the sender). A subscriber series (`queue_sub`) is the **inverse** — `remote_node → local` (the scrape target is the receiver; `remote_node` is the sender). 
After flipping the subscriber edges they share the same node-name keyspace as the publisher edges. -4. **Publisher first, subscriber as fallback — per edge.** Build the edge set from `queue_pub`; then add an edge from `queue_sub` **only if the publisher did not already record it**. Never sum both sides for one edge. On servers that don't yet record `query`/`control` on the publish path, this is what surfaces the liaison→warm/cold `query` edges; once the publisher records them too, those edges come from `queue_pub` and the subscriber contributes nothing for them (so there is still no do [...] +4. **Publisher first, subscriber as fallback — per edge.** Build the edge set from `queue_pub`; then add an edge from `queue_sub` **only if the publisher did not already record it**, and **skip subscriber series with `remote_role="lifecycle"`** (migration traffic — it belongs to the migration layer, not the request pipeline). Never sum both sides for one edge. On older servers that don't record `query`/`control` on the publish path, the fallback is what surfaces the liaison→warm/cold `qu [...] 5. **Node attributes come from `/cluster/topology`** (authoritative, consistent), falling back to the metric's `node_*` / `remote_*` labels for endpoints the topology did not enrich. The node set is the **union** of topology nodes and the endpoints seen in the metrics, so idle nodes (no current traffic) still appear, and a live edge to a node missing from topology still renders. @@ -98,7 +100,20 @@ sum by (pod_name, remote_node) Add `, group` to any `by (...)` clause to break an edge down by business group. -Run the **same four queries** against `banyandb_queue_sub_*` to get the subscriber view (e.g. `rate(banyandb_queue_sub_total_finished{...}[$__rate_interval])`). Remember the subscriber edge is inverted — `remote_node` is the **source** and `pod_name` is the **target** — and that `banyandb_queue_sub_*` records `received_bytes`, not `sent_bytes`. 
Merge the two sets per edge, preferring the publisher, so the liaison→warm/cold `query` edges fill in from the subscriber while every publisher e [...] +Run the **same four queries** against `banyandb_queue_sub_*` to get the subscriber view (e.g. `rate(banyandb_queue_sub_total_finished{...}[$__rate_interval])`), adding `remote_role!="lifecycle"` to the selector so inbound migration traffic stays out of the request layer. Remember the subscriber edge is inverted — `remote_node` is the **source** and `pod_name` is the **target** — and that `banyandb_queue_sub_*` records `received_bytes`, not `sent_bytes`. Merge the two sets per edge, prefe [...] + +And the **migration layer** runs the same shapes against `banyandb_lifecycle_migration_*` — the tier-migration mirror of `queue_pub`, emitted by the lifecycle sidecar (`container_name="lifecycle"`, `pod_name` shared with its co-located data node): + +```promql +# Migration throughput (messages/s), per edge and operation +sum by (pod_name, remote_node, remote_role, remote_tier, operation) + (rate(banyandb_lifecycle_migration_total_finished{job=~"$job"}[$__rate_interval])) + +# Migration p99 / errors / bytes — same shapes as above, on +# banyandb_lifecycle_migration_total_latency_bucket / _total_err / _sent_bytes +``` + +These are publisher-side series, so the edge is `pod_name → remote_node` — but resolve the local `pod_name` to the pod's **`:17914` lifecycle node** (the series carry `container_name="lifecycle"`), not its data node. `operation` is `file-sync` for part shipping and `batch-write` for row replay. ## Lifecycle migration layer @@ -106,7 +121,7 @@ The tiered-storage **lifecycle** service migrates data hot→warm→cold. It is - **Source = the lifecycle service's route table**, recomputed continuously from each group's lifecycle **stage node-selectors** — not only while the scheduled (e.g. `@daily`) migration runs. 
Each lifecycle instance runs on a data pod and publishes the **next-tier data nodes it migrates to**; the proxy exposes these as `calls` whose `source` is the lifecycle node — a hot-tier instance to warm nodes, a warm-tier instance to cold nodes, i.e. the hot→warm→cold path. - **Identify lifecycle nodes by their port.** Lifecycle nodes advertise the lifecycle gRPC port (default **`17914`**) and carry no role. The script treats any `calls` edge whose `source` ends in `:17914` as a migration edge and drops the rest of the `calls` mesh (property-repair gossip). -- **Structural only — no metrics.** The migration publisher is built without a metadata service (`pub.NewWithoutMetadata()`), so it emits **no `queue_pub` metrics**. Migration edges have no throughput/latency/bytes — they show *that* a tier path exists, not how busy it is. (The request pipeline is the weighted layer.) +- **Structural path + metric weight.** The migration publisher is built without a metadata service (`pub.NewWithoutMetadata()`), so it emits no `queue_pub` metrics — instead the same five instruments are registered under **`banyandb_lifecycle_migration_*`** (same labels; `operation` is `file-sync` for part shipping, `batch-write` for row replay; see [Metrics](../observability/metrics.md#lifecycle_migration--the-tier-migration-mirror-of-queue_pub)). The `calls` route table remains the **s [...] - **Distinct identity, all-interfaces bind.** Each lifecycle instance advertises `<pod-host>:17914` as its node name (resolved from the node host when `--lifecycle-grpc-host` is empty), so the per-tier instances stay distinct instead of collapsing into a single `:17914` node under the proxy's dedup-by-name. The gRPC server still **binds** to `:17914` (all interfaces) so the co-located FODC agent reaches it on `127.0.0.1`; only the advertised identity carries the host. ## Rendering recipe (offline join script) @@ -114,9 +129,9 @@ The tiered-storage **lifecycle** service migrates data hot→warm→cold. 
It is The script [`render_topology.py`](./render_topology.py) does the whole join with only the Python standard library: - GETs `{proxy}/cluster/topology` and builds the node inventory (`metadata.name` → role / tier / pod / status, plus a `pod_name` → name map that excludes lifecycle sidecars so they don't shadow their co-located data node). -- Runs the four per-edge queries against Prometheus for **both** `banyandb_queue_pub_*` and `banyandb_queue_sub_*`. -- Joins them — `remote_node` maps to a topology node directly, the local `pod_name` maps to a node via `labels.pod_name` (falling back to the metric's own `node_*` labels). Publisher edges are `pod_name → remote_node`; subscriber edges are flipped to `remote_node → pod_name`. It accumulates per-edge throughput, p99, errors, and bytes from the publisher, then adds any edge the publisher lacks from the subscriber (so liaison→warm/cold `query` edges appear without double-counting). -- Adds the **lifecycle migration layer**: `calls` edges whose `source` is a lifecycle node (`--lifecycle-port`, default `17914`) become dashed `migrate` edges (hot→warm→cold); the rest of the `calls` gossip mesh is dropped. +- Runs the four per-edge queries against Prometheus for **both** `banyandb_queue_pub_*` and `banyandb_queue_sub_*`, plus the migration family `banyandb_lifecycle_migration_*`. +- Joins them — `remote_node` maps to a topology node directly, the local `pod_name` maps to a node via `labels.pod_name` (falling back to the metric's own `node_*` labels). Publisher edges are `pod_name → remote_node`; subscriber edges are flipped to `remote_node → pod_name`. It accumulates per-edge throughput, p99, errors, and bytes from the publisher, then adds any edge the publisher lacks from the subscriber (so on older servers the liaison→warm/cold `query` edges appear without doubl [...] 
+- Adds the **lifecycle migration layer**: `calls` edges whose `source` is a lifecycle node (`--lifecycle-port`, default `17914`) become dashed `migrate` edges (hot→warm→cold); the rest of the `calls` mesh is dropped. Each migration edge is **weighted from `banyandb_lifecycle_migration_*`** when those series exist (the local `pod_name` resolves to the pod's `:17914` lifecycle node), and stays a bare structural `migrate` edge otherwise — e.g. between scheduled runs on a fresh cluster, or o [...] - Prints **Graphviz DOT** and/or **Mermaid**: nodes shaped by role (liaison = box, data = cylinder, lifecycle = ellipse), colored by tier, dashed-red when unhealthy. Write edges are labeled with operation / throughput / p99 / bytes and turn red on errors or when p99 exceeds `--p99-warn`; migration edges are dashed. If Prometheus sits behind Grafana's datasource proxy with basic auth, set `PROM_USER` / `PROM_PASS` in the environment. @@ -168,7 +183,7 @@ Two things to notice, both of which drive the design: The liaison→warm/cold `query` edges are now plain publisher series. The **subscriber** side (`queue_sub`) mirrors them but contributes nothing here — per-edge priority keeps every edge publisher-sourced, so there is no double-counting. On a cluster whose publisher predates the `query`/`control` instrumentation, the script would instead fill these edges from `queue_sub`, **inverted** (`remote_node`→`pod_name`). 
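The per-edge priority described above (publisher first, subscriber only as a fallback, lifecycle-stamped subscriber series skipped) can be sketched as follows. This is a minimal illustration, not the code in `render_topology.py`: the `merge_edges` helper and the dictionary shape are hypothetical, and it assumes each input series is already aggregated per edge and that `pod_name` has already been resolved to a topology node name.

```python
from typing import Dict, Iterable, Tuple

# Hypothetical edge key: (source node, target node, operation).
Edge = Tuple[str, str, str]


def merge_edges(pub_series: Iterable[dict], sub_series: Iterable[dict]) -> Dict[Edge, dict]:
    """Publisher first, subscriber as fallback, decided per edge."""
    edges: Dict[Edge, dict] = {}
    for s in pub_series:
        # Publisher series: the scrape target (pod_name) is the sender.
        key = (s["pod_name"], s["remote_node"], s["operation"])
        edges[key] = {"rate": s["rate"], "source": "queue_pub"}
    for s in sub_series:
        # Migration traffic belongs to the migration layer, not the request pipeline.
        if s.get("remote_role") == "lifecycle":
            continue
        # Subscriber series are inverted: remote_node is the sender.
        key = (s["remote_node"], s["pod_name"], s["operation"])
        # Fill only edges the publisher did not record; never sum both sides.
        edges.setdefault(key, {"rate": s["rate"], "source": "queue_sub"})
    return edges
```

Using `setdefault` for the subscriber pass is what keeps an edge publisher-sourced whenever both sides report it, so throughput is never double-counted.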
-Lifecycle migration (from `calls`, structural — no metrics): +Lifecycle migration (path from `calls`; this capture predates the `banyandb_lifecycle_migration_*` family, so the edges render structural — unweighted): | lifecycle source (`…:17914`) | → targets (next tier) | | --- | --- | @@ -249,11 +264,12 @@ graph LR style lc_warm_1 fill:#ffb74d ``` -**Two layers.** Solid edges are the **request pipeline** — `batch-write`, `file-sync`, and `query`/`control` (including the liaison→warm/cold query fan-out), all from the publisher metrics, weighted by operation, throughput, p99, and file-sync bytes; red on errors or when p99 exceeds `--p99-warn`. Dashed `migrate` edges are the **lifecycle tier migration** (hot→warm→cold, from `calls`; structural, no metrics). Liaisons are boxes, data nodes cylinders colored by tier, lifecycle sidecars e [...] +**Two layers.** Solid edges are the **request pipeline** — `batch-write`, `file-sync`, and `query`/`control` (including the liaison→warm/cold query fan-out), all from the publisher metrics, weighted by operation, throughput, p99, and file-sync bytes; red on errors or when p99 exceeds `--p99-warn`. Dashed `migrate` edges are the **lifecycle tier migration** (hot→warm→cold, path from `calls`; on a current build they additionally carry the `banyandb_lifecycle_migration_*` rates after a migr [...] ## Caveats -- **Publisher first, subscriber fallback — per edge.** `queue_pub` is the primary source. `queue_sub` mirrors each edge from the receiver (inverted: `remote_node` is the sender); add it **only** for edges the publisher does not record, and never sum both sides for one edge, or you double-count throughput. The fallback exists because `query`/`control` are recorded on the receiver until the publish path is instrumented for them. +- **Publisher first, subscriber fallback — per edge.** `queue_pub` is the primary source. 
`queue_sub` mirrors each edge from the receiver (inverted: `remote_node` is the sender); add it **only** for edges the publisher does not record, and never sum both sides for one edge, or you double-count throughput. The fallback exists for older servers that record `query`/`control` only on the receiver; current publishers record all four operations. +- **Migration traffic appears in `queue_sub` with `remote_role="lifecycle"`.** The lifecycle publisher stamps its co-located data node's name and tier as its identity, so on the receiving node those series name a *data* node as the sender. Exclude them from the request-pipeline fallback (`remote_role!="lifecycle"`), or migration file-sync renders as a bogus data→data request edge; the migration layer itself is weighted from the publisher-side `banyandb_lifecycle_migration_*` family. - **Subscriber edges are inverted.** On a `queue_sub` series the scrape target (`pod_name`) is the **receiver** and `remote_node` is the **sender**, so the edge is `remote_node → pod_name` — the opposite of the publisher mapping. Flip it before joining. - **Lifecycle sidecars share a `pod_name`** with their co-located data node; exclude them from the `pod_name` → node map, or subscriber edges to the data node get misattributed to the `:17914` sidecar. - **Node name is a full DNS address** (`pod.headless.namespace:port`), and liaison/data use different ports — match on the whole string, not a prefix. @@ -262,7 +278,7 @@ graph LR - **`*_total_err` is lazily registered** — absent means "no errors yet", not zero; treat a missing series as healthy. - **Use `rate(...[window])`, never raw counters.** After a restart, old counter series linger in Prometheus until they age out; a rate window ignores them. - **`control` / `query` edges carry little or no bytes** (`sent_bytes` is file-sync only; `queue_sub` records `received_bytes`), so those edges show throughput/latency without a byte figure. 
-- **`calls` carries two edge kinds.** Lifecycle-source edges (`…:17914`) are the tier migration we render (dashed); data-source edges (`…:17912`) are the property-repair gossip mesh we drop. The liaison→data request pipeline is **not** in `calls` — that comes from the queue metrics. Each lifecycle instance advertises a distinct `<pod-host>:17914`, so the per-tier migration path renders without collapsing; see [Lifecycle migration layer](#lifecycle-migration-layer). +- **`calls` carries three edge kinds.** Lifecycle-source edges (`…:17914`) are the tier migration we render (dashed); liaison-source edges (`…:18912`) are the liaison's route tables, dropped in favor of the weighted queue metrics; data-source edges (`…:17912`) are the property-repair gossip mesh, dropped as noise. Each lifecycle instance advertises a distinct `<pod-host>:17914`, so the per-tier migration path renders without collapsing; see [Lifecycle migration layer](#lifecycle-migratio [...] ## Related diff --git a/docs/operation/observability/metrics.md b/docs/operation/observability/metrics.md index 26f07a3d6..e298d7b7c 100644 --- a/docs/operation/observability/metrics.md +++ b/docs/operation/observability/metrics.md @@ -340,7 +340,7 @@ Each panel uses the same three expressions, with `operation` pinned to the panel Liaison nodes run an internal gRPC **queue server** (`server-queue-sub`, wired via `sub.NewServerWithPorts` in `pkg/cmdsetup/liaison.go`) and **queue clients** (`server-queue-pub`) for the tier-1/tier-2 pipelines. Prometheus metrics use the namespaces `banyandb_queue_sub_*` and `banyandb_queue_pub_*` (built from `observability.RootScope` + `queue_sub` / `queue_pub` sub-scopes). Data nodes expose the same families where the corresponding services run. 
-Both namespaces share one model: the base metrics `total_started`, `total_finished`, `total_latency` (a histogram), and `total_err`, labeled by `operation` (`batch-write` / `file-sync` / `query` / `control`) and `group`, plus the **remote endpoint** of the flow — `remote_node` (the peer's BanyanDB node name, equal to its `/cluster/topology` `metadata.name`), `remote_role` (`liaison` / `data`), and `remote_tier` (`hot` / `warm` / `cold`, data only). `total_err` adds an `error_type` label. [...] +Both namespaces share one model: the base metrics `total_started`, `total_finished`, `total_latency` (a histogram), and `total_err`, labeled by `operation` (`batch-write` / `file-sync` / `query` / `control`) and `group`, plus the **remote endpoint** of the flow — `remote_node` (the peer's BanyanDB node name, equal to its `/cluster/topology` `metadata.name`), `remote_role` (`liaison` / `data`, plus `lifecycle` on `queue_sub` for inbound tier-migration traffic — see the [lifecycle migratio [...] ### `queue_sub` — inbound server @@ -351,7 +351,7 @@ Both namespaces share one model: the base metrics `total_started`, `total_finish | `total_err` | Counter | …, `error_type` | Errors by type. Lazily registered, so absent (not zero) on a healthy cluster. | | `received_bytes` | Counter | `operation`, `group`, `remote_node`, `remote_role`, `remote_tier` | Bytes received, **file-sync only** (`operation="file-sync"`). | -**Troubleshooting:** a growing `total_started − total_finished` gap (or rising `total_latency` p99) for a `group`/`operation` points at slow or stuck consumers. `total_err` broken down by `error_type` distinguishes transport issues (`stream_error`, `recv_error`, `checksum_mismatch`, `out_of_order`) from completion issues (`finish_sync_err`, `part_failed`). For file-sync, `received_bytes` together with `total_latency` separates partial completion from healthy throughput. 
+**Troubleshooting:** a growing `total_started − total_finished` gap (or rising `total_latency` p99) for a `group`/`operation` points at slow or stuck consumers. `total_err` broken down by `error_type` distinguishes transport issues (`stream_error`, `recv_error`, `checksum_mismatch`, `out_of_order`) from completion issues (`finish_sync_err`, `part_failed`). For file-sync, `received_bytes` together with `total_latency` separates partial completion from healthy throughput. On data nodes, se [...] ### `queue_pub` — outbound batch client @@ -359,12 +359,34 @@ Both namespaces share one model: the base metrics `total_started`, `total_finish | --- | --- | --- | --- | | `total_started`, `total_finished` | Counter | `operation`, `group`, `remote_node`, `remote_role`, `remote_tier` | Sends started / finished per operation and target node; `total_finished` is the success rate. | | `total_latency` | Histogram | `operation`, `group`, `remote_node`, `remote_role`, `remote_tier` | Send latency (`_bucket` / `_sum` / `_count`); use `histogram_quantile` for p99. | -| `total_err` | Counter | …, `error_type` | Send errors by type. `error_type` is one of `non_transient`, `canceled`, `stream_canceled`, `retry_exhausted`, `recv_error`, `server_rejected` (Send path) or `stream_error`, `recv_error`, `checksum_mismatch`, `out_of_order`, `session_not_found`, `completion_error` (file-sync). Lazily registered. | +| `total_err` | Counter | …, `error_type` | Send errors by type. `error_type` is one of `non_transient`, `canceled`, `stream_canceled`, `retry_exhausted`, `recv_error`, `server_rejected`, `send_error`, `decode_error`, `invalid_topic` (Send/publish path) or `stream_error`, `recv_error`, `checksum_mismatch`, `out_of_order`, `session_not_found`, `completion_error` (file-sync). Lazily registered. | | `sent_bytes` | Counter | `operation`, `group`, `remote_node`, `remote_role`, `remote_tier` | Bytes sent, **file-sync only**. 
| **Troubleshooting:** `total_err` by `error_type` separates transport failures (`recv_error`) from application-level `SendResponse` errors (`server_rejected`) and exhausted retries (`retry_exhausted`). A persistent `total_started − total_finished` gap, or rising `total_latency` p99 for a given `remote_node` / `remote_tier`, indicates slow or unavailable data nodes. -Metrics are only registered when `metadata` implements `metadata.Service` and `MetricsRegistry()` is non-nil (e.g. after `SetMetricsRegistry` in bootstrap). `NewWithoutMetadata()` leaves `queue_pub` metrics disabled and logs a warning (`queue_pub metrics disabled: ...`). The `total_err` counters above are registered lazily on first occurrence, so they are simply absent (not zero) on a healthy cluster. +Metrics are only registered when `metadata` implements `metadata.Service` and `MetricsRegistry()` is non-nil (e.g. after `SetMetricsRegistry` in bootstrap). `NewWithoutMetadata(omr)` — used by the lifecycle service's tier-migration publisher — leaves the regular `queue_pub` family disabled and registers the same instrument set under `banyandb_lifecycle_migration_*` instead (see [below](#lifecycle_migration--the-tier-migration-mirror-of-queue_pub)). The `total_err` counters above are regi [...] + +### `lifecycle_migration` — the tier-migration mirror of `queue_pub` + +The **lifecycle** service migrates data hot→warm→cold through a queue client built without a metadata service (`pub.NewWithoutMetadata`), so its traffic does **not** appear under `banyandb_queue_pub_*`. Instead the same five instruments — `total_started`, `total_finished`, `total_latency`, `total_err`, `sent_bytes` — are registered under **`banyandb_lifecycle_migration_*`**, with the same labels (`operation`, `group`, `remote_node`, `remote_role`, `remote_tier`, plus `error_type` on `tot [...] 
+ +The lifecycle sidecar serves these series (plus the run-health gauges below) at `/metrics` on its own HTTP port (`--lifecycle-http-port`, default `17915`) instead of the `2121` observability listener; the co-located FODC agent polls that port, so the series arrive through the proxy scrape with `container_name="lifecycle"` and the `pod_name` shared with the co-located data node. On the **receiving** data node the same flows are mirrored as `banyandb_queue_sub_*` series with `remote_role=" [...] + +**Example:** migration throughput per target node — `sum(rate(banyandb_lifecycle_migration_total_finished{job=~"$job"}[$__rate_interval])) by (operation, remote_node)`; errors — `sum(rate(banyandb_lifecycle_migration_total_err{job=~"$job"}[$__rate_interval])) by (error_type)`. + +For the worked recipe that turns this family into weighted hot→warm→cold edges in a topology graph, see [Cluster Topology Rendering](../fodc/topology.md#lifecycle-migration-layer). + +### Lifecycle run health (`banyandb_lifecycle_*`) + +The lifecycle scheduler also exposes three service-level series: + +| Metric | Type | Meaning | +| --- | --- | --- | +| `banyandb_lifecycle_cycles_total` | Counter | Migration cycles executed by the scheduler. | +| `banyandb_lifecycle_last_run_timestamp_seconds` | Gauge | Unix time at which the most recent migration run **started**; stamped on every return path (success, error, recovered panic). | +| `banyandb_lifecycle_last_run_success` | Gauge | Outcome of the most recent run: `1` success, `0` failure. | + +**Suggested alerts:** `banyandb_lifecycle_last_run_success == 0`, and `time() - banyandb_lifecycle_last_run_timestamp_seconds` exceeding the migration schedule interval (a missed run). 
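The two suggested alert conditions can be sketched as a small check over already-fetched gauge values. This is an illustrative sketch only: the `lifecycle_alerts` helper, its alert names, and the `schedule_interval_s` parameter are hypothetical, and it assumes the caller has read the current values of `banyandb_lifecycle_last_run_success` and `banyandb_lifecycle_last_run_timestamp_seconds` from Prometheus.

```python
import time
from typing import List, Optional


def lifecycle_alerts(last_run_success: float, last_run_timestamp: float,
                     schedule_interval_s: float, now: Optional[float] = None) -> List[str]:
    """Evaluate the two run-health conditions for one lifecycle instance."""
    now = time.time() if now is None else now
    alerts: List[str] = []
    # banyandb_lifecycle_last_run_success == 0: the most recent run failed.
    if last_run_success == 0:
        alerts.append("last_run_failed")
    # time() - last_run_timestamp_seconds beyond the schedule interval: a missed run.
    if now - last_run_timestamp > schedule_interval_s:
        alerts.append("missed_run")
    return alerts
```

For an `@daily` schedule, `schedule_interval_s` would be about 86400; in practice some slack on top of the interval is probably sensible, since the timestamp marks when the run started.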
### Example PromQL snippets diff --git a/docs/operation/observability/overview.md b/docs/operation/observability/overview.md index a0635f83d..aafc4af0d 100644 --- a/docs/operation/observability/overview.md +++ b/docs/operation/observability/overview.md @@ -26,7 +26,8 @@ If you watch nothing else, watch these. They are organized with the two canonica | 8 | **Merge / compaction health** (LSM) | Merge File Rate/Latency/Partitions (`banyandb_*_total_merge_loop_started`, `_merge_latency`, `_merged_parts`), part counts (`*_total_file_parts`) | merge-latency spikes correlated with write-latency spikes; steadily growing part/partition counts | a compaction storm or backlog → write-latency spikes, query slowdown, oversized/broken parts [6] | | 9 | **Cardinality / series growth** | Total Series (`banyandb_*_inverted_index_total_doc_count`), `_total_updates` (churn), `_total_term_searchers_started` | rapid `doc_count` growth, high churn, or high term-search rate | a cardinality explosion → memory pressure, inverted-index bloat, and slow queries | | 10 | **Node liveness / membership** | Active Instances (`count(banyandb_system_up_time)` by `container_name`), per-node `banyandb_system_up_time` | reporting-node count below expected; a node's uptime dropping to ~0 (restart) or its series disappearing (gone) | lost capacity, under-replication, and query gaps | +| 11 | **Lifecycle migration health** (tiered storage) | `banyandb_lifecycle_last_run_success` (1/0), `banyandb_lifecycle_last_run_timestamp_seconds`, migration errors `banyandb_lifecycle_migration_total_err` | `last_run_success == 0`, or `time() - last_run_timestamp_seconds` exceeding the migration schedule interval | failed or missed hot→warm→cold migration — data piling up on the hot tier until its disk fills (signal #4) | -**Priority order:** 1–3 (RED) tell you whether users are affected *right now*; 4–7 (USE/backpressure) are the **leading indicators** that catch trouble before it becomes user-visible; 8–10 catch the 
classic slow-burn database failures (compaction backlog, cardinality blow-up, node loss). A practical first alert set: query p99 latency, error rate, disk > 85%, memory near `--allowed-percent`, and sustained wqueue/`queue_pub` backlog. +**Priority order:** 1–3 (RED) tell you whether users are affected *right now*; 4–7 (USE/backpressure) are the **leading indicators** that catch trouble before it becomes user-visible; 8–11 catch the classic slow-burn database failures (compaction backlog, cardinality blow-up, node loss, stalled tier migration). A practical first alert set: query p99 latency, error rate, disk > 85%, memory near `--allowed-percent`, and sustained wqueue/`queue_pub` backlog. > Sources: [1] Google SRE — *Monitoring Distributed Systems* (Four Golden > Signals) <https://sre.google/sre-book/monitoring-distributed-systems/> · [2] > B. Gregg — *The USE Method* <https://www.brendangregg.com/usemethod.html> · > [3] *The RED Method* (Grafana / T. Wilkie) > <https://grafana.com/blog/the-red-method-how-to-instrument-your-services/> · > [4] Elasticsearch disk watermarks 85/90/95% > <https://www.elastic.co/docs/troubleshoot/elasticsearch/fix-watermark-errors> > · [5] Prometheus remote [...] diff --git a/docs/operation/observability/providers.md b/docs/operation/observability/providers.md index 523056c11..df1d71031 100644 --- a/docs/operation/observability/providers.md +++ b/docs/operation/observability/providers.md @@ -6,7 +6,7 @@ BanyanDB has built-in support for metrics collection. Currently, there are two s Prometheus is auto enabled at run time, if no flag is passed or if `prometheus` is set in `observability-modes` flag. -When the Prometheus metrics provider is enabled, each BanyanDB process exposes its own metrics on port `2121`. +When the Prometheus metrics provider is enabled, each BanyanDB process exposes its own metrics on port `2121`. 
(The lifecycle sidecar is the exception: it reuses its HTTP API port — `--lifecycle-http-port`, default `17915` — and serves the same Prometheus payload at `/metrics` there instead of opening `2121`.) In a cluster, the recommended setup is to let the **FODC proxy** aggregate every node's metrics and scrape the proxy as the single target. Configure the following Prometheus job (replace `BANYANDB_NAMESPACE` with the namespace where BanyanDB is deployed, and adjust the keep-regex to match your FODC proxy pod label):
