This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 32c5c7d83 feat: queue_pub observability (#1113)
32c5c7d83 is described below

commit 32c5c7d83d59e43c001c4b9abc817e51c36ba2d3
Author: OmCheeLin <[email protected]>
AuthorDate: Mon May 11 15:02:20 2026 +0800

    feat: queue_pub observability (#1113)
---
 CHANGES.md                          |   1 +
 banyand/metadata/client.go          |   5 +
 banyand/metadata/metadata.go        |   1 +
 banyand/metadata/service/server.go  |  11 +
 banyand/queue/pub/batch.go          | 163 +++++++++---
 banyand/queue/pub/metrics_test.go   | 499 ++++++++++++++++++++++++++++++++++++
 banyand/queue/pub/pub.go            |  45 ++++
 banyand/queue/pub/retry_test.go     |  18 +-
 docs/operation/grafana-cluster.json | 445 +++++++++++++++++++++++++++++++-
 docs/operation/observability.md     |  60 ++++-
 10 files changed, 1200 insertions(+), 48 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index c714597ea..efa30e4b4 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -51,6 +51,7 @@ Release Notes.
   - Repair the `GetMaxRevision` aggregation on the per-node 
`NodeSchemaStatusService` (`banyand/metadata/schema/property/node_status.go`). 
The previous implementation returned `min(schemaCache.notifiedModRevision, 
NodeRepoRegistry.LatestModRevision)`, but `LatestModRevision` aggregated 
per-service `schemaRepo` watermarks via `min` — and each `schemaRepo` only 
advances on events for its own catalog (`pkg/schema/init.go:72` filters by 
`g.Catalog`), so the min was perpetually pinned to the  [...]
   - Fix the `notifiedModRevision` watermark advancement in 
`SchemaRegistry.processInitialResourceFromProperty`, `handleWatchEvent` (DELETE 
branch), and `handleDeletion`. Previously `AdvanceNotified` was gated on 
`cache.Update` / `cache.Delete` returning true, but those methods compare 
`latestUpdateAt` (property timestamp) while the watermark tracks `modRevision` 
(etcd revision). When the property timestamp is stale (e.g. a no-op Update that 
doesn't change the measure spec), the cache rej [...]
   - Fix the `modRevision` contract on no-op Update RPCs 
(`MeasureRegistryService.Update`, etc.). Previously `updateResource` detected 
unchanged content via `CheckerMap` and short-circuited without writing to the 
property store, but the caller had already fabricated `modRevision = 
time.Now().UnixNano()` and returned it. The returned revision never appeared in 
the property watch stream, so `AwaitRevisionApplied(R)` would hang. 
`updateResource` now returns `(int64, error)` — the existing pr [...]
+  - Add end-to-end observability for liaison internal queue pipelines with 
per-topic metrics for queue_sub and queue_pub, along with Grafana panels and 
troubleshooting docs.
 
 ### Bug Fixes
 
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index 6e66ea1ef..9a7b8bb10 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -421,6 +421,11 @@ func (s *clientService) SetMetricsRegistry(omr 
observability.MetricsRegistry) {
        s.omr = omr
 }
 
+// MetricsRegistry returns the metrics registry set via SetMetricsRegistry.
+func (s *clientService) MetricsRegistry() observability.MetricsRegistry {
+       return s.omr
+}
+
 func (s *clientService) Name() string {
        return "metadata"
 }
diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go
index a6e8e6719..f26631964 100644
--- a/banyand/metadata/metadata.go
+++ b/banyand/metadata/metadata.go
@@ -73,6 +73,7 @@ type Service interface {
        // Service.
        NodeRepoRegistry() *registry.NodeRepoRegistry
        SetMetricsRegistry(omr observability.MetricsRegistry)
+       MetricsRegistry() observability.MetricsRegistry
        SetDataBroadcaster(broadcaster bus.Broadcaster)
        SetLiaisonBroadcaster(broadcaster bus.Broadcaster)
        RegisterDataCollector(catalog commonv1.Catalog, collector 
schema.DataInfoCollector)
diff --git a/banyand/metadata/service/server.go 
b/banyand/metadata/service/server.go
index 0d23f3050..30ed3f612 100644
--- a/banyand/metadata/service/server.go
+++ b/banyand/metadata/service/server.go
@@ -192,6 +192,17 @@ func (s *server) SetMetricsRegistry(omr 
observability.MetricsRegistry) {
        s.Service.SetMetricsRegistry(omr)
 }
 
+// MetricsRegistry returns the metrics registry held by this server or the 
embedded client.
+func (s *server) MetricsRegistry() observability.MetricsRegistry {
+       if s.omr != nil {
+               return s.omr
+       }
+       if s.Service != nil {
+               return s.Service.MetricsRegistry()
+       }
+       return nil
+}
+
 // GetSchemaServerPort returns the schema server gRPC port.
 func (s *server) GetSchemaServerPort() *uint32 {
        if s.propServer != nil {
diff --git a/banyand/queue/pub/batch.go b/banyand/queue/pub/batch.go
index b755d300a..005b7633f 100644
--- a/banyand/queue/pub/batch.go
+++ b/banyand/queue/pub/batch.go
@@ -40,6 +40,17 @@ const (
        defaultPerRequestTimeout = 2 * time.Second
        defaultBackoffBase       = 500 * time.Millisecond
        defaultBackoffMax        = 30 * time.Second
+
+       // Local send-side failures (before the frame leaves the publisher).
+       sendErrReasonNonTransient   = "non_transient"
+       sendErrReasonCanceled       = "canceled"
+       sendErrReasonStreamCanceled = "stream_canceled"
+       sendErrReasonRetryExhausted = "retry_exhausted"
+       // Remote-side failures (observed after the frame was written to the 
stream).
+       sendErrReasonRecvError      = "recv_error"      // s.Recv returned an 
error (connection/protocol layer).
+       sendErrReasonServerRejected = "server_rejected" // Server responded 
with a non-empty Error (includes failover statuses).
+
+       sendResultSuccess = "success"
 )
 
 type writeStream struct {
@@ -85,13 +96,22 @@ func (bp *batchPublisher) Publish(ctx context.Context, 
topic bus.Topic, messages
                        continue
                }
 
+               topicStr := topic.String()
                sendData := func() (success bool) {
                        if stream, ok := bp.streams[node]; ok {
+                               hasMetrics := bp.hasMetrics()
                                defer func() {
                                        if !success {
                                                delete(bp.streams, node)
+                                               if hasMetrics {
+                                                       
bp.pub.metrics.inflightStreams.Add(-1, node)
+                                               }
                                        }
                                }()
+                               if hasMetrics {
+                                       bp.pub.metrics.inflightRequests.Add(1, 
topicStr, node)
+                                       defer 
bp.pub.metrics.inflightRequests.Add(-1, topicStr, node)
+                               }
                                select {
                                case <-ctx.Done():
                                        return false
@@ -99,7 +119,7 @@ func (bp *batchPublisher) Publish(ctx context.Context, topic 
bus.Topic, messages
                                        return false
                                default:
                                }
-                               errSend := bp.retrySend(ctx, stream.client, r, 
node)
+                               errSend := bp.retrySend(ctx, stream.client, r, 
node, topicStr)
                                if errSend != nil {
                                        err = multierr.Append(err, 
fmt.Errorf("failed to send message to node %s: %w", node, errSend))
                                        // Record failure for circuit breaker 
(only for transient/internal errors)
@@ -155,6 +175,10 @@ func (bp *batchPublisher) Publish(ctx context.Context, 
topic bus.Topic, messages
                deferFn := cancel
                stream, errCreateStream := nodeClient.client.Send(streamCtx)
                if errCreateStream != nil {
+                       // Release the timeout context when Send() fails; 
otherwise listenBatchResponse,
+                       // which normally invokes deferFn on exit, is never 
spawned and streamCtx leaks
+                       // until bp.timeout expires, accumulating pending 
timers under repeated failures.
+                       cancel()
                        err = multierr.Append(err, fmt.Errorf("failed to get 
stream for node %s: %w", node, errCreateStream))
                        continue
                }
@@ -162,48 +186,70 @@ func (bp *batchPublisher) Publish(ctx context.Context, 
topic bus.Topic, messages
                        client:    stream,
                        ctxDoneCh: streamCtx.Done(),
                }
+               if bp.hasMetrics() {
+                       bp.pub.metrics.inflightStreams.Add(1, node)
+               }
                bp.f.events = append(bp.f.events, make(chan batchEvent))
                _ = sendData()
                nodeName := node
-               go func(s clusterv1.Service_SendClient, deferFn func(), bc chan 
batchEvent, curNode string) {
-                       defer func() {
-                               close(bc)
-                               deferFn()
-                       }()
-                       select {
-                       case <-ctx.Done():
-                               return
-                       default:
-                       }
-                       resp, errRecv := s.Recv()
-                       if errRecv != nil {
-                               if grpchelper.IsFailoverError(errRecv) {
-                                       // Record circuit breaker failure 
before creating failover event
-                                       bp.pub.connMgr.RecordFailure(curNode, 
errRecv)
-                                       bc <- batchEvent{n: curNode, e: 
common.NewErrorWithStatus(modelv1.Status_STATUS_INTERNAL_ERROR, 
errRecv.Error())}
-                               }
-                               return
-                       }
-                       if resp == nil {
-                               return
-                       }
-                       if resp.Error == "" {
-                               return
-                       }
-                       if isFailoverStatus(resp.Status) {
-                               ce := common.NewErrorWithStatus(resp.Status, 
resp.Error)
-                               // Record circuit breaker failure before 
creating failover event
-                               bp.pub.connMgr.RecordFailure(curNode, ce)
-                               bc <- batchEvent{n: curNode, e: ce}
-                       }
-               }(stream, deferFn, bp.f.events[len(bp.f.events)-1], nodeName)
+               go bp.listenBatchResponse(ctx, stream, deferFn, 
bp.f.events[len(bp.f.events)-1], nodeName, topicStr)
        }
        return nil, err
 }
 
+func (bp *batchPublisher) hasMetrics() bool {
+       return bp.pub != nil && bp.pub.metrics != nil
+}
+
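+// listenBatchResponse receives one server response (skipped when ctx is 
already done), records end-to-end failure metrics, reports failover errors to 
the circuit breaker, and forwards failover receive errors and server 
rejections on bc; bc is closed and deferFn is invoked on return.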
+func (bp *batchPublisher) listenBatchResponse(ctx context.Context, s 
clusterv1.Service_SendClient, deferFn func(), bc chan batchEvent, curNode, 
topic string) {
+       defer func() {
+               close(bc)
+               deferFn()
+       }()
+       select {
+       case <-ctx.Done():
+               return
+       default:
+       }
+
+       resp, errRecv := s.Recv()
+       if errRecv != nil {
+               if bp.hasMetrics() {
+                       bp.pub.metrics.sendErrTotal.Inc(1, topic, curNode, 
sendErrReasonRecvError)
+               }
+               if grpchelper.IsFailoverError(errRecv) {
+                       // Record circuit breaker failure before creating 
failover event
+                       bp.pub.connMgr.RecordFailure(curNode, errRecv)
+                       bc <- batchEvent{n: curNode, e: 
common.NewErrorWithStatus(modelv1.Status_STATUS_INTERNAL_ERROR, 
errRecv.Error())}
+               }
+               return
+       }
+       if resp == nil || resp.Error == "" || resp.Status == 
modelv1.Status_STATUS_SUCCEED {
+               return
+       }
+       if bp.hasMetrics() {
+               bp.pub.metrics.sendErrTotal.Inc(1, topic, curNode, 
sendErrReasonServerRejected)
+       }
+       ce := common.NewErrorWithStatus(resp.Status, resp.Error)
+       // Only failover statuses trigger circuit-breaker accounting; other 
server-side
+       // rejections (e.g. invalid argument) are surfaced to the caller but do 
not count
+       // toward node health.
+       if isFailoverStatus(resp.Status) {
+               bp.pub.connMgr.RecordFailure(curNode, ce)
+       }
+       // Surface a batchEvent for every response carrying a non-empty 
resp.Error and a non-SUCCEED status so that Close() exposes
+       // the rejection to the caller. Otherwise the 
send_err_total{reason="server_rejected"}
+       // counter would rise on dashboards while callers silently treat the 
batch as successful.
+       bc <- batchEvent{n: curNode, e: ce}
+}
+
 func (bp *batchPublisher) Close() (cee map[string]*common.Error, err error) {
-       for i := range bp.streams {
-               err = multierr.Append(err, bp.streams[i].client.CloseSend())
+       for nodeName, stream := range bp.streams {
+               err = multierr.Append(err, stream.client.CloseSend())
+               if bp.hasMetrics() {
+                       bp.pub.metrics.inflightStreams.Add(-1, nodeName)
+               }
        }
        for i := range bp.streams {
                <-bp.streams[i].ctxDoneCh
@@ -280,8 +326,14 @@ func isFailoverStatus(s modelv1.Status) bool {
 }
 
 // retrySend implements bounded retries for client streaming sends with 
exponential backoff and jitter.
-func (bp *batchPublisher) retrySend(ctx context.Context, stream 
clusterv1.Service_SendClient, r *clusterv1.SendRequest, node string) error {
+func (bp *batchPublisher) retrySend(ctx context.Context, stream 
clusterv1.Service_SendClient, r *clusterv1.SendRequest, node string, topic 
string) error {
        var lastErr error
+       start := time.Now()
+       observeDuration := func(result string) {
+               if bp.hasMetrics() {
+                       
bp.pub.metrics.sendDurationSeconds.Observe(time.Since(start).Seconds(), topic, 
node, result)
+               }
+       }
 
        for attempt := 0; attempt <= defaultMaxRetries; attempt++ {
                // Create per-attempt timeout context
@@ -291,9 +343,17 @@ func (bp *batchPublisher) retrySend(ctx context.Context, 
stream clusterv1.Servic
                select {
                case <-ctx.Done():
                        cancel()
+                       if bp.hasMetrics() {
+                               bp.pub.metrics.sendErrTotal.Inc(1, topic, node, 
sendErrReasonCanceled)
+                       }
+                       observeDuration(sendErrReasonCanceled)
                        return ctx.Err()
                case <-stream.Context().Done():
                        cancel()
+                       if bp.hasMetrics() {
+                               bp.pub.metrics.sendErrTotal.Inc(1, topic, node, 
sendErrReasonStreamCanceled)
+                       }
+                       observeDuration(sendErrReasonStreamCanceled)
                        return stream.Context().Err()
                case <-attemptCtx.Done():
                        cancel()
@@ -310,7 +370,12 @@ func (bp *batchPublisher) retrySend(ctx context.Context, 
stream clusterv1.Servic
                cancel()
 
                if sendErr == nil {
-                       // Success
+                       if bp.hasMetrics() {
+                               bp.pub.metrics.sendSuccessTotal.Inc(1, topic, 
node)
+                               
bp.pub.metrics.sendBytesTotal.Inc(float64(len(r.Body)), topic, node)
+                       }
+                       // Success writing to the local stream; end-to-end ack 
is observed in listenBatchResponse.
+                       observeDuration(sendResultSuccess)
                        return nil
                }
 
@@ -319,9 +384,17 @@ func (bp *batchPublisher) retrySend(ctx context.Context, 
stream clusterv1.Servic
                // Check if error is retryable
                if !grpchelper.IsTransientError(sendErr) {
                        // Non-transient error, don't retry
+                       if bp.hasMetrics() {
+                               bp.pub.metrics.sendErrTotal.Inc(1, topic, node, 
sendErrReasonNonTransient)
+                       }
+                       observeDuration(sendErrReasonNonTransient)
                        return sendErr
                }
 
+               if bp.hasMetrics() {
+                       bp.pub.metrics.sendRetryAttempts.Inc(1, topic, node)
+               }
+
                // If this was the last attempt, don't sleep
                if attempt >= defaultMaxRetries {
                        break
@@ -334,13 +407,29 @@ func (bp *batchPublisher) retrySend(ctx context.Context, 
stream clusterv1.Servic
                select {
                case <-time.After(backoff):
                        // Continue to next attempt
+                       if bp.hasMetrics() {
+                               
bp.pub.metrics.sendBackoffSeconds.Inc(backoff.Seconds(), topic, node)
+                       }
                case <-ctx.Done():
+                       if bp.hasMetrics() {
+                               bp.pub.metrics.sendErrTotal.Inc(1, topic, node, 
sendErrReasonCanceled)
+                       }
+                       observeDuration(sendErrReasonCanceled)
                        return ctx.Err()
                case <-stream.Context().Done():
+                       if bp.hasMetrics() {
+                               bp.pub.metrics.sendErrTotal.Inc(1, topic, node, 
sendErrReasonStreamCanceled)
+                       }
+                       observeDuration(sendErrReasonStreamCanceled)
                        return stream.Context().Err()
                }
        }
 
        // All retries exhausted
+       if bp.hasMetrics() {
+               bp.pub.metrics.sendRetryExhausted.Inc(1, topic, node)
+               bp.pub.metrics.sendErrTotal.Inc(1, topic, node, 
sendErrReasonRetryExhausted)
+       }
+       observeDuration(sendErrReasonRetryExhausted)
        return fmt.Errorf("retry exhausted for node %s after %d attempts, last 
error: %w", node, defaultMaxRetries+1, lastErr)
 }
diff --git a/banyand/queue/pub/metrics_test.go 
b/banyand/queue/pub/metrics_test.go
new file mode 100644
index 000000000..b326bf88a
--- /dev/null
+++ b/banyand/queue/pub/metrics_test.go
@@ -0,0 +1,499 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pub
+
+import (
+       "context"
+       "strings"
+       "sync"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/require"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
+
+       "github.com/apache/skywalking-banyandb/api/data"
+       clusterv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+type fakeSendClient struct {
+       clusterv1.Service_SendClient
+       ctx context.Context
+
+       sendErrs []error
+       sendIdx  int
+}
+
+func (f *fakeSendClient) Send(_ *clusterv1.SendRequest) error {
+       if f.sendIdx >= len(f.sendErrs) {
+               return nil
+       }
+       err := f.sendErrs[f.sendIdx]
+       f.sendIdx++
+       return err
+}
+
+func (f *fakeSendClient) Context() context.Context {
+       return f.ctx
+}
+
+type countingCounter struct {
+       count float64
+}
+
+func (c *countingCounter) Inc(delta float64, _ ...string) {
+       c.count += delta
+}
+
+func (*countingCounter) Delete(_ ...string) bool {
+       return true
+}
+
+// errReasonCapturer records send_err_total increments keyed by the `reason` 
label (third label).
+type errReasonCapturer struct {
+       byReason map[string]float64
+       mu       sync.Mutex
+}
+
+func newErrReasonCapturer() *errReasonCapturer {
+       return &errReasonCapturer{byReason: make(map[string]float64)}
+}
+
+func (c *errReasonCapturer) Inc(delta float64, labels ...string) {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       if len(labels) < 3 {
+               return
+       }
+       reason := labels[2]
+       c.byReason[reason] += delta
+}
+
+func (c *errReasonCapturer) Delete(_ ...string) bool {
+       return true
+}
+
+func (c *errReasonCapturer) sum(reason string) float64 {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       return c.byReason[reason]
+}
+
+type noopGauge struct{}
+
+func (*noopGauge) Set(_ float64, _ ...string) {}
+func (*noopGauge) Add(_ float64, _ ...string) {}
+func (*noopGauge) Delete(_ ...string) bool    { return true }
+
+type valGauge struct {
+       mu  sync.Mutex
+       val float64
+}
+
+func (g *valGauge) Add(delta float64, _ ...string) {
+       g.mu.Lock()
+       g.val += delta
+       g.mu.Unlock()
+}
+
+func (g *valGauge) Set(v float64, _ ...string) {
+       g.mu.Lock()
+       g.val = v
+       g.mu.Unlock()
+}
+
+func (*valGauge) Delete(_ ...string) bool { return true }
+
+func (g *valGauge) value() float64 {
+       g.mu.Lock()
+       defer g.mu.Unlock()
+       return g.val
+}
+
+// inflightKeyedGauge tracks inflight_requests Add deltas per (topic, node) 
label pair.
+type inflightKeyedGauge struct {
+       vals map[string]float64
+       mu   sync.Mutex
+}
+
+func newInflightKeyedGauge() *inflightKeyedGauge {
+       return &inflightKeyedGauge{vals: make(map[string]float64)}
+}
+
+func inflightReqKey(topic, node string) string {
+       return strings.Join([]string{topic, node}, "|")
+}
+
+func (g *inflightKeyedGauge) Add(delta float64, labels ...string) {
+       if len(labels) < 2 {
+               return
+       }
+       k := inflightReqKey(labels[0], labels[1])
+       g.mu.Lock()
+       g.vals[k] += delta
+       g.mu.Unlock()
+}
+
+func (*inflightKeyedGauge) Set(_ float64, _ ...string) {}
+
+func (*inflightKeyedGauge) Delete(_ ...string) bool {
+       return true
+}
+
+func (g *inflightKeyedGauge) net(topic, node string) float64 {
+       g.mu.Lock()
+       defer g.mu.Unlock()
+       return g.vals[inflightReqKey(topic, node)]
+}
+
+type noopHistogram struct{}
+
+func (*noopHistogram) Observe(_ float64, _ ...string) {}
+func (*noopHistogram) Delete(_ ...string) bool        { return true }
+
+func newPubMetricsWithErrCapture(sendErr *errReasonCapturer) *pubMetrics {
+       return &pubMetrics{
+               sendSuccessTotal:    &countingCounter{},
+               sendErrTotal:        sendErr,
+               sendBytesTotal:      &countingCounter{},
+               sendDurationSeconds: &noopHistogram{},
+               sendRetryAttempts:   &countingCounter{},
+               sendRetryExhausted:  &countingCounter{},
+               sendBackoffSeconds:  &countingCounter{},
+               inflightStreams:     &noopGauge{},
+               inflightRequests:    &noopGauge{},
+       }
+}
+
+func newPubWithConnMgrForMetrics(t *testing.T, pm *pubMetrics) *pub {
+       t.Helper()
+       p := &pub{
+               handlers: make(map[bus.Topic]schema.EventHandler),
+               log:      logger.GetLogger("queue-pub-metrics-test"),
+               metrics:  pm,
+               closer:   run.NewCloser(1),
+       }
+       p.connMgr = 
grpchelper.NewConnManager(grpchelper.ConnManagerConfig[*client]{
+               Handler:        p,
+               Logger:         p.log,
+               RetryPolicy:    "",
+               MaxRecvMsgSize: 4 << 20,
+       })
+       return p
+}
+
+// TestRetryMetrics ensures that retry-related metrics are updated when 
retrySend
+// observes retryable errors and eventually exhausts retries.
+func TestRetryMetrics(t *testing.T) {
+       sendErrCap := newErrReasonCapturer()
+       p := &pub{
+               metrics: &pubMetrics{
+                       sendRetryAttempts:   &countingCounter{},
+                       sendRetryExhausted:  &countingCounter{},
+                       sendErrTotal:        sendErrCap,
+                       sendBackoffSeconds:  &countingCounter{},
+                       sendSuccessTotal:    &countingCounter{},
+                       sendBytesTotal:      &countingCounter{},
+                       sendDurationSeconds: &noopHistogram{},
+                       inflightStreams:     &noopGauge{},
+                       inflightRequests:    &noopGauge{},
+               },
+       }
+
+       bp := &batchPublisher{
+               pub: p,
+       }
+
+       ctx := context.Background()
+       client := &fakeSendClient{
+               sendErrs: []error{
+                       status.Error(codes.Unavailable, "transient"),
+                       status.Error(codes.Unavailable, "transient"),
+                       status.Error(codes.Unavailable, "transient"),
+                       status.Error(codes.Unavailable, "transient"),
+               },
+               ctx: ctx,
+       }
+
+       req := &clusterv1.SendRequest{
+               Topic: "test-topic",
+               Body:  []byte("payload"),
+       }
+
+       const nodeName = "test-node"
+       const topicStr = "test-topic"
+
+       retryErr := bp.retrySend(ctx, client, req, nodeName, topicStr)
+       require.Error(t, retryErr)
+
+       retries := p.metrics.sendRetryAttempts.(*countingCounter).count
+       require.Equal(t, float64(4), retries, "one retry counter per transient 
failure before exhaustion")
+
+       exhausted := p.metrics.sendRetryExhausted.(*countingCounter).count
+       require.Equal(t, float64(1), exhausted)
+
+       require.Equal(t, float64(1), 
sendErrCap.sum(sendErrReasonRetryExhausted))
+       require.Equal(t, float64(0), sendErrCap.sum(sendErrReasonNonTransient))
+
+       backoff := p.metrics.sendBackoffSeconds.(*countingCounter).count
+       require.Greater(t, backoff, float64(0), "backoff should be recorded 
between retry attempts")
+}
+
+func TestRetrySendNonTransientRecordsReason(t *testing.T) {
+       sendErrCap := newErrReasonCapturer()
+       p := &pub{metrics: newPubMetricsWithErrCapture(sendErrCap)}
+       bp := &batchPublisher{pub: p}
+
+       ctx := context.Background()
+       mockStream := NewMockSendClient(ctx)
+       mockStream.SetSendFunc(func(*clusterv1.SendRequest) error {
+               return status.Error(codes.InvalidArgument, "non-transient")
+       })
+
+       err := bp.retrySend(ctx, mockStream, &clusterv1.SendRequest{}, "n1", 
"t1")
+       require.Error(t, err)
+
+       require.Equal(t, float64(1), sendErrCap.sum(sendErrReasonNonTransient))
+       require.Equal(t, float64(0), 
sendErrCap.sum(sendErrReasonRetryExhausted))
+       require.Equal(t, float64(0), 
p.metrics.sendRetryAttempts.(*countingCounter).count)
+}
+
+func TestRetrySendCanceledRecordsReason(t *testing.T) {
+       sendErrCap := newErrReasonCapturer()
+       p := &pub{metrics: newPubMetricsWithErrCapture(sendErrCap)}
+       bp := &batchPublisher{pub: p}
+
+       ctx, cancel := context.WithCancel(context.Background())
+       cancel()
+
+       mockStream := NewMockSendClient(context.Background())
+       mockStream.SetSendFunc(func(*clusterv1.SendRequest) error {
+               return status.Error(codes.Unavailable, "unavailable")
+       })
+
+       err := bp.retrySend(ctx, mockStream, &clusterv1.SendRequest{}, "n1", 
"t1")
+       require.ErrorIs(t, err, context.Canceled)
+
+       require.Equal(t, float64(1), sendErrCap.sum(sendErrReasonCanceled))
+}
+
+func TestRetrySendStreamCanceledRecordsReason(t *testing.T) {
+       sendErrCap := newErrReasonCapturer()
+       p := &pub{metrics: newPubMetricsWithErrCapture(sendErrCap)}
+       bp := &batchPublisher{pub: p}
+
+       streamCtx, cancel := context.WithCancel(context.Background())
+       cancel()
+
+       mockStream := NewMockSendClient(streamCtx)
+
+       err := bp.retrySend(context.Background(), mockStream, 
&clusterv1.SendRequest{}, "n1", "t1")
+       require.ErrorIs(t, err, context.Canceled)
+
+       require.Equal(t, float64(1), 
sendErrCap.sum(sendErrReasonStreamCanceled))
+}
+
+func TestListenBatchResponseRecordsRecvError(t *testing.T) {
+       sendErrCap := newErrReasonCapturer()
+       pm := newPubMetricsWithErrCapture(sendErrCap)
+       p := newPubWithConnMgrForMetrics(t, pm)
+       bp := &batchPublisher{pub: p}
+
+       ctx := context.Background()
+       mockStream := NewMockSendClient(ctx)
+       mockStream.SetRecvFunc(func() (*clusterv1.SendResponse, error) {
+               return nil, status.Error(codes.Unavailable, "recv failed")
+       })
+
+       bc := make(chan batchEvent, 1)
+       bp.listenBatchResponse(ctx, mockStream, func() {}, bc, "node-a", 
"topic-a")
+
+       require.Equal(t, float64(1), sendErrCap.sum(sendErrReasonRecvError))
+}
+
+func TestListenBatchResponseRecvNonFailoverStillRecordsRecvError(t *testing.T) 
{
+       sendErrCap := newErrReasonCapturer()
+       pm := newPubMetricsWithErrCapture(sendErrCap)
+       p := newPubWithConnMgrForMetrics(t, pm)
+       bp := &batchPublisher{pub: p}
+
+       ctx := context.Background()
+       mockStream := NewMockSendClient(ctx)
+       mockStream.SetRecvFunc(func() (*clusterv1.SendResponse, error) {
+               return nil, status.Error(codes.InvalidArgument, "bad")
+       })
+
+       bc := make(chan batchEvent, 1)
+       bp.listenBatchResponse(ctx, mockStream, func() {}, bc, "node-a", 
"topic-a")
+
+       require.Equal(t, float64(1), sendErrCap.sum(sendErrReasonRecvError))
+}
+
+func TestListenBatchResponseServerRejectedWithoutFailover(t *testing.T) {
+       sendErrCap := newErrReasonCapturer()
+       pm := newPubMetricsWithErrCapture(sendErrCap)
+       p := newPubWithConnMgrForMetrics(t, pm)
+       bp := &batchPublisher{pub: p}
+
+       ctx := context.Background()
+       mockStream := NewMockSendClient(ctx)
+       mockStream.SetRecvFunc(func() (*clusterv1.SendResponse, error) {
+               return &clusterv1.SendResponse{
+                       Error:  "rejected",
+                       Status: modelv1.Status_STATUS_INTERNAL_ERROR,
+               }, nil
+       })
+
+       bc := make(chan batchEvent, 1)
+       bp.listenBatchResponse(ctx, mockStream, func() {}, bc, "node-a", 
"topic-a")
+
+       require.Equal(t, float64(1), 
sendErrCap.sum(sendErrReasonServerRejected))
+       require.Equal(t, float64(0), sendErrCap.sum(sendErrReasonRecvError))
+
+       // Non-failover server rejections are now surfaced to the caller via 
batchEvent
+       // so that send_err_total{reason="server_rejected"} stays consistent 
with visible errors.
+       // The event must carry the rejection but must NOT trigger a 
circuit-breaker failover.
+       select {
+       case evt, ok := <-bc:
+               require.True(t, ok, "expected a batchEvent for non-failover 
server rejection")
+               require.Equal(t, "node-a", evt.n)
+               require.NotNil(t, evt.e)
+               require.Equal(t, modelv1.Status_STATUS_INTERNAL_ERROR, 
evt.e.Status())
+       default:
+               t.Fatal("expected batchEvent for server_rejected but channel 
was empty")
+       }
+}
+
+func TestListenBatchResponseDiskFullSendsFailoverEvent(t *testing.T) {
+       sendErrCap := newErrReasonCapturer()
+       pm := newPubMetricsWithErrCapture(sendErrCap)
+       p := newPubWithConnMgrForMetrics(t, pm)
+       bp := &batchPublisher{pub: p}
+
+       ctx := context.Background()
+       mockStream := NewMockSendClient(ctx)
+       mockStream.SetRecvFunc(func() (*clusterv1.SendResponse, error) {
+               return &clusterv1.SendResponse{
+                       Error:  "disk full",
+                       Status: modelv1.Status_STATUS_DISK_FULL,
+               }, nil
+       })
+
+       bc := make(chan batchEvent, 1)
+       bp.listenBatchResponse(ctx, mockStream, func() {}, bc, "node-a", 
"topic-a")
+
+       require.Equal(t, float64(1), 
sendErrCap.sum(sendErrReasonServerRejected))
+
+       select {
+       case evt := <-bc:
+               require.Equal(t, "node-a", evt.n)
+               require.NotNil(t, evt.e)
+               require.Equal(t, modelv1.Status_STATUS_DISK_FULL, 
evt.e.Status())
+       default:
+               t.Fatal("expected failover batchEvent on disk full response")
+       }
+}
+
+func TestCloseDecrementsInflightStreams(t *testing.T) {
+       streamGauge := &valGauge{}
+       pm := &pubMetrics{
+               sendSuccessTotal:    &countingCounter{},
+               sendErrTotal:        newErrReasonCapturer(),
+               sendBytesTotal:      &countingCounter{},
+               sendDurationSeconds: &noopHistogram{},
+               sendRetryAttempts:   &countingCounter{},
+               sendRetryExhausted:  &countingCounter{},
+               sendBackoffSeconds:  &countingCounter{},
+               inflightStreams:     streamGauge,
+               inflightRequests:    &noopGauge{},
+       }
+       p := newPubWithConnMgrForMetrics(t, pm)
+
+       ctx := context.Background()
+       mockStream := NewMockSendClient(ctx)
+       doneCh := make(chan struct{})
+       close(doneCh)
+
+       bp := p.NewBatchPublisher(time.Second).(*batchPublisher)
+       bp.streams["node-x"] = writeStream{
+               client:    mockStream,
+               ctxDoneCh: doneCh,
+       }
+
+       streamGauge.Add(1, "node-x")
+
+       _, closeErr := bp.Close()
+       require.NoError(t, closeErr)
+       require.Equal(t, float64(0), streamGauge.value(), "Close should 
decrement inflight_streams once per open stream")
+}
+
+// TestPublishInflightRequestsBalancedWhenStreamPreexists checks that Publish 
increments then decrements
+// inflight_requests (via defer) when reusing an existing stream, without 
requiring GetClient / new stream setup.
+func TestPublishInflightRequestsBalancedWhenStreamPreexists(t *testing.T) {
+       inflightReq := newInflightKeyedGauge()
+       sendErrCap := newErrReasonCapturer()
+       attempts := &countingCounter{}
+       pm := &pubMetrics{
+               sendSuccessTotal:    attempts,
+               sendErrTotal:        sendErrCap,
+               sendBytesTotal:      &countingCounter{},
+               sendDurationSeconds: &noopHistogram{},
+               sendRetryAttempts:   &countingCounter{},
+               sendRetryExhausted:  &countingCounter{},
+               sendBackoffSeconds:  &countingCounter{},
+               inflightStreams:     &noopGauge{},
+               inflightRequests:    inflightReq,
+       }
+       p := newPubWithConnMgrForMetrics(t, pm)
+
+       ctx := context.Background()
+       mockStream := NewMockSendClient(ctx)
+       mockStream.SetSendFunc(func(*clusterv1.SendRequest) error {
+               return nil
+       })
+
+       doneCh := make(chan struct{})
+       close(doneCh)
+
+       const nodeName = "node-a"
+       topic := data.TopicMeasureWrite
+       topicStr := topic.String()
+
+       bp := p.NewBatchPublisher(10 * time.Second).(*batchPublisher)
+       bp.streams[nodeName] = writeStream{
+               client:    mockStream,
+               ctxDoneCh: doneCh,
+       }
+
+       msg := bus.NewMessageWithNode(1, nodeName, []byte("payload"))
+
+       _, publishErr := bp.Publish(ctx, topic, msg)
+       require.NoError(t, publishErr)
+
+       require.Equal(t, float64(1), attempts.count, "successful retrySend 
should record send_success_total")
+       require.Equal(t, float64(0), inflightReq.net(topicStr, nodeName),
+               "inflight_requests defer must balance +1/-1 for topic/node")
+       require.Equal(t, float64(0), 
sendErrCap.sum(sendErrReasonRetryExhausted))
+}
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index 1217009b4..d552bce77 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -42,14 +42,18 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
        "github.com/apache/skywalking-banyandb/pkg/run"
        pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
 )
 
+var queuePubScope = observability.RootScope.SubScope("queue_pub")
+
 // ChunkedSyncClientConfig configures chunked sync client behavior.
 type ChunkedSyncClientConfig struct {
        ChunkSize        uint32        // Size of each chunk in bytes
@@ -71,6 +75,7 @@ type pub struct {
        metadata        metadata.Repo
        handlers        map[bus.Topic]schema.EventHandler
        log             *logger.Logger
+       metrics         *pubMetrics
        connMgr         *grpchelper.ConnManager[*client]
        closer          *run.Closer
        writableProbe   map[string]map[string]struct{}
@@ -83,6 +88,34 @@ type pub struct {
        tlsEnabled      bool
 }
 
+type pubMetrics struct {
+       sendSuccessTotal    meter.Counter
+       sendErrTotal        meter.Counter
+       sendBytesTotal      meter.Counter
+       sendDurationSeconds meter.Histogram
+
+       sendRetryAttempts  meter.Counter
+       sendRetryExhausted meter.Counter
+       sendBackoffSeconds meter.Counter
+       inflightStreams    meter.Gauge
+       inflightRequests   meter.Gauge
+}
+
+func newPubMetrics(factory observability.Factory) *pubMetrics {
+       return &pubMetrics{
+               sendSuccessTotal:    factory.NewCounter("send_success_total", 
"topic", "node"),
+               sendErrTotal:        factory.NewCounter("send_err_total", 
"topic", "node", "reason"),
+               sendBytesTotal:      factory.NewCounter("send_bytes_total", 
"topic", "node"),
+               sendDurationSeconds: 
factory.NewHistogram("send_duration_seconds", meter.DefBuckets, "topic", 
"node", "result"),
+
+               sendRetryAttempts:  
factory.NewCounter("send_retry_attempts_total", "topic", "node"),
+               sendRetryExhausted: 
factory.NewCounter("send_retry_exhausted_total", "topic", "node"),
+               sendBackoffSeconds: 
factory.NewCounter("send_backoff_seconds_total", "topic", "node"),
+               inflightStreams:    factory.NewGauge("inflight_streams", 
"node"),
+               inflightRequests:   factory.NewGauge("inflight_requests", 
"topic", "node"),
+       }
+}
+
 // AddressOf implements grpchelper.ConnectionHandler.
 func (p *pub) AddressOf(node *databasev1.Node) string {
        return node.GrpcAddress
@@ -387,6 +420,18 @@ func (p *pub) PreRun(context.Context) error {
 
        p.log = logger.GetLogger("server-queue-pub-" + p.prefix)
 
+       if p.metrics == nil && p.metadata != nil {
+               if svc, ok := p.metadata.(metadata.Service); ok {
+                       if omr := svc.MetricsRegistry(); omr != nil {
+                               p.metrics = 
newPubMetrics(omr.With(queuePubScope))
+                       } else {
+                               p.log.Warn().Msg("queue_pub metrics disabled: 
MetricsRegistry returned nil")
+                       }
+               } else {
+                       p.log.Warn().Msg("queue_pub metrics disabled: metadata 
does not implement metadata.Service")
+               }
+       }
+
        // Initialize connection manager with the pub as the handler
        p.connMgr = 
grpchelper.NewConnManager(grpchelper.ConnManagerConfig[*client]{ 
//nolint:contextcheck // health check runs in background goroutine
                Handler:        p,
diff --git a/banyand/queue/pub/retry_test.go b/banyand/queue/pub/retry_test.go
index b073c3470..8293f90ee 100644
--- a/banyand/queue/pub/retry_test.go
+++ b/banyand/queue/pub/retry_test.go
@@ -109,7 +109,7 @@ func TestRetrySendSuccess(t *testing.T) {
        bp := &batchPublisher{}
        req := &clusterv1.SendRequest{}
 
-       err := bp.retrySend(ctx, mockStream, req, "test-node")
+       err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
 
        assert.NoError(t, err, "successful send should not return error")
 }
@@ -131,7 +131,7 @@ func TestRetrySendTransientErrorWithRecovery(t *testing.T) {
        bp := &batchPublisher{}
        req := &clusterv1.SendRequest{}
 
-       err := bp.retrySend(ctx, mockStream, req, "test-node")
+       err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
 
        assert.NoError(t, err, "should succeed after retry")
 }
@@ -149,7 +149,7 @@ func TestRetrySendNonTransientError(t *testing.T) {
        bp := &batchPublisher{}
        req := &clusterv1.SendRequest{}
 
-       err := bp.retrySend(ctx, mockStream, req, "test-node")
+       err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
 
        assert.Error(t, err, "non-transient error should be returned 
immediately")
        assert.Equal(t, nonTransientErr, err, "should return the original 
error")
@@ -168,7 +168,7 @@ func TestRetrySendExhaustedRetries(t *testing.T) {
        bp := &batchPublisher{}
        req := &clusterv1.SendRequest{}
 
-       err := bp.retrySend(ctx, mockStream, req, "test-node")
+       err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
 
        assert.Error(t, err, "should return error after exhausting retries")
        assert.Contains(t, err.Error(), "retry exhausted", "error should 
indicate retry exhaustion")
@@ -190,7 +190,7 @@ func TestRetrySendContextCancellation(t *testing.T) {
        bp := &batchPublisher{}
        req := &clusterv1.SendRequest{}
 
-       err := bp.retrySend(ctx, mockStream, req, "test-node")
+       err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
 
        assert.Error(t, err, "should return error when context is canceled")
        assert.Equal(t, context.Canceled, err, "should return context 
cancellation error")
@@ -208,7 +208,7 @@ func TestRetrySendStreamContextDone(t *testing.T) {
        bp := &batchPublisher{}
        req := &clusterv1.SendRequest{}
 
-       err := bp.retrySend(ctx, mockStream, req, "test-node")
+       err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
 
        assert.Error(t, err, "should return error when stream context is done")
        assert.Equal(t, context.Canceled, err, "should return stream context 
cancellation error")
@@ -228,7 +228,7 @@ func TestRetrySendPerAttemptTimeout(t *testing.T) {
        req := &clusterv1.SendRequest{}
 
        start := time.Now()
-       _ = bp.retrySend(ctx, mockStream, req, "test-node")
+       _ = bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
        duration := time.Since(start)
 
        // Should timeout quickly due to per-attempt timeout, not wait for the 
full operation
@@ -259,7 +259,7 @@ func TestRetrySendBackoffTiming(t *testing.T) {
        req := &clusterv1.SendRequest{}
 
        start := time.Now()
-       err := bp.retrySend(ctx, mockStream, req, "test-node")
+       err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
        duration := time.Since(start)
 
        assert.NoError(t, err, "should eventually succeed")
@@ -302,7 +302,7 @@ func TestRetrySendConcurrency(t *testing.T) {
                        bp := &batchPublisher{}
                        req := &clusterv1.SendRequest{}
 
-                       err := bp.retrySend(ctx, mockStream, req, 
fmt.Sprintf("test-node-%d", id))
+                       err := bp.retrySend(ctx, mockStream, req, 
fmt.Sprintf("test-node-%d", id), "test-topic")
                        errors <- err
                }(i)
        }
diff --git a/docs/operation/grafana-cluster.json 
b/docs/operation/grafana-cluster.json
index 6c2df9309..e9b58581a 100644
--- a/docs/operation/grafana-cluster.json
+++ b/docs/operation/grafana-cluster.json
@@ -2524,6 +2524,449 @@
       ],
       "title": "Total Documents",
       "type": "timeseries"
+    },
+    {
+      "collapsed": false,
+      "gridPos": {
+        "h": 1,
+        "w": 24,
+        "x": 0,
+        "y": 64
+      },
+      "id": 30,
+      "panels": [],
+      "title": "Liaison internal queue ($pod)",
+      "type": "row"
+    },
+    {
+      "datasource": {
+        "type": "prometheus",
+        "uid": "${DS_PROMETHEUS}"
+      },
+      "description": "Internal queue_sub chunked sync pressure: active 
sessions and reorder-buffer depth (labels use topic, not session).",
+      "fieldConfig": {
+        "defaults": {
+          "color": {
+            "mode": "palette-classic"
+          },
+          "custom": {
+            "axisBorderShow": false,
+            "axisCenteredZero": false,
+            "axisColorMode": "text",
+            "axisLabel": "",
+            "axisPlacement": "auto",
+            "barAlignment": 0,
+            "barWidthFactor": 0.6,
+            "drawStyle": "line",
+            "fillOpacity": 0,
+            "gradientMode": "none",
+            "hideFrom": {
+              "legend": false,
+              "tooltip": false,
+              "viz": false
+            },
+            "insertNulls": false,
+            "lineInterpolation": "linear",
+            "lineWidth": 1,
+            "pointSize": 5,
+            "scaleDistribution": {
+              "type": "linear"
+            },
+            "showPoints": "auto",
+            "spanNulls": false,
+            "stacking": {
+              "group": "A",
+              "mode": "none"
+            },
+            "thresholdsStyle": {
+              "mode": "off"
+            }
+          },
+          "mappings": [],
+          "thresholds": {
+            "mode": "absolute",
+            "steps": [
+              {
+                "color": "green",
+                "value": null
+              }
+            ]
+          },
+          "unit": "short"
+        },
+        "overrides": []
+      },
+      "gridPos": {
+        "h": 8,
+        "w": 12,
+        "x": 0,
+        "y": 65
+      },
+      "id": 31,
+      "options": {
+        "legend": {
+          "calcs": [
+            "lastNotNull",
+            "mean",
+            "max"
+          ],
+          "displayMode": "table",
+          "placement": "bottom",
+          "showLegend": true,
+          "sortBy": "Last *",
+          "sortDesc": true
+        },
+        "tooltip": {
+          "mode": "single",
+          "sort": "none"
+        }
+      },
+      "targets": [
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "editorMode": "code",
+          "expr": 
"sum(banyandb_queue_sub_chunked_sync_active_sessions{job=~\"$job\",pod=~\"$pod\"})
 by (topic)",
+          "hide": false,
+          "instant": false,
+          "legendFormat": "active_sessions {{topic}}",
+          "range": true,
+          "refId": "A"
+        },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "editorMode": "code",
+          "expr": 
"sum(banyandb_queue_sub_chunk_reorder_buffered_chunks{job=~\"$job\",pod=~\"$pod\"})
 by (topic)",
+          "hide": false,
+          "instant": false,
+          "legendFormat": "reorder_buffered {{topic}}",
+          "range": true,
+          "refId": "B"
+        }
+      ],
+      "title": "queue_sub: sync sessions & reorder buffer",
+      "type": "timeseries"
+    },
+    {
+      "datasource": {
+        "type": "prometheus",
+        "uid": "${DS_PROMETHEUS}"
+      },
+      "description": "Chunked sync abort rate by topic and reason (switch, 
stream_error, ctx_done, eof).",
+      "fieldConfig": {
+        "defaults": {
+          "color": {
+            "mode": "palette-classic"
+          },
+          "custom": {
+            "axisBorderShow": false,
+            "axisCenteredZero": false,
+            "axisColorMode": "text",
+            "axisLabel": "",
+            "axisPlacement": "auto",
+            "barAlignment": 0,
+            "barWidthFactor": 0.6,
+            "drawStyle": "line",
+            "fillOpacity": 0,
+            "gradientMode": "none",
+            "hideFrom": {
+              "legend": false,
+              "tooltip": false,
+              "viz": false
+            },
+            "insertNulls": false,
+            "lineInterpolation": "linear",
+            "lineWidth": 1,
+            "pointSize": 5,
+            "scaleDistribution": {
+              "type": "linear"
+            },
+            "showPoints": "auto",
+            "spanNulls": false,
+            "stacking": {
+              "group": "A",
+              "mode": "none"
+            },
+            "thresholdsStyle": {
+              "mode": "off"
+            }
+          },
+          "mappings": [],
+          "thresholds": {
+            "mode": "absolute",
+            "steps": [
+              {
+                "color": "green",
+                "value": null
+              }
+            ]
+          },
+          "unit": "ops"
+        },
+        "overrides": []
+      },
+      "gridPos": {
+        "h": 8,
+        "w": 12,
+        "x": 12,
+        "y": 65
+      },
+      "id": 32,
+      "options": {
+        "legend": {
+          "calcs": [
+            "lastNotNull",
+            "mean",
+            "max"
+          ],
+          "displayMode": "table",
+          "placement": "bottom",
+          "showLegend": true,
+          "sortBy": "Last *",
+          "sortDesc": true
+        },
+        "tooltip": {
+          "mode": "single",
+          "sort": "none"
+        }
+      },
+      "targets": [
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "editorMode": "code",
+          "expr": 
"sum(rate(banyandb_queue_sub_chunked_sync_aborted_total{job=~\"$job\",pod=~\"$pod\"}[$__rate_interval]))
 by (topic,reason)",
+          "hide": false,
+          "instant": false,
+          "legendFormat": "{{topic}} {{reason}}",
+          "range": true,
+          "refId": "A"
+        }
+      ],
+      "title": "queue_sub: chunked sync abort rate",
+      "type": "timeseries"
+    },
+    {
+      "datasource": {
+        "type": "prometheus",
+        "uid": "${DS_PROMETHEUS}"
+      },
+      "description": "queue_pub successful stream sends per second 
(tier-1/tier-2 batch client).",
+      "fieldConfig": {
+        "defaults": {
+          "color": {
+            "mode": "palette-classic"
+          },
+          "custom": {
+            "axisBorderShow": false,
+            "axisCenteredZero": false,
+            "axisColorMode": "text",
+            "axisLabel": "",
+            "axisPlacement": "auto",
+            "barAlignment": 0,
+            "barWidthFactor": 0.6,
+            "drawStyle": "line",
+            "fillOpacity": 0,
+            "gradientMode": "none",
+            "hideFrom": {
+              "legend": false,
+              "tooltip": false,
+              "viz": false
+            },
+            "insertNulls": false,
+            "lineInterpolation": "linear",
+            "lineWidth": 1,
+            "pointSize": 5,
+            "scaleDistribution": {
+              "type": "linear"
+            },
+            "showPoints": "auto",
+            "spanNulls": false,
+            "stacking": {
+              "group": "A",
+              "mode": "none"
+            },
+            "thresholdsStyle": {
+              "mode": "off"
+            }
+          },
+          "mappings": [],
+          "thresholds": {
+            "mode": "absolute",
+            "steps": [
+              {
+                "color": "green",
+                "value": null
+              }
+            ]
+          },
+          "unit": "ops"
+        },
+        "overrides": []
+      },
+      "gridPos": {
+        "h": 8,
+        "w": 12,
+        "x": 0,
+        "y": 73
+      },
+      "id": 33,
+      "options": {
+        "legend": {
+          "calcs": [
+            "lastNotNull",
+            "mean",
+            "max"
+          ],
+          "displayMode": "table",
+          "placement": "bottom",
+          "showLegend": true,
+          "sortBy": "Last *",
+          "sortDesc": true
+        },
+        "tooltip": {
+          "mode": "single",
+          "sort": "none"
+        }
+      },
+      "targets": [
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "editorMode": "code",
+          "expr": 
"sum(rate(banyandb_queue_pub_send_success_total{job=~\"$job\",pod=~\"$pod\"}[$__rate_interval]))
 by (topic)",
+          "hide": false,
+          "instant": false,
+          "legendFormat": "{{topic}}",
+          "range": true,
+          "refId": "A"
+        }
+      ],
+      "title": "queue_pub: send success rate",
+      "type": "timeseries"
+    },
+    {
+      "datasource": {
+        "type": "prometheus",
+        "uid": "${DS_PROMETHEUS}"
+      },
+      "description": "queue_pub errors and exhausted retries (see 
docs/operation/observability.md for reason values).",
+      "fieldConfig": {
+        "defaults": {
+          "color": {
+            "mode": "palette-classic"
+          },
+          "custom": {
+            "axisBorderShow": false,
+            "axisCenteredZero": false,
+            "axisColorMode": "text",
+            "axisLabel": "",
+            "axisPlacement": "auto",
+            "barAlignment": 0,
+            "barWidthFactor": 0.6,
+            "drawStyle": "line",
+            "fillOpacity": 0,
+            "gradientMode": "none",
+            "hideFrom": {
+              "legend": false,
+              "tooltip": false,
+              "viz": false
+            },
+            "insertNulls": false,
+            "lineInterpolation": "linear",
+            "lineWidth": 1,
+            "pointSize": 5,
+            "scaleDistribution": {
+              "type": "linear"
+            },
+            "showPoints": "auto",
+            "spanNulls": false,
+            "stacking": {
+              "group": "A",
+              "mode": "none"
+            },
+            "thresholdsStyle": {
+              "mode": "off"
+            }
+          },
+          "mappings": [],
+          "thresholds": {
+            "mode": "absolute",
+            "steps": [
+              {
+                "color": "green",
+                "value": null
+              }
+            ]
+          },
+          "unit": "ops"
+        },
+        "overrides": []
+      },
+      "gridPos": {
+        "h": 8,
+        "w": 12,
+        "x": 12,
+        "y": 73
+      },
+      "id": 34,
+      "options": {
+        "legend": {
+          "calcs": [
+            "lastNotNull",
+            "mean",
+            "max"
+          ],
+          "displayMode": "table",
+          "placement": "bottom",
+          "showLegend": true,
+          "sortBy": "Last *",
+          "sortDesc": true
+        },
+        "tooltip": {
+          "mode": "single",
+          "sort": "none"
+        }
+      },
+      "targets": [
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "editorMode": "code",
+          "expr": 
"sum(rate(banyandb_queue_pub_send_err_total{job=~\"$job\",pod=~\"$pod\"}[$__rate_interval]))
 by (reason)",
+          "hide": false,
+          "instant": false,
+          "legendFormat": "err {{reason}}",
+          "range": true,
+          "refId": "A"
+        },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "editorMode": "code",
+          "expr": 
"sum(rate(banyandb_queue_pub_send_retry_exhausted_total{job=~\"$job\",pod=~\"$pod\"}[$__rate_interval]))
 by (topic,node)",
+          "hide": false,
+          "instant": false,
+          "legendFormat": "retry_exhausted {{topic}} {{node}}",
+          "range": true,
+          "refId": "B"
+        }
+      ],
+      "title": "queue_pub: errors & retry exhausted",
+      "type": "timeseries"
     }
   ],
   "schemaVersion": 39,
@@ -2587,6 +3030,6 @@
   "timezone": "browser",
   "title": "BanyanDB Cluster",
   "uid": "ddy81kbj931mof",
-  "version": 19,
+  "version": 20,
   "weekStart": ""
 }
\ No newline at end of file
diff --git a/docs/operation/observability.md b/docs/operation/observability.md
index a8935c748..559ea053d 100644
--- a/docs/operation/observability.md
+++ b/docs/operation/observability.md
@@ -258,6 +258,64 @@ If the value is too large, it may indicate that too many 
data points are being i
 
 **Expression**: 
`sum(banyandb_stream_tst_inverted_index_total_doc_count{job=~\"$job\",instance=~\"$instance\"})
 by (group)`
 
+### Liaison internal queue (`queue_sub` / `queue_pub`)
+
+Liaison nodes run an internal gRPC **queue server** (`server-queue-sub`, wired 
via `sub.NewServerWithPorts` in `pkg/cmdsetup/liaison.go`) and **queue 
clients** (`server-queue-pub`) for tier-1/tier-2 pipelines. Prometheus metrics 
use the namespaces `banyandb_queue_sub_*` and `banyandb_queue_pub_*` (built 
from `observability.RootScope` + `queue_sub` / `queue_pub` sub-scopes). Data 
nodes may expose the same metric families where the corresponding services run.
+
+#### `queue_sub` — inbound server (including chunked sync)
+
+| Metric (suffix after `banyandb_queue_sub_`) | Type | Labels | Meaning |
+| --- | --- | --- | --- |
+| `total_started`, `total_finished`, `total_err`, `total_latency` | Counter | 
`topic` | Legacy per-topic stream handler lifecycle. |
+| `total_msg_received`, `total_msg_received_err`, `total_msg_sent`, 
`total_msg_sent_err` | Counter | `topic` | Per-topic message I/O counts and 
errors (the `*_err` counters feed the aggregate error rate below). |
+| `out_of_order_chunks_received`, `chunks_buffered` | Counter | `topic` | 
Chunk reordering: out-of-order arrivals and buffer events (**`topic` only**, 
not per session). |
+| `buffer_timeouts`, `large_gaps_rejected`, `buffer_capacity_exceeded`, 
`finish_sync_err` | Counter | `topic` | Reorder buffer pressure and sync 
completion issues. |
+| `chunked_sync_active_sessions` | Gauge | `topic` | In-flight chunked sync 
sessions per topic. |
+| `chunk_reorder_buffered_chunks` | Gauge | `topic` | Chunks waiting in the 
reorder buffer. |
+| `chunked_sync_aborted_total` | Counter | `topic`, `reason` | Aborted 
sessions; `reason` is one of `switch`, `stream_error`, `ctx_done`, `eof`. |
+| `chunked_sync_failed_parts_total` | Counter | `topic` | Parts incomplete 
when a sync completes. |
+| `chunked_sync_total_bytes_received` | Counter | `topic` | Bytes received for 
completed syncs. |
+| `chunked_sync_duration_seconds` | Histogram | `topic` | Wall-clock duration 
of completed syncs. |
+
+**Troubleshooting:** rising `chunk_reorder_buffered_chunks` or 
`buffer_timeouts` suggests sustained out-of-order or slow consumers. Spikes in 
`chunked_sync_aborted_total` with `reason=switch` often correlate with 
topic/hand-off changes; `stream_error` / `ctx_done` / `eof` point to RPC 
lifecycle issues. Use `chunked_sync_failed_parts_total` and the duration 
histogram to separate partial completion from healthy throughput.
+
+#### `queue_pub` — outbound batch client
+
+| Metric (suffix after `banyandb_queue_pub_`) | Type | Labels | Meaning |
+| --- | --- | --- | --- |
+| `send_success_total` | Counter | `topic`, `node` | Successful `Send` on the 
client stream (local write, not end-to-end ack). |
+| `send_bytes_total` | Counter | `topic`, `node` | Payload bytes on successful 
`Send`. |
+| `send_duration_seconds` | Histogram | `topic`, `node`, `result` | Time spent 
in the send path including retries. `result` is one of `success`, 
`non_transient`, `canceled`, `stream_canceled`, `retry_exhausted`; filter to 
`result="success"` (and optionally `retry_exhausted`) when isolating the 
latency of the local send path (this is not an end-to-end ack latency). |
+| `send_err_total` | Counter | `topic`, `node`, `reason` | Send/recv side 
errors; `reason` includes `non_transient`, `canceled`, `stream_canceled`, 
`retry_exhausted`, `recv_error`, `server_rejected`. |
+| `send_retry_attempts_total`, `send_retry_exhausted_total`, 
`send_backoff_seconds_total` | Counter | `topic`, `node` | Retry/backoff 
behavior before giving up. |
+| `inflight_streams` | Gauge | `node` | Open send streams per downstream node. 
|
+| `inflight_requests` | Gauge | `topic`, `node` | In-flight batch send 
operations. |
+
+**Troubleshooting:** correlate `send_retry_exhausted_total` and 
`send_err_total{reason="retry_exhausted"}` with upstream pressure. `recv_error` 
vs `server_rejected` separates transport failures from application-level 
`SendResponse` errors. Sustained high `inflight_requests` or `inflight_streams` 
may indicate slow or unavailable data nodes.
+
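+`queue_pub` metrics are registered in `PreRun` only when the publisher's 
`metadata` implements `metadata.Service` and `MetricsRegistry()` is non-nil 
(e.g. after `SetMetricsRegistry` in bootstrap); if either condition fails, a 
warning is logged and the metrics stay disabled. The exception is 
`NewWithoutMetadata()`, which leaves `queue_pub` metrics disabled without a 
warning.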
+
+#### Example PromQL snippets
+
+Saturation, throughput, and error rates (using the `$job` / `$pod` variables from the cluster dashboard):
+
+- **Chunked sync sessions:** 
`sum(banyandb_queue_sub_chunked_sync_active_sessions{job=~"$job",pod=~"$pod"}) 
by (topic)`
+- **Reorder buffer depth:** 
`sum(banyandb_queue_sub_chunk_reorder_buffered_chunks{job=~"$job",pod=~"$pod"}) 
by (topic)`
+- **Chunked sync abort rate:** 
`sum(rate(banyandb_queue_sub_chunked_sync_aborted_total{job=~"$job",pod=~"$pod"}[$__rate_interval]))
 by (topic,reason)`
+- **Publisher success rate:** 
`sum(rate(banyandb_queue_pub_send_success_total{job=~"$job",pod=~"$pod"}[$__rate_interval]))
 by (topic)`
+- **Publisher errors by reason:** 
`sum(rate(banyandb_queue_pub_send_err_total{job=~"$job",pod=~"$pod"}[$__rate_interval]))
 by (reason)`
+
+**Suggested alerts (tune thresholds per cluster):**
+
+- Non-zero sustained `rate(banyandb_queue_pub_send_retry_exhausted_total[5m])` 
on liaison.
+- `chunk_reorder_buffered_chunks` or `chunked_sync_active_sessions` above an 
environment-specific ceiling for a single `topic`.
+
+#### Aggregate pipeline error rate (optional)
+
+To combine legacy queue stream errors with publisher-side failures (scaled to 
per-minute rates as elsewhere in this doc; note that this expression filters on 
`instance`, not `pod`):
+
+**Expression**: 
`sum(rate(banyandb_queue_sub_total_msg_sent_err{job=~"$job",instance=~"$instance"}[$__rate_interval])*60)
 + 
sum(rate(banyandb_queue_sub_total_msg_received_err{job=~"$job",instance=~"$instance"}[$__rate_interval])*60)
 + 
sum(rate(banyandb_queue_pub_send_err_total{job=~"$job",instance=~"$instance"}[$__rate_interval])*60)`
+
 ## Metrics Providers
 
 BanyanDB has built-in support for metrics collection. Currently, there are two 
supported metrics provider: `prometheus` and `native`. These can be enabled 
through `observability-modes` flag, allowing you to activate one or both of 
them.
@@ -294,7 +352,7 @@ scrape_configs:
 
 #### Grafana Dashboard
 
-Check out the [BanyanDB Cluster Dashboard](grafana-cluster.json) for 
monitoring BanyanDB metrics.
+Check out the [BanyanDB Cluster Dashboard](grafana-cluster.json) for 
monitoring BanyanDB metrics. The dashboard includes a **Liaison internal 
queue** row (`queue_sub` saturation / aborts and `queue_pub` send and error 
rates); metric names and PromQL examples are documented in the **Liaison 
internal queue** subsection above.
 
 ### Native
 

Reply via email to