This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 08566550b feat(queue): redesign pub/sub metrics around operation, group, and node topology (#1161)
08566550b is described below

commit 08566550bbb38c228eb7040d5087a61360d6f4be
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat Jun 6 16:25:34 2026 +0800

    feat(queue): redesign pub/sub metrics around operation, group, and node topology (#1161)
    
    * feat(queue): redesign pub/sub metrics around operation, group, and node topology
    
    Collapse the queue_pub/queue_sub metric sprawl into a uniform base set:
    total_started, total_finished, total_latency (now a histogram), and
    total_err, plus file-sync-only sent_bytes (pub) / received_bytes (sub).
    Re-label them by operation (batch-write/file-sync/query/control), group,
    and the remote endpoint (remote_node/remote_role/remote_tier).
    remote_node equals the BanyanDB node metadata.name, so the
    liaison<->data (hot/warm/cold) call graph can be drawn by joining
    /cluster/topology with the metrics.
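
As a rough illustration of the join described above, the sketch below builds call-graph edges from metric samples and a set of node names. Everything here is hypothetical: the `Sample` and `Edge` types, the `BuildCallGraph` helper, and the idea that the reporting node is known from the scrape target are assumptions, and the real /cluster/topology response shape is not shown in this commit.

```go
package main

import "sort"

// Sample is a hypothetical, flattened view of one queue_pub counter sample.
type Sample struct {
	Reporter   string  // node that exported the sample (assumed known from the scrape target)
	Operation  string  // batch-write, file-sync, query, or control
	RemoteNode string  // remote_node label; equals the peer's metadata.name
	Value      float64 // counter value, for example total_finished
}

// Edge is one directed caller -> callee link of the call graph.
type Edge struct {
	From, To, Operation string
	Total               float64
}

// BuildCallGraph sums samples per (reporter, remote_node, operation) and keeps
// only edges whose endpoints both appear in the topology node set.
func BuildCallGraph(topologyNodes map[string]bool, samples []Sample) []Edge {
	type key struct{ from, to, op string }
	totals := make(map[key]float64)
	for _, s := range samples {
		if !topologyNodes[s.Reporter] || !topologyNodes[s.RemoteNode] {
			continue // endpoint is not present in the topology; skip it
		}
		totals[key{s.Reporter, s.RemoteNode, s.Operation}] += s.Value
	}
	edges := make([]Edge, 0, len(totals))
	for k, v := range totals {
		edges = append(edges, Edge{From: k.from, To: k.to, Operation: k.op, Total: v})
	}
	// Sort so the output is deterministic despite map iteration order.
	sort.Slice(edges, func(i, j int) bool {
		a, b := edges[i], edges[j]
		if a.From != b.From {
			return a.From < b.From
		}
		if a.To != b.To {
			return a.To < b.To
		}
		return a.Operation < b.Operation
	})
	return edges
}
```

Calling `BuildCallGraph` with the node names taken from the topology and the samples taken from a metrics scrape yields one edge per caller, callee, and operation; samples that point at a node missing from the topology are dropped rather than guessed at.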
    
    The sender's identity rides the wire via new cluster.v1.SendRequest fields
    (group, sender_node/sender_role/sender_tier, stamped on the first frame of a
    stream) and SyncMetadata (sender_*), so receivers can label flows by sender;
    pub-side remote_role/remote_tier are resolved from the connection registry.
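
The first-frame stamping described above can be sketched in isolation as follows. This is a simplified model, not the code in this commit: `Frame` stands in for cluster.v1.SendRequest, `Sender` and `Receiver` are hypothetical types, and the receiver-side caching is an assumption about how a subscriber might reuse the identity for later frames. Unlike the real publisher, the sketch has no send failures, so it marks the first frame as sent immediately.

```go
package main

// Identity is the sender's node name, role, and tier.
type Identity struct{ Node, Role, Tier string }

// Frame is a hypothetical stand-in for cluster.v1.SendRequest, reduced to the
// fields discussed above.
type Frame struct {
	Group                              string
	SenderNode, SenderRole, SenderTier string
}

// Sender stamps its identity on the first frame of a stream only.
type Sender struct {
	Self           Identity
	firstFrameSent bool
}

// Stamp returns the frame as it would go on the wire.
func (s *Sender) Stamp(f Frame) Frame {
	if !s.firstFrameSent {
		f.SenderNode, f.SenderRole, f.SenderTier = s.Self.Node, s.Self.Role, s.Self.Tier
		s.firstFrameSent = true
	} else {
		// Later frames carry no identity; the receiver already has it.
		f.SenderNode, f.SenderRole, f.SenderTier = "", "", ""
	}
	return f
}

// Receiver remembers the identity seen on the first frame and reuses it for
// every later frame of the same stream (assumed behavior).
type Receiver struct {
	sender Identity
	seen   bool
}

// Observe returns the sender identity to use when labeling this frame.
func (r *Receiver) Observe(f Frame) Identity {
	if !r.seen {
		r.sender = Identity{Node: f.SenderNode, Role: f.SenderRole, Tier: f.SenderTier}
		r.seen = true
	}
	return r.sender
}
```

Sending the identity once per stream keeps the per-frame overhead to a few empty string fields, at the cost of requiring the receiver to hold per-stream state.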
    
    BREAKING CHANGE: removes the previous queue_* metric and label names
    (*_total_msg_*, queue_pub_send_*, inflight/retry/backoff gauges,
    chunked_sync_*/chunk_reorder_*, and the topic label); update
    dashboards/alerts.
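
When migrating a dashboard that filtered on the removed topic label, a lookup like the one below may help translate old filters to the new operation label. It is only a sketch: `operationByTopic` and `OperationForTopic` are hypothetical names, the two part-sync keys assume the old label values matched the topic strings mentioned in rpc.proto, and the `example-*` keys are placeholders rather than real topic names. The fallback to `control` mirrors `OperationOf` in api/data/data.go.

```go
package main

// operationByTopic maps old topic label values to new operation label values.
// Only the two part-sync entries come from names that appear in this commit;
// the "example-*" keys are placeholders to be replaced with real topic strings.
var operationByTopic = map[string]string{
	"stream-part-sync":    "file-sync",
	"measure-part-sync":   "file-sync",
	"example-write-topic": "batch-write", // placeholder key
	"example-query-topic": "query",       // placeholder key
}

// OperationForTopic returns the operation label for an old topic label value.
// Unknown topics fall back to "control", as OperationOf does.
func OperationForTopic(topic string) string {
	if op, ok := operationByTopic[topic]; ok {
		return op
	}
	return "control"
}
```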
    
    Signed-off-by: Hongtao Gao <[email protected]>
    
    
    
---
 CHANGES.md                               |   1 +
 api/data/data.go                         | 114 ++++++++++
 api/proto/banyandb/cluster/v1/rpc.proto  |  14 ++
 banyand/queue/pub/batch.go               | 133 +++++++-----
 banyand/queue/pub/chunked_sync.go        |  93 ++++++--
 banyand/queue/pub/metrics_test.go        | 296 +++++++++-----------------
 banyand/queue/pub/pub.go                 | 136 +++++++++---
 banyand/queue/pub/pub_test.go            |   5 +
 banyand/queue/pub/retry_test.go          |  18 +-
 banyand/queue/sub/chunked_sync.go        | 217 +++++++++----------
 banyand/queue/sub/chunked_sync_test.go   | 145 ++++++-------
 banyand/queue/sub/helpers.go             |  41 ++--
 banyand/queue/sub/server.go              |  79 +------
 banyand/queue/sub/server_metrics_test.go | 249 ++++++++++++----------
 banyand/queue/sub/sub.go                 | 352 +++++++++++++++++++------------
 docs/api-reference.md                    |   7 +
 pkg/cmdsetup/liaison.go                  |   9 +
 17 files changed, 1063 insertions(+), 846 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 79c40d6a6..2f4c650bf 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -5,6 +5,7 @@ Release Notes.
 ## 0.11.0
 
 ### Features
+- Redesign the queue (`queue_pub`/`queue_sub`) metrics around a uniform model: 
keep only `total_started`, `total_finished`, `total_latency` (now a histogram) 
and `total_err`, plus file-sync-only `sent_bytes` (pub) / `received_bytes` 
(sub). Replace the `topic` label with `operation` 
(`batch-write`/`file-sync`/`query`/`control`) and `group`, add an `error_type` 
label on `total_err`, and add remote-endpoint labels 
(`remote_node`/`remote_role`/`remote_tier`) so the liaison↔data (hot/warm/col 
[...]
 - Vectorized measure query path is now enabled by default. The columnar 
pipeline replaces per-row protobuf serialization in `NewMIterator`, cutting 
allocations and ns/op for scan-heavy measure queries; gRPC wire format 
(`*measurev1.InternalDataPoint`) is byte-identical. Single-node coverage is 
complete: scan, GroupBy+Agg via `BatchAggregation`, scalar reduce (`Agg` 
without `GroupBy`), raw `GroupBy` (without `Agg`), implicit projection coverage 
for GroupBy/Agg fields, `TopN`/`BottomN`, `o [...]
 - Add validation to ensure Measure's ShardingKey contains all Entity tags to guarantee entity locality.
 - Organize access logs under a dedicated "accesslog" subdirectory to improve log organization and separation from other application data.
diff --git a/api/data/data.go b/api/data/data.go
index 822ea1401..b11baea5c 100644
--- a/api/data/data.go
+++ b/api/data/data.go
@@ -243,3 +243,117 @@ var (
        // TopicCommon is the common topic for data transmission.
        TopicCommon = bus.Topic{}
 )
+
+const (
+       // OperationFileSyncValue is the operation label for file-sync (chunked sync) operations.
+       OperationFileSyncValue = "file-sync"
+       // OperationBatchWriteValue is the operation label for batch-write operations.
+       OperationBatchWriteValue = "batch-write"
+       // OperationQueryValue is the operation label for query operations.
+       OperationQueryValue = "query"
+       // OperationControlValue is the operation label for control operations.
+       OperationControlValue = "control"
+)
+
+var operationMap = map[bus.Topic]string{
+       TopicStreamPartSync:            OperationFileSyncValue,
+       TopicMeasurePartSync:           OperationFileSyncValue,
+       TopicTracePartSync:             OperationFileSyncValue,
+       TopicStreamSeriesSync:          OperationFileSyncValue,
+       TopicMeasureSeriesSync:         OperationFileSyncValue,
+       TopicTraceSeriesSync:           OperationFileSyncValue,
+       TopicStreamElementIndexSync:    OperationFileSyncValue,
+       TopicStreamWrite:               OperationBatchWriteValue,
+       TopicMeasureWrite:              OperationBatchWriteValue,
+       TopicTraceWrite:                OperationBatchWriteValue,
+       TopicMeasureSeriesIndexInsert:  OperationBatchWriteValue,
+       TopicMeasureSeriesIndexUpdate:  OperationBatchWriteValue,
+       TopicStreamSeriesIndexWrite:    OperationBatchWriteValue,
+       TopicStreamLocalIndexWrite:     OperationBatchWriteValue,
+       TopicTraceSidxSeriesWrite:      OperationBatchWriteValue,
+       TopicStreamQuery:               OperationQueryValue,
+       TopicMeasureQuery:              OperationQueryValue,
+       TopicInternalMeasureQuery:      OperationQueryValue,
+       TopicTopNQuery:                 OperationQueryValue,
+       TopicTraceQuery:                OperationQueryValue,
+       TopicPropertyUpdate:            OperationControlValue,
+       TopicPropertyDelete:            OperationControlValue,
+       TopicPropertyQuery:             OperationControlValue,
+       TopicPropertyRepair:            OperationControlValue,
+       TopicMeasureCollectDataInfo:    OperationControlValue,
+       TopicMeasureCollectLiaisonInfo: OperationControlValue,
+       TopicStreamCollectDataInfo:     OperationControlValue,
+       TopicStreamCollectLiaisonInfo:  OperationControlValue,
+       TopicTraceCollectDataInfo:      OperationControlValue,
+       TopicTraceCollectLiaisonInfo:   OperationControlValue,
+       TopicMeasureDropGroup:          OperationControlValue,
+       TopicStreamDropGroup:           OperationControlValue,
+       TopicTraceDropGroup:            OperationControlValue,
+}
+
+// OperationOf returns the operation label string for the given bus topic.
+func OperationOf(topic bus.Topic) string {
+       if op, ok := operationMap[topic]; ok {
+               return op
+       }
+       return OperationControlValue
+}
+
+// GroupFromMessageData extracts the business group string from the pre-marshal request proto (pub side).
+func GroupFromMessageData(_ bus.Topic, payload any) string {
+       switch v := payload.(type) {
+       case *measurev1.InternalWriteRequest:
+               return groupFromWriteMetadata(v.GetRequest().GetMetadata())
+       case *streamv1.InternalWriteRequest:
+               return groupFromWriteMetadata(v.GetRequest().GetMetadata())
+       case *tracev1.InternalWriteRequest:
+               return groupFromWriteMetadata(v.GetRequest().GetMetadata())
+       case *measurev1.QueryRequest:
+               return firstGroup(v.GetGroups())
+       case *measurev1.InternalQueryRequest:
+               return firstGroup(v.GetRequest().GetGroups())
+       case *measurev1.TopNRequest:
+               return firstGroup(v.GetGroups())
+       case *streamv1.QueryRequest:
+               return firstGroup(v.GetGroups())
+       case *tracev1.QueryRequest:
+               return firstGroup(v.GetGroups())
+       case *propertyv1.InternalUpdateRequest:
+               return groupFromProperty(v.GetProperty())
+       case *propertyv1.InternalRepairRequest:
+               return groupFromProperty(v.GetProperty())
+       case *propertyv1.QueryRequest:
+               return firstGroup(v.GetGroups())
+       case *databasev1.GroupRegistryServiceDeleteRequest:
+               return v.GetGroup()
+       }
+       return ""
+}
+
+type groupGetter interface {
+       GetGroup() string
+}
+
+func groupFromWriteMetadata(md groupGetter) string {
+       if md == nil {
+               return ""
+       }
+       return md.GetGroup()
+}
+
+func groupFromProperty(p *propertyv1.Property) string {
+       if p == nil {
+               return ""
+       }
+       if md := p.GetMetadata(); md != nil {
+               return md.GetGroup()
+       }
+       return ""
+}
+
+func firstGroup(groups []string) string {
+       if len(groups) > 0 {
+               return groups[0]
+       }
+       return ""
+}
diff --git a/api/proto/banyandb/cluster/v1/rpc.proto b/api/proto/banyandb/cluster/v1/rpc.proto
index f71d89bcb..aec1b9963 100644
--- a/api/proto/banyandb/cluster/v1/rpc.proto
+++ b/api/proto/banyandb/cluster/v1/rpc.proto
@@ -54,6 +54,14 @@ message SendRequest {
   bool batch_mod = 4;
   // version_info contains version information
   VersionInfo version_info = 5;
+  // group is the business group associated with this message.
+  string group = 6;
+  // sender_node is the BanyanDB node name of the sender; set only on the first message of a stream.
+  string sender_node = 7;
+  // sender_role is the role of the sender node; set only on the first message of a stream.
+  string sender_role = 8;
+  // sender_tier is the storage tier label of the sender node; set only on the first message of a stream.
+  string sender_tier = 9;
 }
 
 message SendResponse {
@@ -121,6 +129,12 @@ message SyncMetadata {
   string topic = 4; // Sync topic (stream-part-sync or measure-part-sync).
   int64 timestamp = 6; // Timestamp when sync started.
   uint32 total_parts = 7; // Total number of parts being synced.
+  // sender_node is the BanyanDB node name of the sender.
+  string sender_node = 8;
+  // sender_role is the role of the sender node.
+  string sender_role = 9;
+  // sender_tier is the storage tier label of the sender node.
+  string sender_tier = 10;
 }
 
 // SyncCompletion contains completion information for the sync operation.
diff --git a/banyand/queue/pub/batch.go b/banyand/queue/pub/batch.go
index b183663f5..7ca830b92 100644
--- a/banyand/queue/pub/batch.go
+++ b/banyand/queue/pub/batch.go
@@ -26,7 +26,7 @@ import (
        "go.uber.org/multierr"
 
        "github.com/apache/skywalking-banyandb/api/common"
-       "github.com/apache/skywalking-banyandb/api/data"
+       apidata "github.com/apache/skywalking-banyandb/api/data"
        clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
        modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/queue"
@@ -50,13 +50,12 @@ const (
        // Remote-side failures (observed after the frame was written to the stream).
        sendErrReasonRecvError      = "recv_error"      // s.Recv returned an error (connection/protocol layer).
        sendErrReasonServerRejected = "server_rejected" // Server responded with a non-empty Error (includes failover statuses).
-
-       sendResultSuccess = "success"
 )
 
 type writeStream struct {
-       client    clusterv1.Service_SendClient
-       ctxDoneCh <-chan struct{}
+       client         clusterv1.Service_SendClient
+       ctxDoneCh      <-chan struct{}
+       firstFrameSent bool // false until the first frame is sent; the first frame carries the sender_* identity labels
 }
 
 type batchPublisher struct {
@@ -97,22 +96,13 @@ func (bp *batchPublisher) Publish(ctx context.Context, 
topic bus.Topic, messages
                        continue
                }
 
-               topicStr := topic.String()
                sendData := func() (success bool) {
                        if stream, ok := bp.streams[node]; ok {
-                               hasMetrics := bp.hasMetrics()
                                defer func() {
                                        if !success {
                                                delete(bp.streams, node)
-                                               if hasMetrics {
-                                                       bp.pub.metrics.inflightStreams.Add(-1, node)
-                                               }
                                        }
                                }()
-                               if hasMetrics {
-                                       bp.pub.metrics.inflightRequests.Add(1, topicStr, node)
-                                       defer bp.pub.metrics.inflightRequests.Add(-1, topicStr, node)
-                               }
                                select {
                                case <-ctx.Done():
                                        return false
@@ -120,13 +110,28 @@ func (bp *batchPublisher) Publish(ctx context.Context, 
topic bus.Topic, messages
                                        return false
                                default:
                                }
-                               errSend := bp.retrySend(ctx, stream.client, r, node, topicStr)
+                               // Stamp sender identity on the first frame of each stream.
+                               if !stream.firstFrameSent {
+                                       r.SenderNode = bp.pub.selfNode
+                                       r.SenderRole = bp.pub.selfRole
+                                       r.SenderTier = bp.pub.selfTier
+                               } else {
+                                       r.SenderNode = ""
+                                       r.SenderRole = ""
+                                       r.SenderTier = ""
+                               }
+                               errSend := bp.retrySend(ctx, stream.client, r, node)
                                if errSend != nil {
                                        err = multierr.Append(err, fmt.Errorf("failed to send message to node %s: %w", node, errSend))
                                        // Record failure for circuit breaker (only for transient/internal errors)
                                        bp.pub.connMgr.RecordFailure(node, errSend)
                                        return false
                                }
+                               if !stream.firstFrameSent {
+                                       ws := bp.streams[node]
+                                       ws.firstFrameSent = true
+                                       bp.streams[node] = ws
+                               }
                                // Record success for circuit breaker
                                bp.pub.connMgr.RecordSuccess(node)
                                return true
@@ -187,9 +192,6 @@ func (bp *batchPublisher) Publish(ctx context.Context, 
topic bus.Topic, messages
                        client:    stream,
                        ctxDoneCh: streamCtx.Done(),
                }
-               if bp.hasMetrics() {
-                       bp.pub.metrics.inflightStreams.Add(1, node)
-               }
                bp.f.events = append(bp.f.events, make(chan batchEvent))
                _ = sendData()
                nodeName := node
@@ -197,7 +199,7 @@ func (bp *batchPublisher) Publish(ctx context.Context, 
topic bus.Topic, messages
                recvDeferFn := deferFn
                recvBC := bp.f.events[len(bp.f.events)-1]
                run.Go(ctx, "batch-stream-recv", bp.pub.log, func(runCtx 
context.Context) {
-                       bp.listenBatchResponse(runCtx, recvStream, recvDeferFn, 
recvBC, nodeName, topicStr)
+                       bp.listenBatchResponse(runCtx, recvStream, recvDeferFn, 
recvBC, nodeName)
                })
        }
        return nil, err
@@ -208,7 +210,7 @@ func (bp *batchPublisher) hasMetrics() bool {
 }
 
 // listenBatchResponse receives the server response and records failover events and end-to-end failure metrics.
-func (bp *batchPublisher) listenBatchResponse(ctx context.Context, s clusterv1.Service_SendClient, deferFn func(), bc chan batchEvent, curNode, topic string) {
+func (bp *batchPublisher) listenBatchResponse(ctx context.Context, s clusterv1.Service_SendClient, deferFn func(), bc chan batchEvent, curNode string) {
        defer func() {
                close(bc)
                deferFn()
@@ -219,15 +221,28 @@ func (bp *batchPublisher) listenBatchResponse(ctx 
context.Context, s clusterv1.S
        default:
        }
 
+       var topic bus.Topic
+       if bp.topic != nil {
+               topic = *bp.topic
+       }
+       operation := apidata.OperationOf(topic)
+       var info nodeInfo
+       if bp.pub != nil {
+               info = bp.pub.getNodeInfo(curNode)
+       }
+
        resp, errRecv := s.Recv()
        if errRecv != nil {
                if bp.hasMetrics() {
-                       bp.pub.metrics.sendErrTotal.Inc(1, topic, curNode, sendErrReasonRecvError)
+                       bp.pub.metrics.totalErr.Inc(1, operation, "", curNode, info.role, info.tier, sendErrReasonRecvError)
                }
                if grpchelper.IsFailoverError(errRecv) {
                        // Record circuit breaker failure before creating 
failover event
                        bp.pub.connMgr.RecordFailure(curNode, errRecv)
-                       bc <- batchEvent{n: curNode, e: common.NewErrorWithStatus(modelv1.Status_STATUS_INTERNAL_ERROR, errRecv.Error())}
+                       select {
+                       case bc <- batchEvent{n: curNode, e: common.NewErrorWithStatus(modelv1.Status_STATUS_INTERNAL_ERROR, errRecv.Error())}:
+                       case <-ctx.Done():
+                       }
                }
                return
        }
@@ -235,7 +250,7 @@ func (bp *batchPublisher) listenBatchResponse(ctx 
context.Context, s clusterv1.S
                return
        }
        if bp.hasMetrics() {
-               bp.pub.metrics.sendErrTotal.Inc(1, topic, curNode, sendErrReasonServerRejected)
+               bp.pub.metrics.totalErr.Inc(1, operation, "", curNode, info.role, info.tier, sendErrReasonServerRejected)
        }
        ce := common.NewErrorWithStatus(resp.Status, resp.Error)
        // Only failover statuses trigger circuit-breaker accounting; other 
server-side
@@ -245,17 +260,17 @@ func (bp *batchPublisher) listenBatchResponse(ctx 
context.Context, s clusterv1.S
                bp.pub.connMgr.RecordFailure(curNode, ce)
        }
        // Always surface a batchEvent for any non-empty resp.Error so that 
Close() exposes
-       // the rejection to the caller. Otherwise the 
send_err_total{reason="server_rejected"}
-       // counter would rise on dashboards while callers silently treat the 
batch as successful.
-       bc <- batchEvent{n: curNode, e: ce}
+       // the rejection to the caller.
+       select {
+       case bc <- batchEvent{n: curNode, e: ce}:
+       case <-ctx.Done():
+       }
 }
 
 func (bp *batchPublisher) Close() (cee map[string]*common.Error, err error) {
-       for nodeName, stream := range bp.streams {
+       for nodeName := range bp.streams {
+               stream := bp.streams[nodeName]
                err = multierr.Append(err, stream.client.CloseSend())
-               if bp.hasMetrics() {
-                       bp.pub.metrics.inflightStreams.Add(-1, nodeName)
-               }
        }
        for i := range bp.streams {
                <-bp.streams[i].ctxDoneCh
@@ -271,7 +286,7 @@ func (bp *batchPublisher) Close() (cee 
map[string]*common.Error, err error) {
                                // Record circuit breaker failure before 
failover
                                bp.pub.connMgr.RecordFailure(n, e.e)
                                if bp.topic == nil {
-                                       bp.pub.failover(ctx, n, e.e, 
data.TopicCommon)
+                                       bp.pub.failover(ctx, n, e.e, 
apidata.TopicCommon)
                                        continue
                                }
                                bp.pub.failover(ctx, n, e.e, *bp.topic)
@@ -332,12 +347,24 @@ func isFailoverStatus(s modelv1.Status) bool {
 }
 
 // retrySend implements bounded retries for client streaming sends with exponential backoff and jitter.
-func (bp *batchPublisher) retrySend(ctx context.Context, stream clusterv1.Service_SendClient, r *clusterv1.SendRequest, node string, topic string) error {
+func (bp *batchPublisher) retrySend(ctx context.Context, stream clusterv1.Service_SendClient, r *clusterv1.SendRequest, node string) error {
        var lastErr error
        start := time.Now()
-       observeDuration := func(result string) {
+
+       var topic bus.Topic
+       if bp.topic != nil {
+               topic = *bp.topic
+       }
+       operation := apidata.OperationOf(topic)
+       var info nodeInfo
+       if bp.pub != nil {
+               info = bp.pub.getNodeInfo(node)
+       }
+       group := r.GetGroup()
+
+       observeLatency := func() {
                if bp.hasMetrics() {
-                       bp.pub.metrics.sendDurationSeconds.Observe(time.Since(start).Seconds(), topic, node, result)
+                       bp.pub.metrics.totalLatency.Observe(time.Since(start).Seconds(), operation, group, node, info.role, info.tier)
                }
        }
 
@@ -350,16 +377,16 @@ func (bp *batchPublisher) retrySend(ctx context.Context, 
stream clusterv1.Servic
                case <-ctx.Done():
                        cancel()
                        if bp.hasMetrics() {
-                               bp.pub.metrics.sendErrTotal.Inc(1, topic, node, 
sendErrReasonCanceled)
+                               bp.pub.metrics.totalErr.Inc(1, operation, 
group, node, info.role, info.tier, sendErrReasonCanceled)
                        }
-                       observeDuration(sendErrReasonCanceled)
+                       observeLatency()
                        return ctx.Err()
                case <-stream.Context().Done():
                        cancel()
                        if bp.hasMetrics() {
-                               bp.pub.metrics.sendErrTotal.Inc(1, topic, node, 
sendErrReasonStreamCanceled)
+                               bp.pub.metrics.totalErr.Inc(1, operation, 
group, node, info.role, info.tier, sendErrReasonStreamCanceled)
                        }
-                       observeDuration(sendErrReasonStreamCanceled)
+                       observeLatency()
                        return stream.Context().Err()
                case <-attemptCtx.Done():
                        cancel()
@@ -377,11 +404,11 @@ func (bp *batchPublisher) retrySend(ctx context.Context, 
stream clusterv1.Servic
 
                if sendErr == nil {
                        if bp.hasMetrics() {
-                               bp.pub.metrics.sendSuccessTotal.Inc(1, topic, 
node)
-                               
bp.pub.metrics.sendBytesTotal.Inc(float64(len(r.Body)), topic, node)
+                               bp.pub.metrics.totalStarted.Inc(1, operation, 
group, node, info.role, info.tier)
+                               bp.pub.metrics.totalFinished.Inc(1, operation, 
group, node, info.role, info.tier)
                        }
                        // Success writing to the local stream; end-to-end ack 
is observed in listenBatchResponse.
-                       observeDuration(sendResultSuccess)
+                       observeLatency()
                        return nil
                }
 
@@ -391,16 +418,12 @@ func (bp *batchPublisher) retrySend(ctx context.Context, 
stream clusterv1.Servic
                if !grpchelper.IsTransientError(sendErr) {
                        // Non-transient error, don't retry
                        if bp.hasMetrics() {
-                               bp.pub.metrics.sendErrTotal.Inc(1, topic, node, 
sendErrReasonNonTransient)
+                               bp.pub.metrics.totalErr.Inc(1, operation, 
group, node, info.role, info.tier, sendErrReasonNonTransient)
                        }
-                       observeDuration(sendErrReasonNonTransient)
+                       observeLatency()
                        return sendErr
                }
 
-               if bp.hasMetrics() {
-                       bp.pub.metrics.sendRetryAttempts.Inc(1, topic, node)
-               }
-
                // If this was the last attempt, don't sleep
                if attempt >= defaultMaxRetries {
                        break
@@ -413,29 +436,25 @@ func (bp *batchPublisher) retrySend(ctx context.Context, 
stream clusterv1.Servic
                select {
                case <-time.After(backoff):
                        // Continue to next attempt
-                       if bp.hasMetrics() {
-                               
bp.pub.metrics.sendBackoffSeconds.Inc(backoff.Seconds(), topic, node)
-                       }
                case <-ctx.Done():
                        if bp.hasMetrics() {
-                               bp.pub.metrics.sendErrTotal.Inc(1, topic, node, 
sendErrReasonCanceled)
+                               bp.pub.metrics.totalErr.Inc(1, operation, 
group, node, info.role, info.tier, sendErrReasonCanceled)
                        }
-                       observeDuration(sendErrReasonCanceled)
+                       observeLatency()
                        return ctx.Err()
                case <-stream.Context().Done():
                        if bp.hasMetrics() {
-                               bp.pub.metrics.sendErrTotal.Inc(1, topic, node, 
sendErrReasonStreamCanceled)
+                               bp.pub.metrics.totalErr.Inc(1, operation, 
group, node, info.role, info.tier, sendErrReasonStreamCanceled)
                        }
-                       observeDuration(sendErrReasonStreamCanceled)
+                       observeLatency()
                        return stream.Context().Err()
                }
        }
 
        // All retries exhausted
        if bp.hasMetrics() {
-               bp.pub.metrics.sendRetryExhausted.Inc(1, topic, node)
-               bp.pub.metrics.sendErrTotal.Inc(1, topic, node, 
sendErrReasonRetryExhausted)
+               bp.pub.metrics.totalErr.Inc(1, operation, group, node, 
info.role, info.tier, sendErrReasonRetryExhausted)
        }
-       observeDuration(sendErrReasonRetryExhausted)
+       observeLatency()
        return fmt.Errorf("retry exhausted for node %s after %d attempts, last error: %w", node, defaultMaxRetries+1, lastErr)
 }
diff --git a/banyand/queue/pub/chunked_sync.go b/banyand/queue/pub/chunked_sync.go
index 1a2cd3f30..f0da29073 100644
--- a/banyand/queue/pub/chunked_sync.go
+++ b/banyand/queue/pub/chunked_sync.go
@@ -28,6 +28,7 @@ import (
 
        "google.golang.org/grpc"
 
+       apidata "github.com/apache/skywalking-banyandb/api/data"
        apiversion "github.com/apache/skywalking-banyandb/api/proto/banyandb"
        clusterv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
@@ -43,12 +44,18 @@ const (
 )
 
 type chunkedSyncClient struct {
-       client    clusterv1.ServiceClient
-       conn      *grpc.ClientConn
-       log       *logger.Logger
-       config    *ChunkedSyncClientConfig
-       node      string
-       chunkSize uint32
+       client     clusterv1.ServiceClient
+       conn       *grpc.ClientConn
+       log        *logger.Logger
+       metrics    *pubMetrics
+       config     *ChunkedSyncClientConfig
+       selfNode   string
+       selfRole   string
+       selfTier   string
+       node       string
+       remoteRole string
+       remoteTier string
+       chunkSize  uint32
 }
 
 // SyncStreamingParts implements queue.ChunkedSyncClient with streaming 
support.
@@ -99,22 +106,43 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx 
context.Context, parts []queu
        }()
 
        startTime := time.Now()
+       group := parts[0].Group
+
+       topicStr := parts[0].Topic
+       operation := apidata.OperationFileSyncValue
+
+       if c.metrics != nil {
+               c.metrics.totalStarted.Inc(1, operation, group, c.node, 
c.remoteRole, c.remoteTier)
+       }
 
        metadata := &clusterv1.SyncMetadata{
-               Group:      parts[0].Group,
+               Group:      group,
                ShardId:    parts[0].ShardID,
-               Topic:      parts[0].Topic,
+               Topic:      topicStr,
                Timestamp:  startTime.UnixMilli(),
                TotalParts: uint32(len(parts)),
+               SenderNode: c.selfNode,
+               SenderRole: c.selfRole,
+               SenderTier: c.selfTier,
        }
 
        var totalBytesSent uint64
 
-       totalChunks, failedParts, err := c.streamPartsAsChunks(stream, 
sessionID, metadata, parts, &totalBytesSent)
-       if err != nil {
-               return nil, fmt.Errorf("failed to stream parts: %w", err)
+       totalChunks, failedParts, streamErr := c.streamPartsAsChunks(stream, 
sessionID, metadata, parts, &totalBytesSent)
+       if streamErr != nil {
+               errType := classifyChunkedSyncPubErr(streamErr)
+               if c.metrics != nil {
+                       c.metrics.totalErr.Inc(1, operation, group, c.node, 
c.remoteRole, c.remoteTier, errType)
+               }
+               return nil, fmt.Errorf("failed to stream parts: %w", streamErr)
        }
        if totalChunks == 0 && len(failedParts) == 0 {
+               duration := time.Since(startTime)
+               if c.metrics != nil {
+                       c.metrics.totalFinished.Inc(1, operation, group, 
c.node, c.remoteRole, c.remoteTier)
+                       c.metrics.totalLatency.Observe(duration.Seconds(), 
operation, group, c.node, c.remoteRole, c.remoteTier)
+                       c.metrics.sentBytes.Inc(float64(totalBytesSent), 
operation, group, c.node, c.remoteRole, c.remoteTier)
+               }
                return &queue.SyncResult{
                        Success:    true,
                        SessionID:  sessionID,
@@ -124,12 +152,15 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx 
context.Context, parts []queu
 
        var finalResp *clusterv1.SyncPartResponse
        for {
-               resp, err := stream.Recv()
-               if errors.Is(err, io.EOF) {
+               resp, recvErr := stream.Recv()
+               if errors.Is(recvErr, io.EOF) {
                        break
                }
-               if err != nil {
-                       return nil, fmt.Errorf("failed to receive final response: %w", err)
+               if recvErr != nil {
+                       if c.metrics != nil {
+                               c.metrics.totalErr.Inc(1, operation, group, c.node, c.remoteRole, c.remoteTier, "recv_error")
+                       }
+                       return nil, fmt.Errorf("failed to receive final response: %w", recvErr)
                }
                finalResp = resp
                if resp.GetSyncResult() != nil {
@@ -148,6 +179,18 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx context.Context, parts []queu
                success = true
        }
 
+       if success {
+               if c.metrics != nil {
+                       c.metrics.totalFinished.Inc(1, operation, group, c.node, c.remoteRole, c.remoteTier)
+                       c.metrics.totalLatency.Observe(duration.Seconds(), operation, group, c.node, c.remoteRole, c.remoteTier)
+                       c.metrics.sentBytes.Inc(float64(totalBytesSent), operation, group, c.node, c.remoteRole, c.remoteTier)
+               }
+       } else {
+               if c.metrics != nil {
+                       c.metrics.totalErr.Inc(1, operation, group, c.node, c.remoteRole, c.remoteTier, "completion_error")
+               }
+       }
+
        return &queue.SyncResult{
                Success:     success,
                SessionID:   sessionID,
@@ -492,3 +535,23 @@ func (c *chunkedSyncClient) handleOutOfOrderResponse(resp *clusterv1.SyncPartRes
 func generateSessionID() string {
        return fmt.Sprintf("sync-%d", time.Now().UnixNano())
 }
+
+// classifyChunkedSyncPubErr maps a chunked-sync send error to an error_type label value.
+func classifyChunkedSyncPubErr(err error) string {
+       if err == nil {
+               return ""
+       }
+       msg := err.Error()
+       switch {
+       case strings.Contains(msg, "checksum mismatch"):
+               return "checksum_mismatch"
+       case strings.Contains(msg, "out of order"):
+               return "out_of_order"
+       case strings.Contains(msg, "session") && strings.Contains(msg, "not found"):
+               return "session_not_found"
+       case strings.Contains(msg, "receive") || strings.Contains(msg, "recv"):
+               return "recv_error"
+       default:
+               return "stream_error"
+       }
+}
diff --git a/banyand/queue/pub/metrics_test.go b/banyand/queue/pub/metrics_test.go
index b326bf88a..c088628cb 100644
--- a/banyand/queue/pub/metrics_test.go
+++ b/banyand/queue/pub/metrics_test.go
@@ -19,8 +19,6 @@ package pub
 
 import (
        "context"
-       "strings"
-       "sync"
        "testing"
        "time"
 
@@ -71,129 +69,55 @@ func (*countingCounter) Delete(_ ...string) bool {
        return true
 }
 
-// errReasonCapturer records send_err_total increments keyed by the `reason` label (third label).
-type errReasonCapturer struct {
+// Label order: operation, group, remote_node, remote_role, remote_tier, error_type.
+type errReasonCapturerImpl struct {
        byReason map[string]float64
-       mu       sync.Mutex
 }
 
-func newErrReasonCapturer() *errReasonCapturer {
-       return &errReasonCapturer{byReason: make(map[string]float64)}
+func newErrReasonCapturer() *errReasonCapturerImpl {
+       return &errReasonCapturerImpl{byReason: make(map[string]float64)}
 }
 
-func (c *errReasonCapturer) Inc(delta float64, labels ...string) {
-       c.mu.Lock()
-       defer c.mu.Unlock()
-       if len(labels) < 3 {
+func (c *errReasonCapturerImpl) Inc(delta float64, labels ...string) {
+       // error_type is the last label (index 5 for totalErr)
+       if len(labels) < 1 {
                return
        }
-       reason := labels[2]
-       c.byReason[reason] += delta
+       errorType := labels[len(labels)-1]
+       c.byReason[errorType] += delta
 }
 
-func (c *errReasonCapturer) Delete(_ ...string) bool {
+func (c *errReasonCapturerImpl) Delete(_ ...string) bool {
        return true
 }
 
-func (c *errReasonCapturer) sum(reason string) float64 {
-       c.mu.Lock()
-       defer c.mu.Unlock()
+func (c *errReasonCapturerImpl) sum(reason string) float64 {
        return c.byReason[reason]
 }
 
-type noopGauge struct{}
-
-func (*noopGauge) Set(_ float64, _ ...string) {}
-func (*noopGauge) Add(_ float64, _ ...string) {}
-func (*noopGauge) Delete(_ ...string) bool    { return true }
-
-type valGauge struct {
-       mu  sync.Mutex
-       val float64
-}
-
-func (g *valGauge) Add(delta float64, _ ...string) {
-       g.mu.Lock()
-       g.val += delta
-       g.mu.Unlock()
-}
-
-func (g *valGauge) Set(v float64, _ ...string) {
-       g.mu.Lock()
-       g.val = v
-       g.mu.Unlock()
-}
-
-func (*valGauge) Delete(_ ...string) bool { return true }
-
-func (g *valGauge) value() float64 {
-       g.mu.Lock()
-       defer g.mu.Unlock()
-       return g.val
-}
-
-// inflightKeyedGauge tracks inflight_requests Add deltas per (topic, node) label pair.
-type inflightKeyedGauge struct {
-       vals map[string]float64
-       mu   sync.Mutex
-}
-
-func newInflightKeyedGauge() *inflightKeyedGauge {
-       return &inflightKeyedGauge{vals: make(map[string]float64)}
-}
-
-func inflightReqKey(topic, node string) string {
-       return strings.Join([]string{topic, node}, "|")
-}
-
-func (g *inflightKeyedGauge) Add(delta float64, labels ...string) {
-       if len(labels) < 2 {
-               return
-       }
-       k := inflightReqKey(labels[0], labels[1])
-       g.mu.Lock()
-       g.vals[k] += delta
-       g.mu.Unlock()
-}
-
-func (*inflightKeyedGauge) Set(_ float64, _ ...string) {}
-
-func (*inflightKeyedGauge) Delete(_ ...string) bool {
-       return true
-}
-
-func (g *inflightKeyedGauge) net(topic, node string) float64 {
-       g.mu.Lock()
-       defer g.mu.Unlock()
-       return g.vals[inflightReqKey(topic, node)]
-}
-
 type noopHistogram struct{}
 
 func (*noopHistogram) Observe(_ float64, _ ...string) {}
 func (*noopHistogram) Delete(_ ...string) bool        { return true }
 
-func newPubMetricsWithErrCapture(sendErr *errReasonCapturer) *pubMetrics {
+func newPubMetricsWithErrCapture(totalErr *errReasonCapturerImpl) *pubMetrics { //nolint:exhaustruct
        return &pubMetrics{
-               sendSuccessTotal:    &countingCounter{},
-               sendErrTotal:        sendErr,
-               sendBytesTotal:      &countingCounter{},
-               sendDurationSeconds: &noopHistogram{},
-               sendRetryAttempts:   &countingCounter{},
-               sendRetryExhausted:  &countingCounter{},
-               sendBackoffSeconds:  &countingCounter{},
-               inflightStreams:     &noopGauge{},
-               inflightRequests:    &noopGauge{},
+               totalStarted:  &countingCounter{},
+               totalFinished: &countingCounter{},
+               totalLatency:  &noopHistogram{},
+               totalErr:      totalErr,
+               sentBytes:     &countingCounter{},
        }
 }
 
 func newPubWithConnMgrForMetrics(t *testing.T, pm *pubMetrics) *pub {
        t.Helper()
-       p := &pub{
-               handlers: make(map[bus.Topic]schema.EventHandler),
-               log:      logger.GetLogger("queue-pub-metrics-test"),
-               metrics:  pm,
-               closer:   run.NewCloser(1),
+       p := &pub{ //nolint:exhaustruct
+               handlers:  make(map[bus.Topic]schema.EventHandler),
+               log:       logger.GetLogger("queue-pub-metrics-test"),
+               metrics:   pm,
+               closer:    run.NewCloser(1),
+               nodeCache: make(map[string]nodeInfo),
        }
        p.connMgr = grpchelper.NewConnManager(grpchelper.ConnManagerConfig[*client]{
                Handler:        p,
@@ -204,26 +128,24 @@ func newPubWithConnMgrForMetrics(t *testing.T, pm *pubMetrics) *pub {
        return p
 }
 
-// TestRetryMetrics ensures that retry-related metrics are updated when retrySend
-// observes retryable errors and eventually exhausts retries.
+// TestRetryMetrics verifies that retry-exhausted and error-type metrics are updated when
+// retrySend observes retryable errors and eventually exhausts retries.
 func TestRetryMetrics(t *testing.T) {
        sendErrCap := newErrReasonCapturer()
-       p := &pub{
+       p := &pub{ //nolint:exhaustruct
                metrics: &pubMetrics{
-                       sendRetryAttempts:   &countingCounter{},
-                       sendRetryExhausted:  &countingCounter{},
-                       sendErrTotal:        sendErrCap,
-                       sendBackoffSeconds:  &countingCounter{},
-                       sendSuccessTotal:    &countingCounter{},
-                       sendBytesTotal:      &countingCounter{},
-                       sendDurationSeconds: &noopHistogram{},
-                       inflightStreams:     &noopGauge{},
-                       inflightRequests:    &noopGauge{},
+                       totalStarted:  &countingCounter{},
+                       totalFinished: &countingCounter{},
+                       totalLatency:  &noopHistogram{},
+                       totalErr:      sendErrCap,
+                       sentBytes:     &countingCounter{},
                },
+               nodeCache: make(map[string]nodeInfo),
        }
 
        bp := &batchPublisher{
-               pub: p,
+               pub:   p,
+               topic: topicPtr(data.TopicMeasureWrite),
        }
 
        ctx := context.Background()
@@ -243,28 +165,28 @@ func TestRetryMetrics(t *testing.T) {
        }
 
        const nodeName = "test-node"
-       const topicStr = "test-topic"
 
-       retryErr := bp.retrySend(ctx, client, req, nodeName, topicStr)
+       retryErr := bp.retrySend(ctx, client, req, nodeName)
        require.Error(t, retryErr)
 
-       retries := p.metrics.sendRetryAttempts.(*countingCounter).count
-       require.Equal(t, float64(4), retries, "one retry counter per transient failure before exhaustion")
-
-       exhausted := p.metrics.sendRetryExhausted.(*countingCounter).count
-       require.Equal(t, float64(1), exhausted)
-
        require.Equal(t, float64(1), sendErrCap.sum(sendErrReasonRetryExhausted))
        require.Equal(t, float64(0), sendErrCap.sum(sendErrReasonNonTransient))
+}
 
-       backoff := p.metrics.sendBackoffSeconds.(*countingCounter).count
-       require.Greater(t, backoff, float64(0), "backoff should be recorded between retry attempts")
+func topicPtr(t bus.Topic) *bus.Topic {
+       return &t
 }
 
 func TestRetrySendNonTransientRecordsReason(t *testing.T) {
        sendErrCap := newErrReasonCapturer()
-       p := &pub{metrics: newPubMetricsWithErrCapture(sendErrCap)}
-       bp := &batchPublisher{pub: p}
+       p := &pub{ //nolint:exhaustruct
+               metrics:   newPubMetricsWithErrCapture(sendErrCap),
+               nodeCache: make(map[string]nodeInfo),
+       }
+       bp := &batchPublisher{
+               pub:   p,
+               topic: topicPtr(data.TopicMeasureWrite),
+       }
 
        ctx := context.Background()
        mockStream := NewMockSendClient(ctx)
@@ -272,18 +194,23 @@ func TestRetrySendNonTransientRecordsReason(t *testing.T) {
                return status.Error(codes.InvalidArgument, "non-transient")
        })
 
-       err := bp.retrySend(ctx, mockStream, &clusterv1.SendRequest{}, "n1", "t1")
+       err := bp.retrySend(ctx, mockStream, &clusterv1.SendRequest{}, "n1")
        require.Error(t, err)
 
        require.Equal(t, float64(1), sendErrCap.sum(sendErrReasonNonTransient))
        require.Equal(t, float64(0), sendErrCap.sum(sendErrReasonRetryExhausted))
-       require.Equal(t, float64(0), p.metrics.sendRetryAttempts.(*countingCounter).count)
 }
 
 func TestRetrySendCanceledRecordsReason(t *testing.T) {
        sendErrCap := newErrReasonCapturer()
-       p := &pub{metrics: newPubMetricsWithErrCapture(sendErrCap)}
-       bp := &batchPublisher{pub: p}
+       p := &pub{ //nolint:exhaustruct
+               metrics:   newPubMetricsWithErrCapture(sendErrCap),
+               nodeCache: make(map[string]nodeInfo),
+       }
+       bp := &batchPublisher{
+               pub:   p,
+               topic: topicPtr(data.TopicMeasureWrite),
+       }
 
        ctx, cancel := context.WithCancel(context.Background())
        cancel()
@@ -293,7 +220,7 @@ func TestRetrySendCanceledRecordsReason(t *testing.T) {
                return status.Error(codes.Unavailable, "unavailable")
        })
 
-       err := bp.retrySend(ctx, mockStream, &clusterv1.SendRequest{}, "n1", "t1")
+       err := bp.retrySend(ctx, mockStream, &clusterv1.SendRequest{}, "n1")
        require.ErrorIs(t, err, context.Canceled)
 
        require.Equal(t, float64(1), sendErrCap.sum(sendErrReasonCanceled))
@@ -301,15 +228,21 @@ func TestRetrySendCanceledRecordsReason(t *testing.T) {
 
 func TestRetrySendStreamCanceledRecordsReason(t *testing.T) {
        sendErrCap := newErrReasonCapturer()
-       p := &pub{metrics: newPubMetricsWithErrCapture(sendErrCap)}
-       bp := &batchPublisher{pub: p}
+       p := &pub{ //nolint:exhaustruct
+               metrics:   newPubMetricsWithErrCapture(sendErrCap),
+               nodeCache: make(map[string]nodeInfo),
+       }
+       bp := &batchPublisher{
+               pub:   p,
+               topic: topicPtr(data.TopicMeasureWrite),
+       }
 
        streamCtx, cancel := context.WithCancel(context.Background())
        cancel()
 
        mockStream := NewMockSendClient(streamCtx)
 
-       err := bp.retrySend(context.Background(), mockStream, &clusterv1.SendRequest{}, "n1", "t1")
+       err := bp.retrySend(context.Background(), mockStream, &clusterv1.SendRequest{}, "n1")
        require.ErrorIs(t, err, context.Canceled)
 
        require.Equal(t, float64(1), sendErrCap.sum(sendErrReasonStreamCanceled))
@@ -319,7 +252,10 @@ func TestListenBatchResponseRecordsRecvError(t *testing.T) {
        sendErrCap := newErrReasonCapturer()
        pm := newPubMetricsWithErrCapture(sendErrCap)
        p := newPubWithConnMgrForMetrics(t, pm)
-       bp := &batchPublisher{pub: p}
+       bp := &batchPublisher{
+               pub:   p,
+               topic: topicPtr(data.TopicMeasureWrite),
+       }
 
        ctx := context.Background()
        mockStream := NewMockSendClient(ctx)
@@ -328,7 +264,7 @@ func TestListenBatchResponseRecordsRecvError(t *testing.T) {
        })
 
        bc := make(chan batchEvent, 1)
-       bp.listenBatchResponse(ctx, mockStream, func() {}, bc, "node-a", "topic-a")
+       bp.listenBatchResponse(ctx, mockStream, func() {}, bc, "node-a")
 
        require.Equal(t, float64(1), sendErrCap.sum(sendErrReasonRecvError))
 }
@@ -337,7 +273,10 @@ func TestListenBatchResponseRecvNonFailoverStillRecordsRecvError(t *testing.T) {
        sendErrCap := newErrReasonCapturer()
        pm := newPubMetricsWithErrCapture(sendErrCap)
        p := newPubWithConnMgrForMetrics(t, pm)
-       bp := &batchPublisher{pub: p}
+       bp := &batchPublisher{
+               pub:   p,
+               topic: topicPtr(data.TopicMeasureWrite),
+       }
 
        ctx := context.Background()
        mockStream := NewMockSendClient(ctx)
@@ -346,7 +285,7 @@ func TestListenBatchResponseRecvNonFailoverStillRecordsRecvError(t *testing.T) {
        })
 
        bc := make(chan batchEvent, 1)
-       bp.listenBatchResponse(ctx, mockStream, func() {}, bc, "node-a", "topic-a")
+       bp.listenBatchResponse(ctx, mockStream, func() {}, bc, "node-a")
 
        require.Equal(t, float64(1), sendErrCap.sum(sendErrReasonRecvError))
 }
@@ -355,7 +294,10 @@ func TestListenBatchResponseServerRejectedWithoutFailover(t *testing.T) {
        sendErrCap := newErrReasonCapturer()
        pm := newPubMetricsWithErrCapture(sendErrCap)
        p := newPubWithConnMgrForMetrics(t, pm)
-       bp := &batchPublisher{pub: p}
+       bp := &batchPublisher{
+               pub:   p,
+               topic: topicPtr(data.TopicMeasureWrite),
+       }
 
        ctx := context.Background()
        mockStream := NewMockSendClient(ctx)
@@ -367,14 +309,12 @@ func TestListenBatchResponseServerRejectedWithoutFailover(t *testing.T) {
        })
 
        bc := make(chan batchEvent, 1)
-       bp.listenBatchResponse(ctx, mockStream, func() {}, bc, "node-a", "topic-a")
+       bp.listenBatchResponse(ctx, mockStream, func() {}, bc, "node-a")
 
        require.Equal(t, float64(1), sendErrCap.sum(sendErrReasonServerRejected))
        require.Equal(t, float64(0), sendErrCap.sum(sendErrReasonRecvError))
 
-       // Non-failover server rejections are now surfaced to the caller via batchEvent
-       // so that send_err_total{reason="server_rejected"} stays consistent with visible errors.
-       // The event must carry the rejection but must NOT trigger a circuit-breaker failover.
+       // Non-failover server rejections are now surfaced to the caller via batchEvent.
        select {
        case evt, ok := <-bc:
                require.True(t, ok, "expected a batchEvent for non-failover server rejection")
@@ -390,7 +330,10 @@ func TestListenBatchResponseDiskFullSendsFailoverEvent(t *testing.T) {
        sendErrCap := newErrReasonCapturer()
        pm := newPubMetricsWithErrCapture(sendErrCap)
        p := newPubWithConnMgrForMetrics(t, pm)
-       bp := &batchPublisher{pub: p}
+       bp := &batchPublisher{
+               pub:   p,
+               topic: topicPtr(data.TopicMeasureWrite),
+       }
 
        ctx := context.Background()
        mockStream := NewMockSendClient(ctx)
@@ -402,7 +345,7 @@ func TestListenBatchResponseDiskFullSendsFailoverEvent(t *testing.T) {
        })
 
        bc := make(chan batchEvent, 1)
-       bp.listenBatchResponse(ctx, mockStream, func() {}, bc, "node-a", "topic-a")
+       bp.listenBatchResponse(ctx, mockStream, func() {}, bc, "node-a")
 
        require.Equal(t, float64(1), sendErrCap.sum(sendErrReasonServerRejected))
 
@@ -416,55 +359,17 @@ func TestListenBatchResponseDiskFullSendsFailoverEvent(t *testing.T) {
        }
 }
 
-func TestCloseDecrementsInflightStreams(t *testing.T) {
-       streamGauge := &valGauge{}
-       pm := &pubMetrics{
-               sendSuccessTotal:    &countingCounter{},
-               sendErrTotal:        newErrReasonCapturer(),
-               sendBytesTotal:      &countingCounter{},
-               sendDurationSeconds: &noopHistogram{},
-               sendRetryAttempts:   &countingCounter{},
-               sendRetryExhausted:  &countingCounter{},
-               sendBackoffSeconds:  &countingCounter{},
-               inflightStreams:     streamGauge,
-               inflightRequests:    &noopGauge{},
-       }
-       p := newPubWithConnMgrForMetrics(t, pm)
-
-       ctx := context.Background()
-       mockStream := NewMockSendClient(ctx)
-       doneCh := make(chan struct{})
-       close(doneCh)
-
-       bp := p.NewBatchPublisher(time.Second).(*batchPublisher)
-       bp.streams["node-x"] = writeStream{
-               client:    mockStream,
-               ctxDoneCh: doneCh,
-       }
-
-       streamGauge.Add(1, "node-x")
-
-       _, closeErr := bp.Close()
-       require.NoError(t, closeErr)
-       require.Equal(t, float64(0), streamGauge.value(), "Close should decrement inflight_streams once per open stream")
-}
-
-// TestPublishInflightRequestsBalancedWhenStreamPreexists checks that Publish increments then decrements
-// inflight_requests (via defer) when reusing an existing stream, without requiring GetClient / new stream setup.
-func TestPublishInflightRequestsBalancedWhenStreamPreexists(t *testing.T) {
-       inflightReq := newInflightKeyedGauge()
-       sendErrCap := newErrReasonCapturer()
-       attempts := &countingCounter{}
-       pm := &pubMetrics{
-               sendSuccessTotal:    attempts,
-               sendErrTotal:        sendErrCap,
-               sendBytesTotal:      &countingCounter{},
-               sendDurationSeconds: &noopHistogram{},
-               sendRetryAttempts:   &countingCounter{},
-               sendRetryExhausted:  &countingCounter{},
-               sendBackoffSeconds:  &countingCounter{},
-               inflightStreams:     &noopGauge{},
-               inflightRequests:    inflightReq,
+// TestPublishRecordsStartedAndFinished verifies that a successful retrySend increments
+// totalStarted and totalFinished with matching label counts.
+func TestPublishRecordsStartedAndFinished(t *testing.T) {
+       started := &countingCounter{}
+       finished := &countingCounter{}
+       pm := &pubMetrics{ //nolint:exhaustruct
+               totalStarted:  started,
+               totalFinished: finished,
+               totalLatency:  &noopHistogram{},
+               totalErr:      newErrReasonCapturer(),
+               sentBytes:     &countingCounter{},
        }
        p := newPubWithConnMgrForMetrics(t, pm)
 
@@ -479,7 +384,6 @@ func TestPublishInflightRequestsBalancedWhenStreamPreexists(t *testing.T) {
 
        const nodeName = "node-a"
        topic := data.TopicMeasureWrite
-       topicStr := topic.String()
 
        bp := p.NewBatchPublisher(10 * time.Second).(*batchPublisher)
        bp.streams[nodeName] = writeStream{
@@ -492,8 +396,6 @@ func TestPublishInflightRequestsBalancedWhenStreamPreexists(t *testing.T) {
        _, publishErr := bp.Publish(ctx, topic, msg)
        require.NoError(t, publishErr)
 
-       require.Equal(t, float64(1), attempts.count, "successful retrySend should record send_success_total")
-       require.Equal(t, float64(0), inflightReq.net(topicStr, nodeName),
-               "inflight_requests defer must balance +1/-1 for topic/node")
-       require.Equal(t, float64(0), sendErrCap.sum(sendErrReasonRetryExhausted))
+       require.Equal(t, float64(1), started.count, "totalStarted must be 1 on success")
+       require.Equal(t, float64(1), finished.count, "totalFinished must be 1 on success")
 }
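
Note on the test double above: errReasonCapturerImpl keys its map on the last
variadic label, which is error_type for totalErr, and this commit drops the
mutex the old capturer had. The sketch below shows the same last-label keying
with the lock kept, on the assumption that Inc might be reached from more than
one goroutine in some test. The type lastLabelCounter, the group name
"sw_metric" and the error_type strings are hypothetical stand-ins, not
identifiers from the repository.

```go
package main

import (
	"fmt"
	"sync"
)

// lastLabelCounter is a hypothetical test double with the same method set as
// the capturer in the diff: Inc(float64, ...string) and Delete(...string) bool.
type lastLabelCounter struct {
	mu     sync.Mutex
	byType map[string]float64
}

func newLastLabelCounter() *lastLabelCounter {
	return &lastLabelCounter{byType: make(map[string]float64)}
}

// Inc adds delta under the last label; calls without labels are ignored.
func (c *lastLabelCounter) Inc(delta float64, labels ...string) {
	if len(labels) == 0 {
		return
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	c.byType[labels[len(labels)-1]] += delta
}

func (c *lastLabelCounter) Delete(_ ...string) bool { return true }

// sum returns the accumulated delta for one error_type value.
func (c *lastLabelCounter) sum(errorType string) float64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.byType[errorType]
}

func main() {
	c := newLastLabelCounter()
	// Label order from the diff:
	// operation, group, remote_node, remote_role, remote_tier, error_type.
	c.Inc(1, "batch-write", "sw_metric", "node-a", "data", "hot", "recv_error")
	c.Inc(1, "file-sync", "sw_metric", "node-b", "data", "warm", "recv_error")
	c.Inc(1, "batch-write", "sw_metric", "node-a", "data", "hot", "retry_exhausted")
	fmt.Println(c.sum("recv_error"), c.sum("retry_exhausted"), c.sum("canceled"))
	// prints: 2 1 0
}
```

Keying on the last label keeps the double independent of how many topology
labels precede error_type, so it would not need to change if another label were
inserted before it.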
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index 4fab5b8b4..28acdb747 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -34,7 +34,7 @@ import (
        "google.golang.org/protobuf/proto"
 
        "github.com/apache/skywalking-banyandb/api/common"
-       "github.com/apache/skywalking-banyandb/api/data"
+       apidata "github.com/apache/skywalking-banyandb/api/data"
        apiversion "github.com/apache/skywalking-banyandb/api/proto/banyandb"
        clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
        databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -79,40 +79,42 @@ type pub struct {
        connMgr         *grpchelper.ConnManager[*client]
        closer          *run.Closer
        writableProbe   map[string]map[string]struct{}
+       nodeCache       map[string]nodeInfo
        caCertPath      string
        caCertReloader  *pkgtls.Reloader
        prefix          string
        retryPolicy     string
+       selfNode        string
+       selfRole        string
+       selfTier        string
        allowedRoles    []databasev1.Role
        writableProbeMu sync.Mutex
+       nodeCacheMu     sync.RWMutex
        tlsEnabled      bool
 }
 
-type pubMetrics struct {
-       sendSuccessTotal    meter.Counter
-       sendErrTotal        meter.Counter
-       sendBytesTotal      meter.Counter
-       sendDurationSeconds meter.Histogram
+// nodeInfo caches the resolved role and tier for a remote node.
+type nodeInfo struct {
+       role string
+       tier string
+}
 
-       sendRetryAttempts  meter.Counter
-       sendRetryExhausted meter.Counter
-       sendBackoffSeconds meter.Counter
-       inflightStreams    meter.Gauge
-       inflightRequests   meter.Gauge
+type pubMetrics struct {
+       totalStarted  meter.Counter
+       totalFinished meter.Counter
+       totalLatency  meter.Histogram
+       totalErr      meter.Counter
+       sentBytes     meter.Counter
 }
 
 func newPubMetrics(factory observability.Factory) *pubMetrics {
+       labels := []string{"operation", "group", "remote_node", "remote_role", "remote_tier"}
        return &pubMetrics{
-               sendSuccessTotal:    factory.NewCounter("send_success_total", "topic", "node"),
-               sendErrTotal:        factory.NewCounter("send_err_total", "topic", "node", "reason"),
-               sendBytesTotal:      factory.NewCounter("send_bytes_total", "topic", "node"),
-               sendDurationSeconds: factory.NewHistogram("send_duration_seconds", meter.DefBuckets, "topic", "node", "result"),
-
-               sendRetryAttempts:  factory.NewCounter("send_retry_attempts_total", "topic", "node"),
-               sendRetryExhausted: factory.NewCounter("send_retry_exhausted_total", "topic", "node"),
-               sendBackoffSeconds: factory.NewCounter("send_backoff_seconds_total", "topic", "node"),
-               inflightStreams:    factory.NewGauge("inflight_streams", "node"),
-               inflightRequests:   factory.NewGauge("inflight_requests", "topic", "node"),
+               totalStarted:  factory.NewCounter("total_started", labels...),
+               totalFinished: factory.NewCounter("total_finished", labels...),
+               totalLatency:  factory.NewHistogram("total_latency", meter.DefBuckets, labels...),
+               totalErr:      factory.NewCounter("total_err", append(labels, "error_type")...),
+               sentBytes:     factory.NewCounter("sent_bytes", labels...),
        }
 }
 
@@ -143,10 +145,16 @@ func (p *pub) NewClient(conn *grpc.ClientConn, node *databasev1.Node) (*client,
 }
 
 // OnActive implements grpchelper.ConnectionHandler.
-func (p *pub) OnActive(_ string, c *client) {
+func (p *pub) OnActive(name string, c *client) {
        for _, h := range p.handlers {
                h.OnAddOrUpdate(c.md)
        }
+       if node, ok := c.md.Spec.(*databasev1.Node); ok {
+               info := resolveNodeInfo(node)
+               p.nodeCacheMu.Lock()
+               p.nodeCache[name] = info
+               p.nodeCacheMu.Unlock()
+       }
 }
 
 // OnInactive implements grpchelper.ConnectionHandler.
@@ -157,6 +165,49 @@ func (p *pub) OnInactive(name string, c *client) {
        p.writableProbeMu.Lock()
        delete(p.writableProbe, name)
        p.writableProbeMu.Unlock()
+       p.nodeCacheMu.Lock()
+       delete(p.nodeCache, name)
+       p.nodeCacheMu.Unlock()
+}
+
+// resolveNodeInfo extracts role and tier from a Node spec.
+func resolveNodeInfo(node *databasev1.Node) nodeInfo {
+       info := nodeInfo{}
+       for _, r := range node.GetRoles() {
+               switch r {
+               case databasev1.Role_ROLE_DATA:
+                       info.role = "data"
+               case databasev1.Role_ROLE_LIAISON:
+                       info.role = "liaison"
+               default:
+                       // ROLE_UNSPECIFIED, ROLE_META and any future roles are not queue peers.
+               }
+               if info.role != "" {
+                       break
+               }
+       }
+       if node.GetLabels() != nil {
+               info.tier = node.GetLabels()["type"]
+       }
+       return info
+}
+
+// getNodeInfo returns the cached nodeInfo for the named node.
+func (p *pub) getNodeInfo(name string) nodeInfo {
+       if p == nil {
+               return nodeInfo{}
+       }
+       p.nodeCacheMu.RLock()
+       info := p.nodeCache[name]
+       p.nodeCacheMu.RUnlock()
+       return info
+}
+
+// SetSelfNode sets the publisher's own node identity for sender stamping.
+func (p *pub) SetSelfNode(name, role, tier string) {
+       p.selfNode = name
+       p.selfRole = role
+       p.selfTier = tier
 }
 
 func (p *pub) FlagSet() *run.FlagSet {
@@ -327,6 +378,12 @@ func (p *pub) publish(timeout time.Duration, topic bus.Topic, messages ...bus.Me
                if errSend != nil {
                        return multierr.Append(err, fmt.Errorf("failed to marshal message[%d]: %w", m.ID(), errSend))
                }
+               // Stamp sender identity so the receiver can label query/control queue
+               // metrics by sender (topology). publish opens one stream per message, so
+               // every request is the first frame of its stream — the sub 
captures it there.
+               r.SenderNode = p.selfNode
+               r.SenderRole = p.selfRole
+               r.SenderTier = p.selfTier
                node := m.Node()
                execErr := p.connMgr.Execute(node, func(c *client) error {
                        ctx, cancel := context.WithTimeout(context.Background(), timeout)
@@ -390,12 +447,14 @@ func New(metadata metadata.Repo, roles ...databasev1.Role) queue.Client {
                allowedRoles:  roles,
                prefix:        strBuilder.String(),
                writableProbe: make(map[string]map[string]struct{}),
+               nodeCache:     make(map[string]nodeInfo),
                retryPolicy:   retryPolicy,
        }
        return p
 }
 
 // NewWithoutMetadata returns a new queue client without metadata, defaulting to data nodes.
+// sender_* fields are left empty for this lifecycle-tool publisher.
 func NewWithoutMetadata() queue.Client {
        p := New(nil, databasev1.Role_ROLE_DATA)
        pp := p.(*pub)
@@ -465,15 +524,16 @@ func messageToRequest(topic bus.Topic, m bus.Message) (*clusterv1.SendRequest, e
                },
        }
 
-       switch data := m.Data().(type) {
+       switch msgData := m.Data().(type) {
        case proto.Message:
-               messageData, err := proto.Marshal(data)
+               messageData, err := proto.Marshal(msgData)
                if err != nil {
                        return nil, fmt.Errorf("failed to marshal message %T: %w", m, err)
                }
                r.Body = messageData
+               r.Group = apidata.GroupFromMessageData(topic, msgData)
        case []byte:
-               r.Body = data
+               r.Body = msgData
        default:
                return nil, fmt.Errorf("invalid message type %T", m.Data())
        }
@@ -519,7 +579,7 @@ func (l *future) Get() (bus.Message, error) {
        if resp.Body == nil {
                return bus.NewMessageWithNode(bus.MessageID(resp.MessageId), n, nil), nil
        }
-       if codec, ok := data.TopicResponseMap[t]; ok {
+       if codec, ok := apidata.TopicResponseMap[t]; ok {
                m, decodeErr := codec.Unmarshal(resp.Body)
                if decodeErr != nil {
                        return bus.Message{}, decodeErr
@@ -594,13 +654,25 @@ func (p *pub) NewChunkedSyncClientWithConfig(node string, config *ChunkedSyncCli
        if config.ChunkSize == 0 {
                config.ChunkSize = defaultChunkSize
        }
+       // Resolve the target node's role/tier so file-sync metrics carry the remote
+       // service-level topology labels (the connMgr client already holds the Node).
+       var info nodeInfo
+       if n, ok := c.md.Spec.(*databasev1.Node); ok {
+               info = resolveNodeInfo(n)
+       }
        return &chunkedSyncClient{
-               client:    c.client,
-               conn:      c.conn,
-               node:      node,
-               log:       p.log,
-               chunkSize: config.ChunkSize,
-               config:    config,
+               client:     c.client,
+               conn:       c.conn,
+               node:       node,
+               log:        p.log,
+               metrics:    p.metrics,
+               selfNode:   p.selfNode,
+               selfRole:   p.selfRole,
+               selfTier:   p.selfTier,
+               remoteRole: info.role,
+               remoteTier: info.tier,
+               chunkSize:  config.ChunkSize,
+               config:     config,
        }, nil
 }
 
diff --git a/banyand/queue/pub/pub_test.go b/banyand/queue/pub/pub_test.go
index 680f3eee6..5568cf7d4 100644
--- a/banyand/queue/pub/pub_test.go
+++ b/banyand/queue/pub/pub_test.go
@@ -171,6 +171,11 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
                        p.OnAddOrUpdate(node1)
                        node2 := getDataNode("node2", addr2)
                        p.OnAddOrUpdate(node2)
+                       // Wait for both nodes to finish connecting before publishing; otherwise the
+                       // first Publish can race the async connection setup and fail to get a client.
+                       gomega.Eventually(func() int {
+                               return p.connMgr.ActiveCount()
+                       }, flags.EventuallyTimeout).Should(gomega.Equal(2))
 
                        bp := p.NewBatchPublisher(15 * time.Second)
                        ctx := context.TODO()
diff --git a/banyand/queue/pub/retry_test.go b/banyand/queue/pub/retry_test.go
index 8293f90ee..b073c3470 100644
--- a/banyand/queue/pub/retry_test.go
+++ b/banyand/queue/pub/retry_test.go
@@ -109,7 +109,7 @@ func TestRetrySendSuccess(t *testing.T) {
        bp := &batchPublisher{}
        req := &clusterv1.SendRequest{}
 
-       err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
+       err := bp.retrySend(ctx, mockStream, req, "test-node")
 
        assert.NoError(t, err, "successful send should not return error")
 }
@@ -131,7 +131,7 @@ func TestRetrySendTransientErrorWithRecovery(t *testing.T) {
        bp := &batchPublisher{}
        req := &clusterv1.SendRequest{}
 
-       err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
+       err := bp.retrySend(ctx, mockStream, req, "test-node")
 
        assert.NoError(t, err, "should succeed after retry")
 }
@@ -149,7 +149,7 @@ func TestRetrySendNonTransientError(t *testing.T) {
        bp := &batchPublisher{}
        req := &clusterv1.SendRequest{}
 
-       err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
+       err := bp.retrySend(ctx, mockStream, req, "test-node")
 
        assert.Error(t, err, "non-transient error should be returned immediately")
        assert.Equal(t, nonTransientErr, err, "should return the original error")
@@ -168,7 +168,7 @@ func TestRetrySendExhaustedRetries(t *testing.T) {
        bp := &batchPublisher{}
        req := &clusterv1.SendRequest{}
 
-       err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
+       err := bp.retrySend(ctx, mockStream, req, "test-node")
 
        assert.Error(t, err, "should return error after exhausting retries")
        assert.Contains(t, err.Error(), "retry exhausted", "error should indicate retry exhaustion")
@@ -190,7 +190,7 @@ func TestRetrySendContextCancellation(t *testing.T) {
        bp := &batchPublisher{}
        req := &clusterv1.SendRequest{}
 
-       err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
+       err := bp.retrySend(ctx, mockStream, req, "test-node")
 
        assert.Error(t, err, "should return error when context is canceled")
        assert.Equal(t, context.Canceled, err, "should return context cancellation error")
@@ -208,7 +208,7 @@ func TestRetrySendStreamContextDone(t *testing.T) {
        bp := &batchPublisher{}
        req := &clusterv1.SendRequest{}
 
-       err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
+       err := bp.retrySend(ctx, mockStream, req, "test-node")
 
        assert.Error(t, err, "should return error when stream context is done")
        assert.Equal(t, context.Canceled, err, "should return stream context cancellation error")
@@ -228,7 +228,7 @@ func TestRetrySendPerAttemptTimeout(t *testing.T) {
        req := &clusterv1.SendRequest{}
 
        start := time.Now()
-       _ = bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
+       _ = bp.retrySend(ctx, mockStream, req, "test-node")
        duration := time.Since(start)
 
        // Should timeout quickly due to per-attempt timeout, not wait for the full operation
@@ -259,7 +259,7 @@ func TestRetrySendBackoffTiming(t *testing.T) {
        req := &clusterv1.SendRequest{}
 
        start := time.Now()
-       err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
+       err := bp.retrySend(ctx, mockStream, req, "test-node")
        duration := time.Since(start)
 
        assert.NoError(t, err, "should eventually succeed")
@@ -302,7 +302,7 @@ func TestRetrySendConcurrency(t *testing.T) {
                        bp := &batchPublisher{}
                        req := &clusterv1.SendRequest{}
 
-                       err := bp.retrySend(ctx, mockStream, req, fmt.Sprintf("test-node-%d", id), "test-topic")
+                       err := bp.retrySend(ctx, mockStream, req, fmt.Sprintf("test-node-%d", id))
                        errors <- err
                }(i)
        }
diff --git a/banyand/queue/sub/chunked_sync.go b/banyand/queue/sub/chunked_sync.go
index 4deb88e59..1ed1be1fa 100644
--- a/banyand/queue/sub/chunked_sync.go
+++ b/banyand/queue/sub/chunked_sync.go
@@ -103,17 +103,16 @@ type chunkBuffer struct {
 }
 
 type syncSession struct {
-       startTime       time.Time
-       metadata        *clusterv1.SyncMetadata
-       partsProgress   map[int]*partProgress
-       partCtx         *queue.ChunkedSyncPartContext
-       chunkBuffer     *chunkBuffer
-       sessionID       string
-       errorMsg        string
-       totalReceived   uint64
-       chunksReceived  uint32
-       abortedRecorded bool
-       completed       bool
+       startTime      time.Time
+       metadata       *clusterv1.SyncMetadata
+       partsProgress  map[int]*partProgress
+       partCtx        *queue.ChunkedSyncPartContext
+       chunkBuffer    *chunkBuffer
+       sessionID      string
+       errorMsg       string
+       totalReceived  uint64
+       chunksReceived uint32
+       completed      bool
 }
 
 type partProgress struct {
@@ -122,54 +121,15 @@ type partProgress struct {
        completed     bool
 }
 
-const (
-       abortedReasonSwitch      = "switch"
-       abortedReasonStreamError = "stream_error"
-       abortedReasonCtxDone     = "ctx_done"
-       abortedReasonEOF         = "eof"
-)
-
-// releaseMetrics releases the gauges held by this session.
-// Idempotent: safe to call from both handleCompletion and the SyncPart defer.
-func (s *syncSession) releaseMetrics(m *metrics) {
-       if m == nil || s.completed {
-               return
-       }
-       m.activeSyncSessions.Add(-1, s.metadata.Topic)
-       if s.chunkBuffer != nil {
-               if n := len(s.chunkBuffer.chunks); n > 0 {
-                       m.reorderBuffered.Add(-float64(n), s.metadata.Topic)
-               }
-       }
-       s.completed = true
-}
-
-func (s *syncSession) recordAborted(m *metrics, reason string) {
-       if m == nil || s.abortedRecorded || s.completed {
-               return
-       }
-       if reason == "" {
-               return
-       }
-       // chunkedSyncAbortedTotal is always initialized in production; tests that wire partial metrics must set it if they trigger abort paths.
-       m.chunkedSyncAbortedTotal.Inc(1, s.metadata.Topic, reason)
-       s.abortedRecorded = true
-}
-
 // SyncPart implements clusterv1.ChunkedSyncServiceServer.
 func (s *server) SyncPart(stream clusterv1.ChunkedSyncService_SyncPartServer) error {
        ctx := stream.Context()
        var currentSession *syncSession
        var sessionID string
-       var abortReason string
        defer func() {
                if currentSession == nil {
                        return
                }
-               if !currentSession.completed {
-                       currentSession.recordAborted(s.metrics, abortReason)
-               }
-               currentSession.releaseMetrics(s.metrics)
                if currentSession.partCtx != nil {
                        if closeErr := currentSession.partCtx.Close(); closeErr != nil {
                                s.log.Error().Err(closeErr).Str("session_id", currentSession.sessionID).Msg("failed to close session partCtx")
@@ -180,20 +140,29 @@ func (s *server) SyncPart(stream clusterv1.ChunkedSyncService_SyncPartServer) er
        for {
                select {
                case <-ctx.Done():
-                       abortReason = abortedReasonCtxDone
+                       if s.metrics != nil && currentSession != nil && !currentSession.completed {
+                               op, grp, sn, sr, st := s.resolveSessionLabels(currentSession)
+                               s.metrics.totalErr.Inc(1, op, grp, sn, sr, st, "ctx_done")
+                       }
                        return ctx.Err()
                default:
                }
 
-               req, err := stream.Recv()
-               if errors.Is(err, io.EOF) {
-                       abortReason = abortedReasonEOF
+               req, recvErr := stream.Recv()
+               if errors.Is(recvErr, io.EOF) {
+                       if s.metrics != nil && currentSession != nil && !currentSession.completed {
+                               op, grp, sn, sr, st := s.resolveSessionLabels(currentSession)
+                               s.metrics.totalErr.Inc(1, op, grp, sn, sr, st, "eof")
+                       }
                        break
                }
-               if err != nil {
-                       abortReason = abortedReasonStreamError
-                       s.log.Error().Err(err).Msg("failed to receive chunk")
-                       return err
+               if recvErr != nil {
+                       if s.metrics != nil && currentSession != nil && !currentSession.completed {
+                               op, grp, sn, sr, st := s.resolveSessionLabels(currentSession)
+                               s.metrics.totalErr.Inc(1, op, grp, sn, sr, st, "stream_error")
+                       }
+                       s.log.Error().Err(recvErr).Msg("failed to receive chunk")
+                       return recvErr
                }
 
                sessionID = req.SessionId
@@ -212,15 +181,27 @@ func (s *server) SyncPart(stream clusterv1.ChunkedSyncService_SyncPartServer) er
                        return s.handleCompletion(stream, currentSession, req)
                }
 
-               if err := s.processChunk(stream, currentSession, req); err != nil {
-                       s.log.Error().Err(err).Str("session_id", sessionID).Msg("failed to process chunk")
-                       return err
+               if processErr := s.processChunk(stream, currentSession, req); processErr != nil {
+                       s.log.Error().Err(processErr).Str("session_id", sessionID).Msg("failed to process chunk")
+                       return processErr
                }
        }
 
        return nil
 }
 
+// resolveSessionLabels returns the label tuple for the current session.
+func (s *server) resolveSessionLabels(session *syncSession) (operation, group, senderNode, senderRole, senderTier string) {
+       operation = data.OperationFileSyncValue
+       if session.metadata != nil {
+               group = session.metadata.GetGroup()
+               senderNode = session.metadata.GetSenderNode()
+               senderRole = session.metadata.GetSenderRole()
+               senderTier = session.metadata.GetSenderTier()
+       }
+       return
+}
+
 func (s *server) startOrSwitchSession(sessionID string, req *clusterv1.SyncPartRequest, previousSession *syncSession) *syncSession {
        if previousSession != nil {
                s.cleanupPreviousSession(previousSession)
@@ -234,7 +215,8 @@ func (s *server) startOrSwitchSession(sessionID string, req *clusterv1.SyncPartR
                partsProgress:  make(map[int]*partProgress),
        }
        if s.metrics != nil {
-               s.metrics.activeSyncSessions.Add(1, newSession.metadata.Topic)
+               op, grp, sn, sr, st := s.resolveSessionLabels(newSession)
+               s.metrics.totalStarted.Inc(1, op, grp, sn, sr, st)
        }
        if dl := s.log.Debug(); dl.Enabled() {
                dl.Str("session_id", sessionID).
@@ -247,16 +229,22 @@ func (s *server) startOrSwitchSession(sessionID string, req *clusterv1.SyncPartR
 
 func (s *server) cleanupPreviousSession(previousSession *syncSession) {
        if !previousSession.completed {
-               previousSession.recordAborted(s.metrics, abortedReasonSwitch)
+               if s.metrics != nil {
+                       op, grp, sn, sr, st := s.resolveSessionLabels(previousSession)
+                       s.metrics.totalErr.Inc(1, op, grp, sn, sr, st, "switch")
+               }
+               previousSession.completed = true
        }
-       previousSession.releaseMetrics(s.metrics)
 
        if previousSession.partCtx == nil {
                return
        }
        if previousSession.partCtx.Handler != nil {
                if finishErr := previousSession.partCtx.Handler.FinishSync(); finishErr != nil {
-                       s.updateChunkOrderMetrics("finish_sync_err", previousSession.metadata.Topic)
+                       if s.metrics != nil {
+                               op, grp, sn, sr, st := s.resolveSessionLabels(previousSession)
+                               s.metrics.totalErr.Inc(1, op, grp, sn, sr, st, "finish_sync_err")
+                       }
                        s.log.Error().Err(finishErr).Str("session_id", previousSession.sessionID).Msg("failed to finish sync for previous session")
                }
        }
@@ -268,8 +256,8 @@ func (s *server) cleanupPreviousSession(previousSession *syncSession) {
 func (s *server) processChunk(stream clusterv1.ChunkedSyncService_SyncPartServer, session *syncSession, req *clusterv1.SyncPartRequest) error {
        // Check version compatibility on every chunk
        if req.VersionInfo != nil {
-               versionCompatibility, status := checkSyncVersionCompatibility(req.VersionInfo)
-               if status != clusterv1.SyncStatus_SYNC_STATUS_CHUNK_RECEIVED {
+               versionCompatibility, syncStatus := checkSyncVersionCompatibility(req.VersionInfo)
+               if syncStatus != clusterv1.SyncStatus_SYNC_STATUS_CHUNK_RECEIVED {
                        s.log.Warn().
                                Str("session_id", req.SessionId).
                                Str("client_api_version", req.VersionInfo.ApiVersion).
@@ -277,7 +265,7 @@ func (s *server) processChunk(stream clusterv1.ChunkedSyncService_SyncPartServer
                                Str("reason", versionCompatibility.Reason).
                                Msg("sync version compatibility check failed")
 
-                       return s.sendResponse(stream, req, status, versionCompatibility.Reason, versionCompatibility)
+                       return s.sendResponse(stream, req, syncStatus, versionCompatibility.Reason, versionCompatibility)
                }
        }
 
@@ -315,14 +303,14 @@ func (s *server) processChunkSequential(stream clusterv1.ChunkedSyncService_Sync
 func (s *server) processChunkWithReordering(stream clusterv1.ChunkedSyncService_SyncPartServer, session *syncSession, req *clusterv1.SyncPartRequest) error {
        buffer := session.chunkBuffer
        // must check buffer timeout before refreshing lastActivity, otherwise it will never timeout.
-       if err := s.checkBufferTimeout(session); err != nil {
-               return err
+       if checkErr := s.checkBufferTimeout(session); checkErr != nil {
+               return checkErr
        }
        buffer.lastActivity = time.Now()
 
        if req.ChunkIndex == buffer.expectedIndex {
-               if err := s.processExpectedChunk(stream, session, req); err != nil {
-                       return err
+               if processErr := s.processExpectedChunk(stream, session, req); processErr != nil {
+                       return processErr
                }
                buffer.expectedIndex++
 
@@ -331,7 +319,6 @@ func (s *server) processChunkWithReordering(stream clusterv1.ChunkedSyncService_
 
        if req.ChunkIndex > buffer.expectedIndex {
                gap := req.ChunkIndex - buffer.expectedIndex
-               s.updateChunkOrderMetrics("out_of_order_received", session.metadata.Topic)
 
                if gap > s.maxChunkGapSize {
                        errMsg := fmt.Sprintf("chunk gap too large: expected %d, got %d (gap: %d > max: %d)",
@@ -342,7 +329,10 @@ func (s *server) processChunkWithReordering(stream clusterv1.ChunkedSyncService_
                                Uint32("gap", gap).
                                Uint32("max_gap", s.maxChunkGapSize).
                                Msg("chunk gap too large, rejecting")
-                       s.updateChunkOrderMetrics("gap_too_large", session.metadata.Topic)
+                       if s.metrics != nil {
+                               op, grp, sn, sr, st := s.resolveSessionLabels(session)
+                               s.metrics.totalErr.Inc(1, op, grp, sn, sr, st, "gap_too_large")
+                       }
                        return s.sendResponse(stream, req, clusterv1.SyncStatus_SYNC_STATUS_CHUNK_OUT_OF_ORDER, errMsg, nil)
                }
 
@@ -353,7 +343,10 @@ func (s *server) processChunkWithReordering(stream clusterv1.ChunkedSyncService_
                                Uint32("buffer_size", uint32(len(buffer.chunks))).
                                Uint32("max_buffer_size", buffer.maxBufferSize).
                                Msg("chunk buffer full, rejecting chunk")
-                       s.updateChunkOrderMetrics("buffer_full", session.metadata.Topic)
+                       if s.metrics != nil {
+                               op, grp, sn, sr, st := s.resolveSessionLabels(session)
+                               s.metrics.totalErr.Inc(1, op, grp, sn, sr, st, "buffer_full")
+                       }
                        return s.sendResponse(stream, req, clusterv1.SyncStatus_SYNC_STATUS_CHUNK_OUT_OF_ORDER, errMsg, nil)
                }
 
@@ -366,10 +359,6 @@ func (s *server) processChunkWithReordering(stream clusterv1.ChunkedSyncService_
                                Uint32("buffered_chunks", uint32(len(buffer.chunks))).
                                Msg("buffered out-of-order chunk")
                }
-               s.updateChunkOrderMetrics("chunk_buffered", session.metadata.Topic)
-               if s.metrics != nil {
-                       s.metrics.reorderBuffered.Add(1, session.metadata.Topic)
-               }
                return s.sendResponse(stream, req, clusterv1.SyncStatus_SYNC_STATUS_CHUNK_RECEIVED,
                        fmt.Sprintf("chunk %d buffered (waiting for %d)", req.ChunkIndex, buffer.expectedIndex), nil)
        }
@@ -395,6 +384,10 @@ func (s *server) processExpectedChunk(stream clusterv1.ChunkedSyncService_SyncPa
                errMsg := fmt.Sprintf("chunk %d checksum mismatch: expected %s, got %s",
                        req.ChunkIndex, req.ChunkChecksum, calculatedChecksum)
                s.log.Warn().Str("session_id", req.SessionId).Msg(errMsg)
+               if s.metrics != nil {
+                       op, grp, sn, sr, st := s.resolveSessionLabels(session)
+                       s.metrics.totalErr.Inc(1, op, grp, sn, sr, st, "checksum_mismatch")
+               }
                return s.sendResponse(stream, req, clusterv1.SyncStatus_SYNC_STATUS_CHUNK_CHECKSUM_MISMATCH, errMsg, nil)
        }
 
@@ -418,8 +411,8 @@ func (s *server) processExpectedChunk(stream clusterv1.ChunkedSyncService_SyncPa
                        session.partCtx.ID != partInfo.Id
 
                if createNewContext && session.partCtx != nil && session.partCtx.Handler != nil {
-                       if err := session.partCtx.Handler.FinishSync(); err != nil {
-                               return fmt.Errorf("failed to complete part %d: %w", session.partCtx.ID, err)
+                       if finishErr := session.partCtx.Handler.FinishSync(); finishErr != nil {
+                               return fmt.Errorf("failed to complete part %d: %w", session.partCtx.ID, finishErr)
                        }
                }
 
@@ -438,9 +431,9 @@ func (s *server) processExpectedChunk(stream clusterv1.ChunkedSyncService_SyncPa
                                MaxKey:                partInfo.MaxKey,
                                PartType:              partInfo.PartType,
                        }
-                       partHandler, err := handler.CreatePartHandler(session.partCtx)
-                       if err != nil {
-                               return fmt.Errorf("failed to create part handler: %w", err)
+                       partHandler, createErr := handler.CreatePartHandler(session.partCtx)
+                       if createErr != nil {
+                               return fmt.Errorf("failed to create part handler: %w", createErr)
                        }
                        session.partCtx.Handler = partHandler
                } else if session.partCtx.PartType != partInfo.PartType {
@@ -453,18 +446,22 @@ func (s *server) processExpectedChunk(stream clusterv1.ChunkedSyncService_SyncPa
                        session.partCtx.MinKey = partInfo.MinKey
                        session.partCtx.MaxKey = partInfo.MaxKey
                        session.partCtx.PartType = partInfo.PartType
-                       if err := session.partCtx.Handler.NewPartType(session.partCtx); err != nil {
-                               return fmt.Errorf("failed to new part type: %w", err)
+                       if newPartErr := session.partCtx.Handler.NewPartType(session.partCtx); newPartErr != nil {
+                               return fmt.Errorf("failed to new part type: %w", newPartErr)
                        }
                }
 
-               if err := s.processPart(session, req, partInfo, partIndex, handler); err != nil {
-                       s.log.Error().Err(err).
+               if processErr := s.processPart(session, req, partInfo, partIndex, handler); processErr != nil {
+                       s.log.Error().Err(processErr).
                                Str("session_id", req.SessionId).
                                Str("topic", session.metadata.Topic).
                                Int("part_index", partIndex).
                                Msg("failed to process part")
-                       return err
+                       if s.metrics != nil {
+                               op, grp, sn, sr, st := s.resolveSessionLabels(session)
+                               s.metrics.totalErr.Inc(1, op, grp, sn, sr, st, "part_failed")
+                       }
+                       return processErr
                }
        }
 
@@ -477,9 +474,6 @@ func (s *server) processBufferedChunks(stream clusterv1.ChunkedSyncService_SyncP
        for {
                if chunk, exists := buffer.chunks[buffer.expectedIndex]; exists {
                        delete(buffer.chunks, buffer.expectedIndex)
-                       if s.metrics != nil {
-                               s.metrics.reorderBuffered.Add(-1, session.metadata.Topic)
-                       }
 
                        if dl := s.log.Debug(); dl.Enabled() {
                                dl.Str("session_id", session.sessionID).
@@ -489,8 +483,8 @@ func (s *server) processBufferedChunks(stream clusterv1.ChunkedSyncService_SyncP
                                        Msg("processing buffered chunk")
                        }
 
-                       if err := s.processExpectedChunk(stream, session, chunk); err != nil {
-                               return err
+                       if processErr := s.processExpectedChunk(stream, session, chunk); processErr != nil {
+                               return processErr
                        }
                        buffer.expectedIndex++
                } else {
@@ -514,7 +508,10 @@ func (s *server) checkBufferTimeout(session *syncSession) error {
                                        missing = append(missing, i)
                                }
                        }
-                       s.updateChunkOrderMetrics("buffer_timeout", session.metadata.Topic)
+                       if s.metrics != nil {
+                               op, grp, sn, sr, st := s.resolveSessionLabels(session)
+                               s.metrics.totalErr.Inc(1, op, grp, sn, sr, st, "buffer_timeout")
+                       }
                        return fmt.Errorf("buffer timeout: missing chunks %v after %v",
                                missing, session.chunkBuffer.bufferTimeout)
                }
@@ -565,8 +562,8 @@ func (s *server) processPart(session *syncSession, req *clusterv1.SyncPartReques
                session.partCtx.FileName = fileInfo.Name
                session.partCtx.PartType = partInfo.PartType
 
-               if err := handler.HandleFileChunk(session.partCtx, fileChunk); err != nil {
-                       return fmt.Errorf("failed to stream file chunk for %s: %w", fileInfo.Name, err)
+               if handleErr := handler.HandleFileChunk(session.partCtx, fileChunk); handleErr != nil {
+                       return fmt.Errorf("failed to stream file chunk for %s: %w", fileInfo.Name, handleErr)
                }
        }
 
@@ -581,9 +578,13 @@ func (s *server) processPart(session *syncSession, req *clusterv1.SyncPartReques
 }
 
 func (s *server) handleCompletion(stream clusterv1.ChunkedSyncService_SyncPartServer, session *syncSession, req *clusterv1.SyncPartRequest) error {
-       if session.partCtx.Handler != nil {
-               if err := session.partCtx.Handler.FinishSync(); err != nil {
-                       return fmt.Errorf("failed to complete part %d: %w", session.partCtx.ID, err)
+       if session.partCtx != nil && session.partCtx.Handler != nil {
+               if finishErr := session.partCtx.Handler.FinishSync(); finishErr != nil {
+                       if s.metrics != nil {
+                               op, grp, sn, sr, st := s.resolveSessionLabels(session)
+                               s.metrics.totalErr.Inc(1, op, grp, sn, sr, st, "finish_sync_err")
+                       }
+                       return fmt.Errorf("failed to complete part %d: %w", session.partCtx.ID, finishErr)
                }
                session.partCtx.Handler = nil
        }
@@ -613,15 +614,15 @@ func (s *server) handleCompletion(stream clusterv1.ChunkedSyncService_SyncPartSe
                PartsResults:       partsResults,
        }
 
-       session.releaseMetrics(s.metrics)
+       session.completed = true
        if s.metrics != nil {
-               topic := session.metadata.Topic
-               s.metrics.chunkedSyncTotalBytes.Inc(float64(syncResult.TotalBytesReceived), topic)
-               s.metrics.chunkedSyncDurationSecs.Observe(float64(syncResult.DurationMs)/1000.0, topic)
-
+               op, grp, sn, sr, st := s.resolveSessionLabels(session)
+               s.metrics.totalFinished.Inc(1, op, grp, sn, sr, st)
+               s.metrics.totalLatency.Observe(float64(syncResult.DurationMs)/1000.0, op, grp, sn, sr, st)
+               s.metrics.receivedBytes.Inc(float64(syncResult.TotalBytesReceived), op, grp, sn, sr, st)
                for _, pr := range partsResults {
                        if !pr.Success {
-                               s.metrics.chunkedSyncFailedParts.Inc(1, topic)
+                               s.metrics.totalErr.Inc(1, op, grp, sn, sr, st, "part_failed")
                        }
                }
        }
@@ -641,14 +642,14 @@ func (s *server) handleCompletion(stream clusterv1.ChunkedSyncService_SyncPartSe
 func (s *server) sendResponse(
        stream clusterv1.ChunkedSyncService_SyncPartServer,
        req *clusterv1.SyncPartRequest,
-       status clusterv1.SyncStatus,
+       syncStatus clusterv1.SyncStatus,
        errorMsg string,
        result interface{},
 ) error {
        resp := &clusterv1.SyncPartResponse{
                SessionId:  req.SessionId,
                ChunkIndex: req.ChunkIndex,
-               Status:     status,
+               Status:     syncStatus,
                Error:      errorMsg,
        }
 
diff --git a/banyand/queue/sub/chunked_sync_test.go b/banyand/queue/sub/chunked_sync_test.go
index 2ba9889ff..206f93b8c 100644
--- a/banyand/queue/sub/chunked_sync_test.go
+++ b/banyand/queue/sub/chunked_sync_test.go
@@ -211,102 +211,75 @@ func (c *capturingCounter) Delete(_ ...string) bool {
        return true
 }
 
-func (c *capturingCounter) uniqueFirstLabelValues() map[string]struct{} {
-       c.mu.Lock()
-       defer c.mu.Unlock()
-
-       m := make(map[string]struct{})
-       for _, lv := range c.labelValues {
-               if len(lv) > 0 {
-                       m[lv[0]] = struct{}{}
-               }
-       }
-       return m
-}
-
-func TestChunkOrderingMetricsAreLabeledByTopic_NotSessionID(t *testing.T) {
-       // enable reordering, otherwise the chunk-ordering metrics will not be triggered.
-       s := &server{
-               log:                   logger.GetLogger("test-server-metrics-label"),
+// TestChunkOrderingBufferFull verifies that buffer_full error_type is emitted when the
+// chunk buffer is full. The new metrics use (operation,group,remote_node,remote_role,remote_tier,error_type).
+func TestChunkOrderingBufferFull(t *testing.T) {
+       errCap := &capturingCounter{}
+       s := &server{ //nolint:exhaustruct
+               log:                   logger.GetLogger("test-server-buffer-full"),
                chunkedSyncHandlers:   make(map[bus.Topic]queue.ChunkedSyncHandler),
                enableChunkReordering: true,
-               maxChunkBufferSize:    10,
-               maxChunkGapSize:       5,
+               maxChunkBufferSize:    1,
+               maxChunkGapSize:       10,
+               chunkBufferTimeout:    time.Hour,
+               metrics:               newTestMetrics(),
        }
+       s.metrics.totalErr = errCap
 
-       // handler: avoid "no handler registered" in processExpectedChunk.
        mockHandler := &MockChunkedSyncHandler{}
        s.chunkedSyncHandlers[data.TopicStreamPartSync] = mockHandler
 
-       outOfOrder := &capturingCounter{}
-       buffered := &capturingCounter{}
-       testMetrics := newTestMetrics()
-       testMetrics.outOfOrderChunksReceived = outOfOrder
-       testMetrics.chunksBuffered = buffered
-       s.metrics = testMetrics
-
-       topic := data.TopicStreamPartSync.String()
-
-       drive := func(sessionID string) {
-               mockStream := &MockSyncPartStream{}
-               session := &syncSession{
-                       sessionID:      sessionID,
-                       startTime:      time.Now(),
-                       chunksReceived: 0,
-                       partsProgress:  make(map[int]*partProgress),
-                       metadata: &clusterv1.SyncMetadata{
-                               Group: "test-group",
-                               Topic: topic,
-                       },
-               }
-
-               // send chunk 0 (establish buffer.expectedIndex=1)
-               req0 := &clusterv1.SyncPartRequest{
-                       SessionId:     sessionID,
-                       ChunkIndex:    0,
-                       ChunkData:     []byte("chunk-0"),
-                       ChunkChecksum: fmt.Sprintf("%x", crc32.ChecksumIEEE([]byte("chunk-0"))),
-                       PartsInfo: []*clusterv1.PartInfo{
-                               {Id: 1, Files: []*clusterv1.FileInfo{{Name: "f", Offset: 0, Size: 7}}},
-                       },
-               }
-               require.NoError(t, s.processChunk(mockStream, session, req0))
+       mockStream := &MockSyncPartStream{}
+       session := &syncSession{ //nolint:exhaustruct
+               sessionID:      "buf-full-sess",
+               startTime:      time.Now(),
+               chunksReceived: 0,
+               partsProgress:  make(map[int]*partProgress),
+               metadata: &clusterv1.SyncMetadata{
+                       Group: "test-group",
+                       Topic: data.TopicStreamPartSync.String(),
+               },
+       }
 
-               // send chunk 2 (out-of-order: expected 1 got 2),
-               // will trigger out_of_order_received + chunk_buffered.
-               req2 := &clusterv1.SyncPartRequest{
-                       SessionId:     sessionID,
-                       ChunkIndex:    2,
-                       ChunkData:     []byte("chunk-2"),
-                       ChunkChecksum: fmt.Sprintf("%x", crc32.ChecksumIEEE([]byte("chunk-2"))),
-                       PartsInfo: []*clusterv1.PartInfo{
-                               {Id: 2, Files: []*clusterv1.FileInfo{{Name: "f", Offset: 0, Size: 7}}},
-                       },
+       // chunk 0 — processed normally, buffer.expectedIndex becomes 1
+       req0 := &clusterv1.SyncPartRequest{
+               SessionId:     "buf-full-sess",
+               ChunkIndex:    0,
+               ChunkData:     []byte("chunk-0"),
+               ChunkChecksum: fmt.Sprintf("%x", 
crc32.ChecksumIEEE([]byte("chunk-0"))),
+               PartsInfo:     []*clusterv1.PartInfo{{Id: 1, Files: 
[]*clusterv1.FileInfo{{Name: "f", Offset: 0, Size: 7}}}},
+       }
+       require.NoError(t, s.processChunk(mockStream, session, req0))
+
+       // chunk 2 — out-of-order, fills the buffer (maxBufferSize=1)
+       req2 := &clusterv1.SyncPartRequest{
+               SessionId:     "buf-full-sess",
+               ChunkIndex:    2,
+               ChunkData:     []byte("chunk-2"),
+               ChunkChecksum: fmt.Sprintf("%x", 
crc32.ChecksumIEEE([]byte("chunk-2"))),
+               PartsInfo:     []*clusterv1.PartInfo{{Id: 2, Files: 
[]*clusterv1.FileInfo{{Name: "f", Offset: 0, Size: 7}}}},
+       }
+       require.NoError(t, s.processChunk(mockStream, session, req2))
+
+       // chunk 3 — buffer is full, should trigger buffer_full error_type
+       req3 := &clusterv1.SyncPartRequest{
+               SessionId:     "buf-full-sess",
+               ChunkIndex:    3,
+               ChunkData:     []byte("chunk-3"),
+               ChunkChecksum: fmt.Sprintf("%x", 
crc32.ChecksumIEEE([]byte("chunk-3"))),
+               PartsInfo:     []*clusterv1.PartInfo{{Id: 3, Files: 
[]*clusterv1.FileInfo{{Name: "f", Offset: 0, Size: 7}}}},
+       }
+       require.NoError(t, s.processChunk(mockStream, session, req3))
+
+       // error_type label is the last (index 5): operation,group,remote_node,remote_role,remote_tier,error_type
+       found := false
+       for _, labels := range errCap.labelValues {
+               if len(labels) > 0 && labels[len(labels)-1] == "buffer_full" {
+                       found = true
+                       break
                }
-               require.NoError(t, s.processChunk(mockStream, session, req2))
        }
-
-       drive("test-session-A")
-       drive("test-session-B")
-
-       // assert: labelValues[0] must be topic; and unique label must be only 
one (topic)
-       uniqOut := outOfOrder.uniqueFirstLabelValues()
-       uniqBuf := buffered.uniqueFirstLabelValues()
-
-       assert.Equal(t, 1, len(uniqOut))
-       assert.Equal(t, 1, len(uniqBuf))
-
-       _, okOut := uniqOut[topic]
-       _, okBuf := uniqBuf[topic]
-       assert.True(t, okOut, "out_of_order_received label must be topic")
-       assert.True(t, okBuf, "chunk_buffered label must be topic")
-
-       // assert: never should have session_id as label
-       _, bad1 := uniqOut["test-session-A"]
-       _, bad2 := uniqOut["test-session-B"]
-       _, bad3 := uniqBuf["test-session-A"]
-       _, bad4 := uniqBuf["test-session-B"]
-       assert.False(t, bad1 || bad2 || bad3 || bad4, "metrics must not be 
labeled by session_id")
+       assert.True(t, found, "expected error_type=buffer_full to be recorded")
 }
 
 // MockChunkedSyncHandler implements queue.ChunkedSyncHandler for testing.
diff --git a/banyand/queue/sub/helpers.go b/banyand/queue/sub/helpers.go
index ca08bf530..b8b77e93d 100644
--- a/banyand/queue/sub/helpers.go
+++ b/banyand/queue/sub/helpers.go
@@ -29,13 +29,13 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
-func (s *server) handleEOF(stream clusterv1.Service_SendServer, topic 
*bus.Topic, dataCollection []any, writeEntity *clusterv1.SendRequest) {
+func (s *server) handleEOF(stream clusterv1.Service_SendServer, topic 
*bus.Topic, dataCollection []any, writeEntity *clusterv1.SendRequest, identity 
*streamIdentity) {
        if len(dataCollection) < 1 {
                return
        }
        listeners := s.getListeners(*topic)
        if len(listeners) == 0 {
-               s.reply(stream, writeEntity, nil, "no listener found")
+               s.replyWithErrType(stream, writeEntity, nil, "no listener 
found", identity, "no_listener")
                return
        }
        if len(listeners) > 1 {
@@ -43,30 +43,24 @@ func (s *server) handleEOF(stream 
clusterv1.Service_SendServer, topic *bus.Topic
        }
        listener := listeners[0]
        if le := listener.CheckHealth(); le != nil {
-               s.reply(stream, writeEntity, le, "")
+               s.replyWithErrType(stream, writeEntity, le, "", identity, 
"handler_error")
                return
        }
        message := listener.Rev(stream.Context(), 
bus.NewMessage(bus.MessageID(0), dataCollection))
-       var resp *clusterv1.SendResponse
-       data := message.Data()
-       if data != nil {
-               switch d := data.(type) {
-               case *common.Error:
-                       resp = &clusterv1.SendResponse{
-                               MessageId: writeEntity.MessageId,
-                               Error:     d.Error(),
-                               Status:    d.Status(),
-                       }
-               default:
-                       resp = &clusterv1.SendResponse{
-                               MessageId: writeEntity.MessageId,
-                       }
-               }
+       // writeEntity is nil when the stream closed (Recv returned io.EOF), so guard the ID access.
+       var msgID uint64
+       if writeEntity != nil {
+               msgID = writeEntity.MessageId
+       }
+       resp := &clusterv1.SendResponse{MessageId: msgID}
+       if ce, ok := message.Data().(*common.Error); ok {
+               resp.Error = ce.Error()
+               resp.Status = ce.Status()
        }
        if errSend := stream.Send(resp); errSend != nil {
                s.log.Error().Stringer("written", 
writeEntity).Err(errSend).Msg("failed to send write response")
-               if writeEntity != nil {
-                       s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic)
+               if s.metrics != nil && writeEntity != nil {
+                       s.metrics.totalErr.Inc(1, identity.operation, 
identity.group, identity.senderNode, identity.senderRole, identity.senderTier, 
"send_error")
                }
        }
 }
@@ -79,11 +73,12 @@ func (s *server) handleRecvError(err error) error {
        return err
 }
 
-func (s *server) handleBatch(dataCollection *[]any, writeEntity 
*clusterv1.SendRequest, start *time.Time) {
+func (s *server) handleBatch(dataCollection *[]any, writeEntity 
*clusterv1.SendRequest, start *time.Time, identity *streamIdentity) {
        if len(*dataCollection) == 0 {
-               s.metrics.totalStarted.Inc(1, writeEntity.Topic)
+               if s.metrics != nil {
+                       s.metrics.totalStarted.Inc(1, identity.operation, 
identity.group, identity.senderNode, identity.senderRole, identity.senderTier)
+               }
                *start = time.Now()
        }
        *dataCollection = append(*dataCollection, writeEntity.Body)
-       s.metrics.totalMsgSent.Inc(1, writeEntity.Topic)
 }
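
The reworked handleEOF above guards the possibly-nil request before reading its message ID and swaps the nested type switch for a single comma-ok type assertion. The following is a minimal sketch of that shape, not the BanyanDB code: sendRequest, sendResponse, handlerError and buildResponse are hypothetical stand-ins for clusterv1.SendRequest, clusterv1.SendResponse, *common.Error and the body of handleEOF.

```go
package main

import "fmt"

// sendRequest and sendResponse are hypothetical stand-ins for
// clusterv1.SendRequest and clusterv1.SendResponse.
type sendRequest struct{ MessageID uint64 }

type sendResponse struct {
	MessageID uint64
	Error     string
}

// handlerError is a hypothetical stand-in for *common.Error.
type handlerError struct{ msg string }

func (e *handlerError) Error() string { return e.msg }

// buildResponse guards the possibly-nil request, then uses a comma-ok type
// assertion on the payload. The assertion reports ok=false for a nil
// interface value, so no separate nil check on the payload is needed.
func buildResponse(req *sendRequest, payload any) *sendResponse {
	var msgID uint64
	if req != nil {
		msgID = req.MessageID
	}
	resp := &sendResponse{MessageID: msgID}
	if he, ok := payload.(*handlerError); ok {
		resp.Error = he.Error()
	}
	return resp
}

func main() {
	// A nil request (stream closed on io.EOF) yields a zero message ID.
	fmt.Printf("%+v\n", *buildResponse(nil, nil))
	// An error payload is copied into the response.
	fmt.Printf("%+v\n", *buildResponse(&sendRequest{MessageID: 7}, &handlerError{msg: "boom"}))
}
```

Under these assumptions, the response is always allocated, so the caller never has to handle a nil response when the payload is neither an error nor present at all.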
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index a3a56103f..02450ae37 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -438,81 +438,18 @@ func (s *server) SetNodeSchemaStatusRepo(svc 
metadata.Service) {
 type metrics struct {
        totalStarted  meter.Counter
        totalFinished meter.Counter
+       totalLatency  meter.Histogram
        totalErr      meter.Counter
-       totalLatency  meter.Counter
-
-       totalMsgReceived    meter.Counter
-       totalMsgReceivedErr meter.Counter
-       totalMsgSent        meter.Counter
-       totalMsgSentErr     meter.Counter
-
-       // Chunk ordering metrics
-       outOfOrderChunksReceived meter.Counter
-       chunksBuffered           meter.Counter
-       bufferTimeouts           meter.Counter
-       largeGapsRejected        meter.Counter
-       bufferCapacityExceeded   meter.Counter
-       finishSyncErr            meter.Counter
-
-       // Chunked sync saturation metrics
-       activeSyncSessions meter.Gauge
-       reorderBuffered    meter.Gauge
-
-       // Chunked sync outcome metrics
-       chunkedSyncAbortedTotal meter.Counter
-       chunkedSyncFailedParts  meter.Counter
-       chunkedSyncTotalBytes   meter.Counter
-       chunkedSyncDurationSecs meter.Histogram
+       receivedBytes meter.Counter
 }
 
 func newMetrics(factory observability.Factory) *metrics {
+       labels := []string{"operation", "group", "remote_node", "remote_role", 
"remote_tier"}
        return &metrics{
-               totalStarted:        factory.NewCounter("total_started", 
"topic"),
-               totalFinished:       factory.NewCounter("total_finished", 
"topic"),
-               totalErr:            factory.NewCounter("total_err", "topic"),
-               totalLatency:        factory.NewCounter("total_latency", 
"topic"),
-               totalMsgReceived:    factory.NewCounter("total_msg_received", 
"topic"),
-               totalMsgReceivedErr: 
factory.NewCounter("total_msg_received_err", "topic"),
-               totalMsgSent:        factory.NewCounter("total_msg_sent", 
"topic"),
-               totalMsgSentErr:     factory.NewCounter("total_msg_sent_err", 
"topic"),
-
-               // Chunk ordering metrics
-               outOfOrderChunksReceived: 
factory.NewCounter("out_of_order_chunks_received", "topic"),
-               chunksBuffered:           factory.NewCounter("chunks_buffered", 
"topic"),
-               bufferTimeouts:           factory.NewCounter("buffer_timeouts", 
"topic"),
-               largeGapsRejected:        
factory.NewCounter("large_gaps_rejected", "topic"),
-               bufferCapacityExceeded:   
factory.NewCounter("buffer_capacity_exceeded", "topic"),
-               finishSyncErr:            factory.NewCounter("finish_sync_err", 
"topic"),
-
-               // Chunked sync saturation metrics
-               activeSyncSessions: 
factory.NewGauge("chunked_sync_active_sessions", "topic"),
-               reorderBuffered:    
factory.NewGauge("chunk_reorder_buffered_chunks", "topic"),
-
-               // Chunked sync outcome metrics
-               chunkedSyncAbortedTotal: 
factory.NewCounter("chunked_sync_aborted_total", "topic", "reason"),
-               chunkedSyncFailedParts:  
factory.NewCounter("chunked_sync_failed_parts_total", "topic"),
-               chunkedSyncTotalBytes:   
factory.NewCounter("chunked_sync_total_bytes_received", "topic"),
-               chunkedSyncDurationSecs: 
factory.NewHistogram("chunked_sync_duration_seconds", meter.DefBuckets, 
"topic"),
-       }
-}
-
-// updateChunkOrderMetrics updates chunk ordering metrics.
-func (s *server) updateChunkOrderMetrics(event, topic string) {
-       if s.metrics == nil {
-               return // Skip metrics if not initialized (e.g., during tests)
-       }
-       switch event {
-       case "out_of_order_received":
-               s.metrics.outOfOrderChunksReceived.Inc(1, topic)
-       case "chunk_buffered":
-               s.metrics.chunksBuffered.Inc(1, topic)
-       case "buffer_timeout":
-               s.metrics.bufferTimeouts.Inc(1, topic)
-       case "gap_too_large":
-               s.metrics.largeGapsRejected.Inc(1, topic)
-       case "buffer_full":
-               s.metrics.bufferCapacityExceeded.Inc(1, topic)
-       case "finish_sync_err":
-               s.metrics.finishSyncErr.Inc(1, topic)
+               totalStarted:  factory.NewCounter("total_started", labels...),
+               totalFinished: factory.NewCounter("total_finished", labels...),
+               totalLatency:  factory.NewHistogram("total_latency", 
meter.DefBuckets, labels...),
+               totalErr:      factory.NewCounter("total_err", append(labels, 
"error_type")...),
+               receivedBytes: factory.NewCounter("received_bytes", labels...),
        }
 }
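
newMetrics above shares one base label slice across all metrics and derives the total_err labels with append(labels, "error_type"). The sketch below illustrates why that is safe in this particular form: a five-element slice literal has capacity five, so append allocates a new backing array instead of writing into the shared one. recordingFactory and register are hypothetical names used only for illustration; they are not part of observability.Factory.

```go
package main

import "fmt"

// recordingFactory is a hypothetical stand-in for observability.Factory that
// only remembers which label names each metric was registered with.
type recordingFactory struct {
	registered map[string][]string
}

func (f *recordingFactory) newCounter(name string, labelNames ...string) {
	f.registered[name] = labelNames
}

// register mirrors the label wiring in newMetrics: a shared base slice, plus
// one extra "error_type" label for the error counter only.
func register(f *recordingFactory) {
	labels := []string{"operation", "group", "remote_node", "remote_role", "remote_tier"}
	f.newCounter("total_started", labels...)
	// len(labels) == cap(labels), so append copies into a fresh array and the
	// base slice seen by the other metrics is left untouched.
	f.newCounter("total_err", append(labels, "error_type")...)
	f.newCounter("received_bytes", labels...)
}

func main() {
	f := &recordingFactory{registered: map[string][]string{}}
	register(f)
	fmt.Println(len(f.registered["total_started"]), len(f.registered["total_err"]))
}
```

If the base slice were ever built with spare capacity (for example via make with a larger cap), copying it before the append would be the more defensive choice.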
diff --git a/banyand/queue/sub/server_metrics_test.go 
b/banyand/queue/sub/server_metrics_test.go
index 121d2f5be..91638af00 100644
--- a/banyand/queue/sub/server_metrics_test.go
+++ b/banyand/queue/sub/server_metrics_test.go
@@ -34,46 +34,20 @@ type noopCounter struct{}
 func (*noopCounter) Inc(_ float64, _ ...string) {}
 func (*noopCounter) Delete(_ ...string) bool    { return true }
 
-type noopGauge struct{}
-
-func (*noopGauge) Set(_ float64, _ ...string) {}
-func (*noopGauge) Add(_ float64, _ ...string) {}
-func (*noopGauge) Delete(_ ...string) bool    { return true }
-
 type noopHistogram struct{}
 
 func (*noopHistogram) Observe(_ float64, _ ...string) {}
 func (*noopHistogram) Delete(_ ...string) bool        { return true }
 
-func newTestMetrics() *metrics {
+func newTestMetrics() *metrics { //nolint:exhaustruct
        c := &noopCounter{}
-       g := &noopGauge{}
        h := &noopHistogram{}
        return &metrics{
                totalStarted:  c,
                totalFinished: c,
+               totalLatency:  h,
                totalErr:      c,
-               totalLatency:  c,
-
-               totalMsgReceived:    c,
-               totalMsgReceivedErr: c,
-               totalMsgSent:        c,
-               totalMsgSentErr:     c,
-
-               outOfOrderChunksReceived: c,
-               chunksBuffered:           c,
-               bufferTimeouts:           c,
-               largeGapsRejected:        c,
-               bufferCapacityExceeded:   c,
-               finishSyncErr:            c,
-
-               activeSyncSessions: g,
-               reorderBuffered:    g,
-
-               chunkedSyncAbortedTotal: c,
-               chunkedSyncFailedParts:  c,
-               chunkedSyncTotalBytes:   c,
-               chunkedSyncDurationSecs: h,
+               receivedBytes: c,
        }
 }
 
@@ -84,16 +58,6 @@ type fakeCounter struct {
 func (f *fakeCounter) Inc(delta float64, _ ...string) { f.total += delta }
 func (*fakeCounter) Delete(_ ...string) bool          { return true }
 
-type fakeGauge struct {
-       value float64
-}
-
-func (g *fakeGauge) Set(v float64, _ ...string)     { g.value = v }
-func (g *fakeGauge) Add(delta float64, _ ...string) { g.value += delta }
-func (*fakeGauge) Delete(_ ...string) bool          { return true }
-func newFakeGauge() meter.Gauge                     { return &fakeGauge{} }
-func getGaugeValue(g meter.Gauge) float64           { return 
g.(*fakeGauge).value }
-
 type fakeHistogram struct {
        count int
 }
@@ -101,65 +65,53 @@ type fakeHistogram struct {
 func (h *fakeHistogram) Observe(_ float64, _ ...string) { h.count++ }
 func (*fakeHistogram) Delete(_ ...string) bool          { return true }
 
-func TestReleaseMetricsReleasesGauges(t *testing.T) {
-       m := &metrics{
-               activeSyncSessions: newFakeGauge(),
-               reorderBuffered:    newFakeGauge(),
-       }
-
-       sess := &syncSession{
-               sessionID: "s1",
-               startTime: time.Now(),
-               metadata:  &clusterv1.SyncMetadata{Topic: 
"v1:stream-part-sync"},
-               chunkBuffer: &chunkBuffer{
-                       chunks: map[uint32]*clusterv1.SyncPartRequest{
-                               2: {ChunkIndex: 2},
-                               3: {ChunkIndex: 3},
-                       },
-               },
-       }
-
-       // simulate start increments
-       m.activeSyncSessions.Add(1, sess.metadata.Topic)
-       m.reorderBuffered.Add(2, sess.metadata.Topic)
-
-       sess.releaseMetrics(m)
+// capturingCounterWithLabels records the exact label slices passed to Inc.
+type capturingCounterWithLabels struct {
+       calls [][]string
+}
 
-       require.Equal(t, float64(0), getGaugeValue(m.activeSyncSessions))
-       require.Equal(t, float64(0), getGaugeValue(m.reorderBuffered))
-       // idempotent
-       sess.releaseMetrics(m)
-       require.Equal(t, float64(0), getGaugeValue(m.activeSyncSessions))
-       require.Equal(t, float64(0), getGaugeValue(m.reorderBuffered))
+func (c *capturingCounterWithLabels) Inc(_ float64, labels ...string) {
+       cp := make([]string, len(labels))
+       copy(cp, labels)
+       c.calls = append(c.calls, cp)
 }
 
-func TestCompletionOutcomeMetricsRecorded(t *testing.T) {
-       logInitErr := logger.Init(logger.Logging{
-               Env:   "dev",
-               Level: "info",
-       })
+func (*capturingCounterWithLabels) Delete(_ ...string) bool { return true }
+
+// TestFileSyncCompletionMetrics verifies that handleCompletion records
+// totalFinished, totalLatency, receivedBytes with the canonical label order
+// (operation, group, remote_node, remote_role, remote_tier).
+func TestFileSyncCompletionMetrics(t *testing.T) {
+       logInitErr := logger.Init(logger.Logging{Env: "dev", Level: "info"})
        require.NoError(t, logInitErr)
 
-       topic := "v1:stream-part-sync"
-       totalBytes := float64(10)
+       finished := &fakeCounter{}
+       latency := &fakeHistogram{}
+       receivedB := &fakeCounter{}
+       totalErrC := &fakeCounter{}
 
-       s := &server{
+       s := &server{ //nolint:exhaustruct
                log: logger.GetLogger("test-server-completion-metrics"),
                metrics: &metrics{
-                       activeSyncSessions:      newFakeGauge(),
-                       reorderBuffered:         newFakeGauge(),
-                       chunkedSyncAbortedTotal: &fakeCounter{},
-                       chunkedSyncFailedParts:  &fakeCounter{},
-                       chunkedSyncTotalBytes:   &fakeCounter{},
-                       chunkedSyncDurationSecs: &fakeHistogram{},
+                       totalStarted:  &fakeCounter{},
+                       totalFinished: finished,
+                       totalLatency:  latency,
+                       totalErr:      totalErrC,
+                       receivedBytes: receivedB,
                },
        }
 
-       session := &syncSession{
+       session := &syncSession{ //nolint:exhaustruct
                sessionID: "s1",
                startTime: time.Now().Add(-2 * time.Second),
-               metadata:  &clusterv1.SyncMetadata{Topic: topic},
-               partCtx:   &queue.ChunkedSyncPartContext{},
+               metadata: &clusterv1.SyncMetadata{
+                       Group:      "my-group",
+                       Topic:      "v1:stream-part-sync",
+                       SenderNode: "data-0:17912",
+                       SenderRole: "data",
+                       SenderTier: "hot",
+               },
+               partCtx: &queue.ChunkedSyncPartContext{},
                partsProgress: map[int]*partProgress{
                        0: {totalBytes: 10, receivedBytes: 10, completed: true},
                        1: {totalBytes: 10, receivedBytes: 0, completed: false},
@@ -168,9 +120,6 @@ func TestCompletionOutcomeMetricsRecorded(t *testing.T) {
                chunksReceived: 2,
        }
 
-       // Simulate start increments that would have happened on session 
creation.
-       s.metrics.activeSyncSessions.Add(1, topic)
-
        mockStream := &MockSyncPartStream{}
        req := &clusterv1.SyncPartRequest{
                SessionId:  session.sessionID,
@@ -182,40 +131,122 @@ func TestCompletionOutcomeMetricsRecorded(t *testing.T) {
        handleErr := s.handleCompletion(mockStream, session, req)
        require.NoError(t, handleErr)
 
-       require.Equal(t, totalBytes, 
s.metrics.chunkedSyncTotalBytes.(*fakeCounter).total)
-       require.Equal(t, 1, 
s.metrics.chunkedSyncDurationSecs.(*fakeHistogram).count)
-       require.Equal(t, float64(1), 
s.metrics.chunkedSyncFailedParts.(*fakeCounter).total)
-       require.Equal(t, float64(0), 
getGaugeValue(s.metrics.activeSyncSessions))
+       require.Equal(t, float64(1), finished.total, "totalFinished should be 
incremented once")
+       require.Equal(t, 1, latency.count, "totalLatency should be observed 
once")
+       require.Equal(t, float64(10), receivedB.total, "receivedBytes should 
equal totalReceived")
+       // one failed part
+       require.Equal(t, float64(1), totalErrC.total, "one part_failed error 
should be recorded")
        require.True(t, session.completed)
 }
 
-func TestCleanupPreviousSessionRecordsAborted(t *testing.T) {
-       logInitErr := logger.Init(logger.Logging{
-               Env:   "dev",
-               Level: "info",
-       })
+// TestFileSyncStartedMetrics verifies that startOrSwitchSession records totalStarted.
+func TestFileSyncStartedMetrics(t *testing.T) {
+       logInitErr := logger.Init(logger.Logging{Env: "dev", Level: "info"})
+       require.NoError(t, logInitErr)
+
+       started := &fakeCounter{}
+       s := &server{ //nolint:exhaustruct
+               log: logger.GetLogger("test-server-started-metrics"),
+               metrics: &metrics{
+                       totalStarted:  started,
+                       totalFinished: &fakeCounter{},
+                       totalLatency:  &fakeHistogram{},
+                       totalErr:      &fakeCounter{},
+                       receivedBytes: &fakeCounter{},
+               },
+       }
+
+       req := &clusterv1.SyncPartRequest{ //nolint:exhaustruct
+               SessionId: "s1",
+               Content: &clusterv1.SyncPartRequest_Metadata{
+                       Metadata: &clusterv1.SyncMetadata{
+                               Group:      "g1",
+                               Topic:      "v1:stream-part-sync",
+                               SenderNode: "liaison-0:17912",
+                               SenderRole: "liaison",
+                               SenderTier: "",
+                       },
+               },
+       }
+       _ = s.startOrSwitchSession("s1", req, nil)
+       require.Equal(t, float64(1), started.total)
+}
+
+// TestFileSyncSwitchRecordsErrType verifies that switching sessions records error_type=switch.
+func TestFileSyncSwitchRecordsErrType(t *testing.T) {
+       logInitErr := logger.Init(logger.Logging{Env: "dev", Level: "info"})
        require.NoError(t, logInitErr)
 
-       topic := "v1:stream-part-sync"
-       s := &server{
-               log: logger.GetLogger("test-server-switch-abort"),
+       errLabels := &capturingCounterWithLabels{}
+       s := &server{ //nolint:exhaustruct
+               log: logger.GetLogger("test-server-switch"),
                metrics: &metrics{
-                       activeSyncSessions:      newFakeGauge(),
-                       reorderBuffered:         newFakeGauge(),
-                       chunkedSyncAbortedTotal: &fakeCounter{},
+                       totalStarted:  &fakeCounter{},
+                       totalFinished: &fakeCounter{},
+                       totalLatency:  &fakeHistogram{},
+                       totalErr:      errLabels,
+                       receivedBytes: &fakeCounter{},
                },
        }
 
-       prev := &syncSession{
+       prev := &syncSession{ //nolint:exhaustruct
                sessionID: "s1",
                startTime: time.Now().Add(-time.Second),
-               metadata:  &clusterv1.SyncMetadata{Topic: topic},
+               metadata: &clusterv1.SyncMetadata{
+                       Group:      "g1",
+                       Topic:      "v1:stream-part-sync",
+                       SenderNode: "data-0:17912",
+                       SenderRole: "data",
+                       SenderTier: "warm",
+               },
        }
-       s.metrics.activeSyncSessions.Add(1, topic)
 
        s.cleanupPreviousSession(prev)
 
-       require.True(t, prev.abortedRecorded)
-       require.Equal(t, float64(1), 
s.metrics.chunkedSyncAbortedTotal.(*fakeCounter).total)
-       require.Equal(t, float64(0), 
getGaugeValue(s.metrics.activeSyncSessions))
+       require.True(t, prev.completed)
+       require.Len(t, errLabels.calls, 1)
+       lastLabel := errLabels.calls[0]
+       // label order: operation, group, remote_node, remote_role, remote_tier, error_type
+       require.Equal(t, 
meter.Counter(errLabels).(*capturingCounterWithLabels), errLabels)
+       require.Equal(t, "switch", lastLabel[len(lastLabel)-1], "error_type 
must be 'switch'")
+}
+
+// TestFileSyncOperationLabel verifies the labels for file-sync sessions use operation=file-sync.
+func TestFileSyncOperationLabel(t *testing.T) {
+       logInitErr := logger.Init(logger.Logging{Env: "dev", Level: "info"})
+       require.NoError(t, logInitErr)
+
+       started := &capturingCounterWithLabels{}
+       s := &server{ //nolint:exhaustruct
+               log: logger.GetLogger("test-server-op-label"),
+               metrics: &metrics{
+                       totalStarted:  started,
+                       totalFinished: &fakeCounter{},
+                       totalLatency:  &fakeHistogram{},
+                       totalErr:      &fakeCounter{},
+                       receivedBytes: &fakeCounter{},
+               },
+       }
+
+       req := &clusterv1.SyncPartRequest{ //nolint:exhaustruct
+               SessionId: "s1",
+               Content: &clusterv1.SyncPartRequest_Metadata{
+                       Metadata: &clusterv1.SyncMetadata{
+                               Group:      "grp",
+                               Topic:      "v1:measure-part-sync",
+                               SenderNode: "data-1:17912",
+                               SenderRole: "data",
+                               SenderTier: "cold",
+                       },
+               },
+       }
+       _ = s.startOrSwitchSession("s1", req, nil)
+       require.Len(t, started.calls, 1)
+       labels := started.calls[0]
+       // operation, group, remote_node, remote_role, remote_tier
+       require.Equal(t, "file-sync", labels[0])
+       require.Equal(t, "grp", labels[1])
+       require.Equal(t, "data-1:17912", labels[2])
+       require.Equal(t, "data", labels[3])
+       require.Equal(t, "cold", labels[4])
 }
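
The tests above rely on a counter fake that records the label values of every Inc call and then inspect the last value as error_type. A self-contained sketch of that pattern follows, assuming a reduced counter interface with only Inc; recordErr and hasErrType are hypothetical helpers, not functions from this commit.

```go
package main

import "fmt"

// counter is a hypothetical, reduced stand-in for meter.Counter.
type counter interface {
	Inc(delta float64, labelValues ...string)
}

// capturingCounter records the label values passed to each Inc call.
type capturingCounter struct {
	calls [][]string
}

func (c *capturingCounter) Inc(_ float64, labelValues ...string) {
	// Copy so later mutation of the caller's slice cannot change the record.
	cp := make([]string, len(labelValues))
	copy(cp, labelValues)
	c.calls = append(c.calls, cp)
}

// recordErr is a hypothetical emitter that appends error_type as the last
// label value, matching the order used for total_err in this commit.
func recordErr(c counter, base []string, errType string) {
	values := append(append([]string(nil), base...), errType)
	c.Inc(1, values...)
}

// hasErrType reports whether any recorded call ends with the given error_type.
func hasErrType(calls [][]string, want string) bool {
	for _, labels := range calls {
		if len(labels) > 0 && labels[len(labels)-1] == want {
			return true
		}
	}
	return false
}

func main() {
	c := &capturingCounter{}
	base := []string{"file-sync", "g1", "data-0:17912", "data", "warm"}
	recordErr(c, base, "switch")
	fmt.Println(hasErrType(c.calls, "switch"), hasErrType(c.calls, "buffer_full"))
}
```

Checking the last element rather than a fixed index keeps the assertion valid as long as error_type stays the final label, even if base labels are added later.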
diff --git a/banyand/queue/sub/sub.go b/banyand/queue/sub/sub.go
index d94664312..37307e327 100644
--- a/banyand/queue/sub/sub.go
+++ b/banyand/queue/sub/sub.go
@@ -52,15 +52,10 @@ func checkVersionCompatibility(versionInfo 
*clusterv1.VersionInfo) (*clusterv1.V
        serverFileFormatVersion := storage.GetCurrentVersion()
        compatibleFileFormatVersions := storage.GetCompatibleVersions()
 
-       // Check API version compatibility
        apiCompatible := versionInfo.ApiVersion == serverAPIVersion
 
-       // Check file format version compatibility
-       fileFormatCompatible := false
-       if versionInfo.FileFormatVersion == serverFileFormatVersion {
-               fileFormatCompatible = true
-       } else {
-               // Check if client's file format version is in our compatible 
list
+       fileFormatCompatible := versionInfo.FileFormatVersion == 
serverFileFormatVersion
+       if !fileFormatCompatible {
                for _, compatVer := range compatibleFileFormatVersions {
                        if compatVer == versionInfo.FileFormatVersion {
                                fileFormatCompatible = true
@@ -69,7 +64,7 @@ func checkVersionCompatibility(versionInfo 
*clusterv1.VersionInfo) (*clusterv1.V
                }
        }
 
-       versionCompatibility := &clusterv1.VersionCompatibility{
+       vc := &clusterv1.VersionCompatibility{
                ServerApiVersion:            serverAPIVersion,
                SupportedApiVersions:        []string{serverAPIVersion},
                ServerFileFormatVersion:     serverFileFormatVersion,
@@ -78,37 +73,49 @@ func checkVersionCompatibility(versionInfo 
*clusterv1.VersionInfo) (*clusterv1.V
 
        switch {
        case !apiCompatible && !fileFormatCompatible:
-               versionCompatibility.Supported = false
-               versionCompatibility.Reason = fmt.Sprintf("API version %s not 
supported (server: %s) and file format version %s not compatible (server: %s, 
supported: %v)",
+               vc.Supported = false
+               vc.Reason = fmt.Sprintf("API version %s not supported (server: 
%s) and file format version %s not compatible (server: %s, supported: %v)",
                        versionInfo.ApiVersion, serverAPIVersion, 
versionInfo.FileFormatVersion, serverFileFormatVersion, 
compatibleFileFormatVersions)
-               return versionCompatibility, 
modelv1.Status_STATUS_VERSION_UNSUPPORTED
+               return vc, modelv1.Status_STATUS_VERSION_UNSUPPORTED
        case !apiCompatible:
-               versionCompatibility.Supported = false
-               versionCompatibility.Reason = fmt.Sprintf("API version %s not 
supported (server: %s)", versionInfo.ApiVersion, serverAPIVersion)
-               return versionCompatibility, 
modelv1.Status_STATUS_VERSION_UNSUPPORTED
+               vc.Supported = false
+               vc.Reason = fmt.Sprintf("API version %s not supported (server: 
%s)", versionInfo.ApiVersion, serverAPIVersion)
+               return vc, modelv1.Status_STATUS_VERSION_UNSUPPORTED
        case !fileFormatCompatible:
-               versionCompatibility.Supported = false
-               versionCompatibility.Reason = fmt.Sprintf("File format version 
%s not compatible (server: %s, supported: %v)",
+               vc.Supported = false
+               vc.Reason = fmt.Sprintf("File format version %s not compatible 
(server: %s, supported: %v)",
                        versionInfo.FileFormatVersion, serverFileFormatVersion, 
compatibleFileFormatVersions)
-               return versionCompatibility, 
modelv1.Status_STATUS_VERSION_UNSUPPORTED
+               return vc, modelv1.Status_STATUS_VERSION_UNSUPPORTED
        }
 
-       versionCompatibility.Supported = true
-       versionCompatibility.Reason = "Client version compatible with server"
-       return versionCompatibility, modelv1.Status_STATUS_SUCCEED
+       vc.Supported = true
+       vc.Reason = "Client version compatible with server"
+       return vc, modelv1.Status_STATUS_SUCCEED
+}
+
+// streamIdentity captures the per-stream remote sender identity, pinned on the first message.
+type streamIdentity struct {
+       senderNode string
+       senderRole string
+       senderTier string
+       group      string
+       operation  string
+       pinned     bool
 }
 
 func (s *server) Send(stream clusterv1.Service_SendServer) error {
        ctx := stream.Context()
        var topic *bus.Topic
-       var m bus.Message
        var dataCollection []any
        start := time.Now()
+       identity := &streamIdentity{}
+
        defer func() {
-               if topic != nil {
-                       s.metrics.totalFinished.Inc(1, topic.String())
-                       s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
topic.String())
+               if topic == nil || !identity.pinned || s.metrics == nil {
+                       return
                }
+               s.metrics.totalFinished.Inc(1, identity.operation, 
identity.group, identity.senderNode, identity.senderRole, identity.senderTier)
+               s.metrics.totalLatency.Observe(time.Since(start).Seconds(), 
identity.operation, identity.group, identity.senderNode, identity.senderRole, 
identity.senderTier)
        }()
        for {
                select {
@@ -118,132 +125,198 @@ func (s *server) Send(stream 
clusterv1.Service_SendServer) error {
                }
                writeEntity, err := stream.Recv()
                if errors.Is(err, io.EOF) {
-                       s.handleEOF(stream, topic, dataCollection, writeEntity)
+                       s.handleEOF(stream, topic, dataCollection, writeEntity, 
identity)
                        return nil
                }
                if err != nil {
                        return s.handleRecvError(err)
                }
 
-               // Check version compatibility on first message received
-               if writeEntity.VersionInfo != nil {
-                       versionCompatibility, status := checkVersionCompatibility(writeEntity.VersionInfo)
-                       if status != modelv1.Status_STATUS_SUCCEED {
-                               s.log.Warn().
-                                       Str("client_api_version", writeEntity.VersionInfo.ApiVersion).
-                                       Str("client_file_format_version", writeEntity.VersionInfo.FileFormatVersion).
-                                       Str("reason", versionCompatibility.Reason).
-                                       Msg("version compatibility check failed")
-
-                               if errSend := stream.Send(&clusterv1.SendResponse{
-                                       MessageId:            writeEntity.MessageId,
-                                       Status:               status,
-                                       Error:                versionCompatibility.Reason,
-                                       VersionCompatibility: versionCompatibility,
-                               }); errSend != nil {
-                                       s.log.Error().Err(errSend).Msg("failed to send version incompatibility response")
-                               }
-                               return fmt.Errorf("version incompatibility: %s", versionCompatibility.Reason)
-                       }
+               if versionErr := s.checkVersionAndReply(stream, writeEntity); versionErr != nil {
+                       return versionErr
                }
 
-               s.metrics.totalMsgReceived.Inc(1, writeEntity.Topic)
-               if writeEntity.Topic != "" && topic == nil {
-                       t, ok := data.TopicMap[writeEntity.Topic]
-                       if !ok {
-                               s.reply(stream, writeEntity, err, "invalid topic")
-                               continue
-                       }
-                       topic = &t
+               topic, err = s.resolveTopic(stream, writeEntity, topic, identity)
+               if err != nil {
+                       continue
                }
                if topic == nil {
-                       s.reply(stream, writeEntity, err, "topic is empty")
                        continue
                }
 
-               if reqSupplier, ok := data.TopicRequestMap[*topic]; ok {
-                       req := reqSupplier()
-                       if req == nil {
-                               m = bus.NewMessage(bus.MessageID(writeEntity.MessageId), writeEntity.Body)
-                       } else {
-                               if errUnmarshal := proto.Unmarshal(writeEntity.Body, req); errUnmarshal != nil {
-                                       s.reply(stream, writeEntity, errUnmarshal, "failed to unmarshal message")
-                                       continue
-                               }
-                               m = bus.NewMessage(bus.MessageID(writeEntity.MessageId), req)
-                       }
-               } else {
-                       s.reply(stream, writeEntity, err, "unknown topic")
+               s.pinIdentity(identity, writeEntity, *topic)
+
+               m, parseErr := s.parseMessage(stream, writeEntity, *topic, identity)
+               if parseErr != nil {
                        continue
                }
+
                if writeEntity.BatchMod {
-                       s.handleBatch(&dataCollection, writeEntity, &start)
+                       s.handleBatch(&dataCollection, writeEntity, &start, identity)
                        continue
                }
-               s.metrics.totalStarted.Inc(1, writeEntity.Topic)
-               listeners := s.getListeners(*topic)
-               if len(listeners) == 0 {
-                       s.reply(stream, writeEntity, err, "no listener found")
-                       continue
-               }
-               if len(listeners) > 1 {
-                       logger.Panicf("multiple listeners found for topic %s", *topic)
+
+               s.dispatchMessage(stream, writeEntity, *topic, m, identity, &start)
+       }
+}
+
+// checkVersionAndReply returns an error if the version is incompatible and sends the rejection response.
+func (s *server) checkVersionAndReply(stream clusterv1.Service_SendServer, writeEntity *clusterv1.SendRequest) error {
+       if writeEntity.VersionInfo == nil {
+               return nil
+       }
+       vc, versionStatus := checkVersionCompatibility(writeEntity.VersionInfo)
+       if versionStatus == modelv1.Status_STATUS_SUCCEED {
+               return nil
+       }
+       s.log.Warn().
+               Str("client_api_version", writeEntity.VersionInfo.ApiVersion).
+               Str("client_file_format_version", writeEntity.VersionInfo.FileFormatVersion).
+               Str("reason", vc.Reason).
+               Msg("version compatibility check failed")
+       if errSend := stream.Send(&clusterv1.SendResponse{
+               MessageId:            writeEntity.MessageId,
+               Status:               versionStatus,
+               Error:                vc.Reason,
+               VersionCompatibility: vc,
+       }); errSend != nil {
+               s.log.Error().Err(errSend).Msg("failed to send version incompatibility response")
+       }
+       return fmt.Errorf("version incompatibility: %s", vc.Reason)
+}
+
+// resolveTopic parses and validates the topic from the write entity.
+// Returns a non-nil error (sentinel) to signal the caller to `continue`.
+func (s *server) resolveTopic(
+       stream clusterv1.Service_SendServer,
+       writeEntity *clusterv1.SendRequest,
+       current *bus.Topic,
+       identity *streamIdentity,
+) (*bus.Topic, error) {
+       if writeEntity.Topic == "" || current != nil {
+               if current == nil {
+                       s.replyWithErrType(stream, writeEntity, nil, "topic is empty", identity, "empty_topic")
+                       return nil, fmt.Errorf("empty")
                }
-               listener := listeners[0]
+               return current, nil
+       }
+       t, ok := data.TopicMap[writeEntity.Topic]
+       if !ok {
+               s.replyWithErrType(stream, writeEntity, nil, "invalid topic", identity, "invalid_topic")
+               return nil, fmt.Errorf("invalid")
+       }
+       return &t, nil
+}
 
-               m = listener.Rev(ctx, m)
-               if m.Data() == nil {
-                       if errSend := stream.Send(&clusterv1.SendResponse{
-                               MessageId: writeEntity.MessageId,
-                       }); errSend != nil {
-                               s.log.Error().Stringer("request", writeEntity).Err(errSend).Msg("failed to send empty response")
-                               s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic)
-                               continue
+// pinIdentity pins the sender identity on the first message.
+func (s *server) pinIdentity(identity *streamIdentity, writeEntity *clusterv1.SendRequest, topic bus.Topic) {
+       if !identity.pinned {
+               identity.senderNode = writeEntity.GetSenderNode()
+               identity.senderRole = writeEntity.GetSenderRole()
+               identity.senderTier = writeEntity.GetSenderTier()
+               identity.operation = data.OperationOf(topic)
+               identity.group = writeEntity.GetGroup()
+               identity.pinned = true
+               return
+       }
+       if g := writeEntity.GetGroup(); g != "" {
+               identity.group = g
+       }
+}
+
+// parseMessage deserializes the wire body into a bus.Message. Returns a sentinel error to `continue` on failure.
+func (s *server) parseMessage(
+       stream clusterv1.Service_SendServer,
+       writeEntity *clusterv1.SendRequest,
+       topic bus.Topic,
+       identity *streamIdentity,
+) (bus.Message, error) {
+       reqSupplier, ok := data.TopicRequestMap[topic]
+       if !ok {
+               s.replyWithErrType(stream, writeEntity, nil, "unknown topic", identity, "unknown_topic")
+               return bus.Message{}, fmt.Errorf("unknown")
+       }
+       req := reqSupplier()
+       if req == nil {
+               return bus.NewMessage(bus.MessageID(writeEntity.MessageId), writeEntity.Body), nil
+       }
+       if errUnmarshal := proto.Unmarshal(writeEntity.Body, req); errUnmarshal != nil {
+               s.replyWithErrType(stream, writeEntity, errUnmarshal, "failed to unmarshal message", identity, "unmarshal_error")
+               return bus.Message{}, errUnmarshal
+       }
+       return bus.NewMessage(bus.MessageID(writeEntity.MessageId), req), nil
+}
+
+// dispatchMessage invokes the listener and sends back the response.
+func (s *server) dispatchMessage(
+       stream clusterv1.Service_SendServer,
+       writeEntity *clusterv1.SendRequest,
+       topic bus.Topic,
+       m bus.Message,
+       identity *streamIdentity,
+       start *time.Time,
+) {
+       if s.metrics != nil {
+               s.metrics.totalStarted.Inc(1, identity.operation, identity.group, identity.senderNode, identity.senderRole, identity.senderTier)
+       }
+       listeners := s.getListeners(topic)
+       if len(listeners) == 0 {
+               s.replyWithErrType(stream, writeEntity, nil, "no listener found", identity, "no_listener")
+               return
+       }
+       if len(listeners) > 1 {
+               logger.Panicf("multiple listeners found for topic %s", topic)
+       }
+       m = listeners[0].Rev(stream.Context(), m)
+       if m.Data() == nil {
+               if errSend := stream.Send(&clusterv1.SendResponse{MessageId: writeEntity.MessageId}); errSend != nil {
+                       s.log.Error().Stringer("request", writeEntity).Err(errSend).Msg("failed to send empty response")
+                       if s.metrics != nil {
+                               s.metrics.totalErr.Inc(1, identity.operation, identity.group, identity.senderNode, identity.senderRole, identity.senderTier, "send_error")
                        }
-                       s.metrics.totalMsgSent.Inc(1, writeEntity.Topic)
-                       continue
                }
-               var responseBody []byte
-               switch d := m.Data().(type) {
-               case proto.Message:
-                       var marshalErr error
-                       responseBody, marshalErr = proto.Marshal(d)
-                       if marshalErr != nil {
-                               s.reply(stream, writeEntity, marshalErr, "failed to marshal message")
-                               continue
-                       }
-               case []byte:
-                       // Topic-AND-process-wire-mode guard: an opaque []byte response body
-                       // is the raw vec columnar frame and is only valid on
-                       // TopicInternalMeasureQuery when this process is flag-on. The
-                       // default: arm still rejects []byte for every other topic/mode.
-                       if *topic != data.TopicInternalMeasureQuery || !data.MeasureWireModeRaw() {
-                               s.reply(stream, writeEntity, nil, fmt.Sprintf("invalid response: unexpected raw body on topic %s", *topic))
-                               continue
-                       }
-                       responseBody = d
-               case *common.Error:
-                       select {
-                       case <-ctx.Done():
-                               s.metrics.totalMsgReceivedErr.Inc(1, writeEntity.Topic)
-                               return ctx.Err()
-                       default:
-                       }
-                       s.reply(stream, writeEntity, nil, d.Error())
-                       continue
-               default:
-                       s.reply(stream, writeEntity, nil, fmt.Sprintf("invalid response: %T", d))
-                       continue
+               return
+       }
+       responseBody, marshalErr := s.marshalResponse(stream, writeEntity, topic, m, identity)
+       if marshalErr != nil {
+               return
+       }
+       if sendErr := stream.Send(&clusterv1.SendResponse{MessageId: writeEntity.MessageId, Body: responseBody}); sendErr != nil {
+               s.log.Error().Stringer("request", writeEntity).Dur("latency", time.Since(*start)).Err(sendErr).Msg("failed to send query response")
+               if s.metrics != nil {
+                       s.metrics.totalErr.Inc(1, identity.operation, identity.group, identity.senderNode, identity.senderRole, identity.senderTier, "send_error")
                }
-               if err := stream.Send(&clusterv1.SendResponse{
-                       MessageId: writeEntity.MessageId,
-                       Body:      responseBody,
-               }); err != nil {
-                       s.log.Error().Stringer("request", writeEntity).Dur("latency", time.Since(start)).Err(err).Msg("failed to send query response")
-                       s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic)
-                       continue
+       }
+}
+
+// marshalResponse converts a bus.Message data to a wire body. Returns sentinel error to stop dispatch on failure.
+func (s *server) marshalResponse(
+       stream clusterv1.Service_SendServer,
+       writeEntity *clusterv1.SendRequest,
+       topic bus.Topic,
+       m bus.Message,
+       identity *streamIdentity,
+) ([]byte, error) {
+       switch d := m.Data().(type) {
+       case proto.Message:
+               body, marshalErr := proto.Marshal(d)
+               if marshalErr != nil {
+                       s.replyWithErrType(stream, writeEntity, marshalErr, "failed to marshal message", identity, "marshal_error")
+                       return nil, marshalErr
+               }
+               return body, nil
+       case []byte:
+               if topic != data.TopicInternalMeasureQuery || !data.MeasureWireModeRaw() {
+                       s.replyWithErrType(stream, writeEntity, nil, fmt.Sprintf("invalid response: unexpected raw body on topic %s", topic), identity, "marshal_error")
+                       return nil, fmt.Errorf("invalid raw body")
                }
-               s.metrics.totalMsgSent.Inc(1, writeEntity.Topic)
+               return d, nil
+       case *common.Error:
+               s.replyWithErrType(stream, writeEntity, nil, d.Error(), identity, "handler_error")
+               return nil, fmt.Errorf("handler error")
+       default:
+               s.replyWithErrType(stream, writeEntity, nil, fmt.Sprintf("invalid response: %T", d), identity, "handler_error")
+               return nil, fmt.Errorf("invalid response type")
        }
 }
 
@@ -269,25 +342,26 @@ func (s *server) getListeners(topic bus.Topic) []bus.MessageListener {
        return s.listeners[topic]
 }
 
-func (s *server) reply(stream clusterv1.Service_SendServer, writeEntity *clusterv1.SendRequest, err error, message string) {
+func (s *server) replyWithErrType(
+       stream clusterv1.Service_SendServer,
+       writeEntity *clusterv1.SendRequest,
+       err error,
+       message string,
+       identity *streamIdentity,
+       errType string,
+) {
        s.log.Error().Stringer("request", writeEntity).Err(err).Msg(message)
-       s.metrics.totalMsgReceivedErr.Inc(1, writeEntity.Topic)
-       resp := &clusterv1.SendResponse{
-               MessageId: writeEntity.MessageId,
+       if s.metrics != nil {
+               s.metrics.totalErr.Inc(1, identity.operation, identity.group, identity.senderNode, identity.senderRole, identity.senderTier, errType)
        }
-
-       var ce *common.Error
-       if errors.As(err, &ce) {
-               resp.Error = ce.Error()
-               resp.Status = ce.Status()
-       } else {
-               resp.Error = message
+       var msgID uint64
+       if writeEntity != nil {
+               msgID = writeEntity.MessageId
        }
        if errResp := stream.Send(&clusterv1.SendResponse{
-               MessageId: writeEntity.MessageId,
+               MessageId: msgID,
                Error:     message,
        }); errResp != nil {
                s.log.Error().Err(errResp).AnErr("original", err).Stringer("request", writeEntity).Msg("failed to send error response")
-               s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic)
        }
 }
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 0090d8438..ff5fd5760 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -2487,6 +2487,10 @@ PartResult contains the result for individual parts.
 | body | [bytes](#bytes) |  |  |
 | batch_mod | [bool](#bool) |  |  |
 | version_info | [VersionInfo](#banyandb-cluster-v1-VersionInfo) |  | version_info contains version information |
+| group | [string](#string) |  | group is the business group associated with this message. |
+| sender_node | [string](#string) |  | sender_node is the BanyanDB node name of the sender; set only on the first message of a stream. |
+| sender_role | [string](#string) |  | sender_role is the role of the sender node; set only on the first message of a stream. |
+| sender_tier | [string](#string) |  | sender_tier is the storage tier label of the sender node; set only on the first message of a stream. |
 
 
 
@@ -2542,6 +2546,9 @@ SyncMetadata contains metadata for the sync operation.
 | topic | [string](#string) |  | Sync topic (stream-part-sync or measure-part-sync). |
 | timestamp | [int64](#int64) |  | Timestamp when sync started. |
 | total_parts | [uint32](#uint32) |  | Total number of parts being synced. |
+| sender_node | [string](#string) |  | sender_node is the BanyanDB node name of the sender. |
+| sender_role | [string](#string) |  | sender_role is the role of the sender node. |
+| sender_tier | [string](#string) |  | sender_tier is the storage tier label of the sender node. |
 
 
 
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index 154360277..a261e74b7 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -162,6 +162,15 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
                        if err != nil {
                                return err
                        }
+                       // Stamp the liaison's own identity onto the cluster publishers so the
+                       // receiving data nodes can label queue metrics by sender (remote_node/
+                       // role/tier), enabling the liaison<->data topology to be drawn.
+                       liaisonTier := node.Labels["type"]
+                       for _, c := range []queue.Client{tire1Client, tire2Client} {
+                               if setter, ok := c.(interface{ SetSelfNode(string, string, string) }); ok {
+                                       setter.SetSelfNode(node.NodeID, "liaison", liaisonTier)
+                               }
+                       }
                        logger.GetLogger().Info().Msg("starting as a liaison server")
                        runCtx = context.WithValue(runCtx, common.ContextNodeKey, node)
                        // Spawn our go routines and wait for shutdown.
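Reviewer note: the pin-on-first-frame rule that pinIdentity applies in the hunk above can be sketched in isolation as follows. This is a minimal illustration, not the committed code: the `frame` type is a hypothetical stand-in for clusterv1.SendRequest, the operation label is omitted, and the node and group names are made up. It shows that sender_* values are taken only from the first frame of a stream, while a later frame may still override the group when it carries a non-empty one.

```go
package main

import "fmt"

// frame is a hypothetical stand-in for clusterv1.SendRequest; only the
// identity-related fields are modelled here.
type frame struct {
	SenderNode string
	SenderRole string
	SenderTier string
	Group      string
}

// streamIdentity mirrors the shape of the struct in the diff, minus operation.
type streamIdentity struct {
	senderNode string
	senderRole string
	senderTier string
	group      string
	pinned     bool
}

// pin records the sender identity from the first frame only. Later frames
// can update the group, but only when they carry a non-empty value.
func (id *streamIdentity) pin(f frame) {
	if !id.pinned {
		id.senderNode = f.SenderNode
		id.senderRole = f.SenderRole
		id.senderTier = f.SenderTier
		id.group = f.Group
		id.pinned = true
		return
	}
	if f.Group != "" {
		id.group = f.Group
	}
}

func main() {
	id := &streamIdentity{}
	// First frame carries the full sender identity.
	id.pin(frame{SenderNode: "liaison-0", SenderRole: "liaison", SenderTier: "hot", Group: "g1"})
	// Later frames leave sender_* empty; an empty group keeps the old one.
	id.pin(frame{})
	id.pin(frame{SenderNode: "ignored", Group: "g2"})
	fmt.Println(id.senderNode, id.senderRole, id.senderTier, id.group)
}
```

Because the labels are fixed after the first frame, every metric emitted for the stream shares one sender label set even if later frames omit the sender fields.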
