This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 32c5c7d83 feat: queue_pub observability (#1113)
32c5c7d83 is described below
commit 32c5c7d83d59e43c001c4b9abc817e51c36ba2d3
Author: OmCheeLin <[email protected]>
AuthorDate: Mon May 11 15:02:20 2026 +0800
feat: queue_pub observability (#1113)
---
CHANGES.md | 1 +
banyand/metadata/client.go | 5 +
banyand/metadata/metadata.go | 1 +
banyand/metadata/service/server.go | 11 +
banyand/queue/pub/batch.go | 163 +++++++++---
banyand/queue/pub/metrics_test.go | 499 ++++++++++++++++++++++++++++++++++++
banyand/queue/pub/pub.go | 45 ++++
banyand/queue/pub/retry_test.go | 18 +-
docs/operation/grafana-cluster.json | 445 +++++++++++++++++++++++++++++++-
docs/operation/observability.md | 60 ++++-
10 files changed, 1200 insertions(+), 48 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index c714597ea..efa30e4b4 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -51,6 +51,7 @@ Release Notes.
- Repair the `GetMaxRevision` aggregation on the per-node
`NodeSchemaStatusService` (`banyand/metadata/schema/property/node_status.go`).
The previous implementation returned `min(schemaCache.notifiedModRevision,
NodeRepoRegistry.LatestModRevision)`, but `LatestModRevision` aggregated
per-service `schemaRepo` watermarks via `min` — and each `schemaRepo` only
advances on events for its own catalog (`pkg/schema/init.go:72` filters by
`g.Catalog`), so the min was perpetually pinned to the [...]
- Fix the `notifiedModRevision` watermark advancement in
`SchemaRegistry.processInitialResourceFromProperty`, `handleWatchEvent` (DELETE
branch), and `handleDeletion`. Previously `AdvanceNotified` was gated on
`cache.Update` / `cache.Delete` returning true, but those methods compare
`latestUpdateAt` (property timestamp) while the watermark tracks `modRevision`
(etcd revision). When the property timestamp is stale (e.g. a no-op Update that
doesn't change the measure spec), the cache rej [...]
- Fix the `modRevision` contract on no-op Update RPCs
(`MeasureRegistryService.Update`, etc.). Previously `updateResource` detected
unchanged content via `CheckerMap` and short-circuited without writing to the
property store, but the caller had already fabricated `modRevision =
time.Now().UnixNano()` and returned it. The returned revision never appeared in
the property watch stream, so `AwaitRevisionApplied(R)` would hang.
`updateResource` now returns `(int64, error)` — the existing pr [...]
+ - Add end-to-end observability for liaison internal queue pipelines with
per-topic metrics for queue_sub and queue_pub, along with Grafana panels and
troubleshooting docs.
### Bug Fixes
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index 6e66ea1ef..9a7b8bb10 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -421,6 +421,11 @@ func (s *clientService) SetMetricsRegistry(omr
observability.MetricsRegistry) {
s.omr = omr
}
+// MetricsRegistry returns the metrics registry set via SetMetricsRegistry.
+func (s *clientService) MetricsRegistry() observability.MetricsRegistry {
+ return s.omr
+}
+
func (s *clientService) Name() string {
return "metadata"
}
diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go
index a6e8e6719..f26631964 100644
--- a/banyand/metadata/metadata.go
+++ b/banyand/metadata/metadata.go
@@ -73,6 +73,7 @@ type Service interface {
// Service.
NodeRepoRegistry() *registry.NodeRepoRegistry
SetMetricsRegistry(omr observability.MetricsRegistry)
+ MetricsRegistry() observability.MetricsRegistry
SetDataBroadcaster(broadcaster bus.Broadcaster)
SetLiaisonBroadcaster(broadcaster bus.Broadcaster)
RegisterDataCollector(catalog commonv1.Catalog, collector
schema.DataInfoCollector)
diff --git a/banyand/metadata/service/server.go
b/banyand/metadata/service/server.go
index 0d23f3050..30ed3f612 100644
--- a/banyand/metadata/service/server.go
+++ b/banyand/metadata/service/server.go
@@ -192,6 +192,17 @@ func (s *server) SetMetricsRegistry(omr
observability.MetricsRegistry) {
s.Service.SetMetricsRegistry(omr)
}
+// MetricsRegistry returns the metrics registry held by this server or the
embedded client.
+func (s *server) MetricsRegistry() observability.MetricsRegistry {
+ if s.omr != nil {
+ return s.omr
+ }
+ if s.Service != nil {
+ return s.Service.MetricsRegistry()
+ }
+ return nil
+}
+
// GetSchemaServerPort returns the schema server gRPC port.
func (s *server) GetSchemaServerPort() *uint32 {
if s.propServer != nil {
diff --git a/banyand/queue/pub/batch.go b/banyand/queue/pub/batch.go
index b755d300a..005b7633f 100644
--- a/banyand/queue/pub/batch.go
+++ b/banyand/queue/pub/batch.go
@@ -40,6 +40,17 @@ const (
defaultPerRequestTimeout = 2 * time.Second
defaultBackoffBase = 500 * time.Millisecond
defaultBackoffMax = 30 * time.Second
+
+ // Local send-side failures (before the frame leaves the publisher).
+ sendErrReasonNonTransient = "non_transient"
+ sendErrReasonCanceled = "canceled"
+ sendErrReasonStreamCanceled = "stream_canceled"
+ sendErrReasonRetryExhausted = "retry_exhausted"
+ // Remote-side failures (observed after the frame was written to the
stream).
+ sendErrReasonRecvError = "recv_error" // s.Recv returned an
error (connection/protocol layer).
+ sendErrReasonServerRejected = "server_rejected" // Server responded
with a non-empty Error (includes failover statuses).
+
+ sendResultSuccess = "success"
)
type writeStream struct {
@@ -85,13 +96,22 @@ func (bp *batchPublisher) Publish(ctx context.Context,
topic bus.Topic, messages
continue
}
+ topicStr := topic.String()
sendData := func() (success bool) {
if stream, ok := bp.streams[node]; ok {
+ hasMetrics := bp.hasMetrics()
defer func() {
if !success {
delete(bp.streams, node)
+ if hasMetrics {
+
bp.pub.metrics.inflightStreams.Add(-1, node)
+ }
}
}()
+ if hasMetrics {
+ bp.pub.metrics.inflightRequests.Add(1,
topicStr, node)
+ defer
bp.pub.metrics.inflightRequests.Add(-1, topicStr, node)
+ }
select {
case <-ctx.Done():
return false
@@ -99,7 +119,7 @@ func (bp *batchPublisher) Publish(ctx context.Context, topic
bus.Topic, messages
return false
default:
}
- errSend := bp.retrySend(ctx, stream.client, r,
node)
+ errSend := bp.retrySend(ctx, stream.client, r,
node, topicStr)
if errSend != nil {
err = multierr.Append(err,
fmt.Errorf("failed to send message to node %s: %w", node, errSend))
// Record failure for circuit breaker
(only for transient/internal errors)
@@ -155,6 +175,10 @@ func (bp *batchPublisher) Publish(ctx context.Context,
topic bus.Topic, messages
deferFn := cancel
stream, errCreateStream := nodeClient.client.Send(streamCtx)
if errCreateStream != nil {
+ // Release the timeout context when Send() fails;
otherwise listenBatchResponse,
+ // which normally invokes deferFn on exit, is never
spawned and streamCtx leaks
+ // until bp.timeout expires, accumulating pending
timers and context resources under repeated failures.
+ cancel()
err = multierr.Append(err, fmt.Errorf("failed to get
stream for node %s: %w", node, errCreateStream))
continue
}
@@ -162,48 +186,70 @@ func (bp *batchPublisher) Publish(ctx context.Context,
topic bus.Topic, messages
client: stream,
ctxDoneCh: streamCtx.Done(),
}
+ if bp.hasMetrics() {
+ bp.pub.metrics.inflightStreams.Add(1, node)
+ }
bp.f.events = append(bp.f.events, make(chan batchEvent))
_ = sendData()
nodeName := node
- go func(s clusterv1.Service_SendClient, deferFn func(), bc chan
batchEvent, curNode string) {
- defer func() {
- close(bc)
- deferFn()
- }()
- select {
- case <-ctx.Done():
- return
- default:
- }
- resp, errRecv := s.Recv()
- if errRecv != nil {
- if grpchelper.IsFailoverError(errRecv) {
- // Record circuit breaker failure
before creating failover event
- bp.pub.connMgr.RecordFailure(curNode,
errRecv)
- bc <- batchEvent{n: curNode, e:
common.NewErrorWithStatus(modelv1.Status_STATUS_INTERNAL_ERROR,
errRecv.Error())}
- }
- return
- }
- if resp == nil {
- return
- }
- if resp.Error == "" {
- return
- }
- if isFailoverStatus(resp.Status) {
- ce := common.NewErrorWithStatus(resp.Status,
resp.Error)
- // Record circuit breaker failure before
creating failover event
- bp.pub.connMgr.RecordFailure(curNode, ce)
- bc <- batchEvent{n: curNode, e: ce}
- }
- }(stream, deferFn, bp.f.events[len(bp.f.events)-1], nodeName)
+ go bp.listenBatchResponse(ctx, stream, deferFn,
bp.f.events[len(bp.f.events)-1], nodeName, topicStr)
}
return nil, err
}
+func (bp *batchPublisher) hasMetrics() bool {
+ return bp.pub != nil && bp.pub.metrics != nil
+}
+
+// listenBatchResponse receives the server response and records failover
events and end-to-end failure metrics.
+func (bp *batchPublisher) listenBatchResponse(ctx context.Context, s
clusterv1.Service_SendClient, deferFn func(), bc chan batchEvent, curNode,
topic string) {
+ defer func() {
+ close(bc)
+ deferFn()
+ }()
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+
+ resp, errRecv := s.Recv()
+ if errRecv != nil {
+ if bp.hasMetrics() {
+ bp.pub.metrics.sendErrTotal.Inc(1, topic, curNode,
sendErrReasonRecvError)
+ }
+ if grpchelper.IsFailoverError(errRecv) {
+ // Record circuit breaker failure before creating
failover event
+ bp.pub.connMgr.RecordFailure(curNode, errRecv)
+ bc <- batchEvent{n: curNode, e:
common.NewErrorWithStatus(modelv1.Status_STATUS_INTERNAL_ERROR,
errRecv.Error())}
+ }
+ return
+ }
+ if resp == nil || resp.Error == "" || resp.Status ==
modelv1.Status_STATUS_SUCCEED {
+ return
+ }
+ if bp.hasMetrics() {
+ bp.pub.metrics.sendErrTotal.Inc(1, topic, curNode,
sendErrReasonServerRejected)
+ }
+ ce := common.NewErrorWithStatus(resp.Status, resp.Error)
+ // Only failover statuses trigger circuit-breaker accounting; other
server-side
+ // rejections (e.g. invalid argument) are surfaced to the caller but do
not count
+ // toward node health.
+ if isFailoverStatus(resp.Status) {
+ bp.pub.connMgr.RecordFailure(curNode, ce)
+ }
+ // Always surface a batchEvent for any non-empty resp.Error so that
Close() exposes
+ // the rejection to the caller. Otherwise the
send_err_total{reason="server_rejected"}
+ // counter would rise on dashboards while callers silently treat the
batch as successful.
+ bc <- batchEvent{n: curNode, e: ce}
+}
+
func (bp *batchPublisher) Close() (cee map[string]*common.Error, err error) {
- for i := range bp.streams {
- err = multierr.Append(err, bp.streams[i].client.CloseSend())
+ for nodeName, stream := range bp.streams {
+ err = multierr.Append(err, stream.client.CloseSend())
+ if bp.hasMetrics() {
+ bp.pub.metrics.inflightStreams.Add(-1, nodeName)
+ }
}
for i := range bp.streams {
<-bp.streams[i].ctxDoneCh
@@ -280,8 +326,14 @@ func isFailoverStatus(s modelv1.Status) bool {
}
// retrySend implements bounded retries for client streaming sends with
exponential backoff and jitter.
-func (bp *batchPublisher) retrySend(ctx context.Context, stream
clusterv1.Service_SendClient, r *clusterv1.SendRequest, node string) error {
+func (bp *batchPublisher) retrySend(ctx context.Context, stream
clusterv1.Service_SendClient, r *clusterv1.SendRequest, node string, topic
string) error {
var lastErr error
+ start := time.Now()
+ observeDuration := func(result string) {
+ if bp.hasMetrics() {
+
bp.pub.metrics.sendDurationSeconds.Observe(time.Since(start).Seconds(), topic,
node, result)
+ }
+ }
for attempt := 0; attempt <= defaultMaxRetries; attempt++ {
// Create per-attempt timeout context
@@ -291,9 +343,17 @@ func (bp *batchPublisher) retrySend(ctx context.Context,
stream clusterv1.Servic
select {
case <-ctx.Done():
cancel()
+ if bp.hasMetrics() {
+ bp.pub.metrics.sendErrTotal.Inc(1, topic, node,
sendErrReasonCanceled)
+ }
+ observeDuration(sendErrReasonCanceled)
return ctx.Err()
case <-stream.Context().Done():
cancel()
+ if bp.hasMetrics() {
+ bp.pub.metrics.sendErrTotal.Inc(1, topic, node,
sendErrReasonStreamCanceled)
+ }
+ observeDuration(sendErrReasonStreamCanceled)
return stream.Context().Err()
case <-attemptCtx.Done():
cancel()
@@ -310,7 +370,12 @@ func (bp *batchPublisher) retrySend(ctx context.Context,
stream clusterv1.Servic
cancel()
if sendErr == nil {
- // Success
+ if bp.hasMetrics() {
+ bp.pub.metrics.sendSuccessTotal.Inc(1, topic,
node)
+
bp.pub.metrics.sendBytesTotal.Inc(float64(len(r.Body)), topic, node)
+ }
+ // Success writing to the local stream; end-to-end ack
is observed in listenBatchResponse.
+ observeDuration(sendResultSuccess)
return nil
}
@@ -319,9 +384,17 @@ func (bp *batchPublisher) retrySend(ctx context.Context,
stream clusterv1.Servic
// Check if error is retryable
if !grpchelper.IsTransientError(sendErr) {
// Non-transient error, don't retry
+ if bp.hasMetrics() {
+ bp.pub.metrics.sendErrTotal.Inc(1, topic, node,
sendErrReasonNonTransient)
+ }
+ observeDuration(sendErrReasonNonTransient)
return sendErr
}
+ if bp.hasMetrics() {
+ bp.pub.metrics.sendRetryAttempts.Inc(1, topic, node)
+ }
+
// If this was the last attempt, don't sleep
if attempt >= defaultMaxRetries {
break
@@ -334,13 +407,29 @@ func (bp *batchPublisher) retrySend(ctx context.Context,
stream clusterv1.Servic
select {
case <-time.After(backoff):
// Continue to next attempt
+ if bp.hasMetrics() {
+
bp.pub.metrics.sendBackoffSeconds.Inc(backoff.Seconds(), topic, node)
+ }
case <-ctx.Done():
+ if bp.hasMetrics() {
+ bp.pub.metrics.sendErrTotal.Inc(1, topic, node,
sendErrReasonCanceled)
+ }
+ observeDuration(sendErrReasonCanceled)
return ctx.Err()
case <-stream.Context().Done():
+ if bp.hasMetrics() {
+ bp.pub.metrics.sendErrTotal.Inc(1, topic, node,
sendErrReasonStreamCanceled)
+ }
+ observeDuration(sendErrReasonStreamCanceled)
return stream.Context().Err()
}
}
// All retries exhausted
+ if bp.hasMetrics() {
+ bp.pub.metrics.sendRetryExhausted.Inc(1, topic, node)
+ bp.pub.metrics.sendErrTotal.Inc(1, topic, node,
sendErrReasonRetryExhausted)
+ }
+ observeDuration(sendErrReasonRetryExhausted)
return fmt.Errorf("retry exhausted for node %s after %d attempts, last
error: %w", node, defaultMaxRetries+1, lastErr)
}
diff --git a/banyand/queue/pub/metrics_test.go
b/banyand/queue/pub/metrics_test.go
new file mode 100644
index 000000000..b326bf88a
--- /dev/null
+++ b/banyand/queue/pub/metrics_test.go
@@ -0,0 +1,499 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pub
+
+import (
+ "context"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+
+ "github.com/apache/skywalking-banyandb/api/data"
+ clusterv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+type fakeSendClient struct {
+ clusterv1.Service_SendClient
+ ctx context.Context
+
+ sendErrs []error
+ sendIdx int
+}
+
+func (f *fakeSendClient) Send(_ *clusterv1.SendRequest) error {
+ if f.sendIdx >= len(f.sendErrs) {
+ return nil
+ }
+ err := f.sendErrs[f.sendIdx]
+ f.sendIdx++
+ return err
+}
+
+func (f *fakeSendClient) Context() context.Context {
+ return f.ctx
+}
+
+type countingCounter struct {
+ count float64
+}
+
+func (c *countingCounter) Inc(delta float64, _ ...string) {
+ c.count += delta
+}
+
+func (*countingCounter) Delete(_ ...string) bool {
+ return true
+}
+
+// errReasonCapturer records send_err_total increments keyed by the `reason`
label (third label).
+type errReasonCapturer struct {
+ byReason map[string]float64
+ mu sync.Mutex
+}
+
+func newErrReasonCapturer() *errReasonCapturer {
+ return &errReasonCapturer{byReason: make(map[string]float64)}
+}
+
+func (c *errReasonCapturer) Inc(delta float64, labels ...string) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if len(labels) < 3 {
+ return
+ }
+ reason := labels[2]
+ c.byReason[reason] += delta
+}
+
+func (c *errReasonCapturer) Delete(_ ...string) bool {
+ return true
+}
+
+func (c *errReasonCapturer) sum(reason string) float64 {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ return c.byReason[reason]
+}
+
+type noopGauge struct{}
+
+func (*noopGauge) Set(_ float64, _ ...string) {}
+func (*noopGauge) Add(_ float64, _ ...string) {}
+func (*noopGauge) Delete(_ ...string) bool { return true }
+
+type valGauge struct {
+ mu sync.Mutex
+ val float64
+}
+
+func (g *valGauge) Add(delta float64, _ ...string) {
+ g.mu.Lock()
+ g.val += delta
+ g.mu.Unlock()
+}
+
+func (g *valGauge) Set(v float64, _ ...string) {
+ g.mu.Lock()
+ g.val = v
+ g.mu.Unlock()
+}
+
+func (*valGauge) Delete(_ ...string) bool { return true }
+
+func (g *valGauge) value() float64 {
+ g.mu.Lock()
+ defer g.mu.Unlock()
+ return g.val
+}
+
+// inflightKeyedGauge tracks inflight_requests Add deltas per (topic, node)
label pair.
+type inflightKeyedGauge struct {
+ vals map[string]float64
+ mu sync.Mutex
+}
+
+func newInflightKeyedGauge() *inflightKeyedGauge {
+ return &inflightKeyedGauge{vals: make(map[string]float64)}
+}
+
+func inflightReqKey(topic, node string) string {
+ return strings.Join([]string{topic, node}, "|")
+}
+
+func (g *inflightKeyedGauge) Add(delta float64, labels ...string) {
+ if len(labels) < 2 {
+ return
+ }
+ k := inflightReqKey(labels[0], labels[1])
+ g.mu.Lock()
+ g.vals[k] += delta
+ g.mu.Unlock()
+}
+
+func (*inflightKeyedGauge) Set(_ float64, _ ...string) {}
+
+func (*inflightKeyedGauge) Delete(_ ...string) bool {
+ return true
+}
+
+func (g *inflightKeyedGauge) net(topic, node string) float64 {
+ g.mu.Lock()
+ defer g.mu.Unlock()
+ return g.vals[inflightReqKey(topic, node)]
+}
+
+type noopHistogram struct{}
+
+func (*noopHistogram) Observe(_ float64, _ ...string) {}
+func (*noopHistogram) Delete(_ ...string) bool { return true }
+
+func newPubMetricsWithErrCapture(sendErr *errReasonCapturer) *pubMetrics {
+ return &pubMetrics{
+ sendSuccessTotal: &countingCounter{},
+ sendErrTotal: sendErr,
+ sendBytesTotal: &countingCounter{},
+ sendDurationSeconds: &noopHistogram{},
+ sendRetryAttempts: &countingCounter{},
+ sendRetryExhausted: &countingCounter{},
+ sendBackoffSeconds: &countingCounter{},
+ inflightStreams: &noopGauge{},
+ inflightRequests: &noopGauge{},
+ }
+}
+
+func newPubWithConnMgrForMetrics(t *testing.T, pm *pubMetrics) *pub {
+ t.Helper()
+ p := &pub{
+ handlers: make(map[bus.Topic]schema.EventHandler),
+ log: logger.GetLogger("queue-pub-metrics-test"),
+ metrics: pm,
+ closer: run.NewCloser(1),
+ }
+ p.connMgr =
grpchelper.NewConnManager(grpchelper.ConnManagerConfig[*client]{
+ Handler: p,
+ Logger: p.log,
+ RetryPolicy: "",
+ MaxRecvMsgSize: 4 << 20,
+ })
+ return p
+}
+
+// TestRetryMetrics ensures that retry-related metrics are updated when
retrySend
+// observes retryable errors and eventually exhausts retries.
+func TestRetryMetrics(t *testing.T) {
+ sendErrCap := newErrReasonCapturer()
+ p := &pub{
+ metrics: &pubMetrics{
+ sendRetryAttempts: &countingCounter{},
+ sendRetryExhausted: &countingCounter{},
+ sendErrTotal: sendErrCap,
+ sendBackoffSeconds: &countingCounter{},
+ sendSuccessTotal: &countingCounter{},
+ sendBytesTotal: &countingCounter{},
+ sendDurationSeconds: &noopHistogram{},
+ inflightStreams: &noopGauge{},
+ inflightRequests: &noopGauge{},
+ },
+ }
+
+ bp := &batchPublisher{
+ pub: p,
+ }
+
+ ctx := context.Background()
+ client := &fakeSendClient{
+ sendErrs: []error{
+ status.Error(codes.Unavailable, "transient"),
+ status.Error(codes.Unavailable, "transient"),
+ status.Error(codes.Unavailable, "transient"),
+ status.Error(codes.Unavailable, "transient"),
+ },
+ ctx: ctx,
+ }
+
+ req := &clusterv1.SendRequest{
+ Topic: "test-topic",
+ Body: []byte("payload"),
+ }
+
+ const nodeName = "test-node"
+ const topicStr = "test-topic"
+
+ retryErr := bp.retrySend(ctx, client, req, nodeName, topicStr)
+ require.Error(t, retryErr)
+
+ retries := p.metrics.sendRetryAttempts.(*countingCounter).count
+ require.Equal(t, float64(4), retries, "one retry counter per transient
failure before exhaustion")
+
+ exhausted := p.metrics.sendRetryExhausted.(*countingCounter).count
+ require.Equal(t, float64(1), exhausted)
+
+ require.Equal(t, float64(1),
sendErrCap.sum(sendErrReasonRetryExhausted))
+ require.Equal(t, float64(0), sendErrCap.sum(sendErrReasonNonTransient))
+
+ backoff := p.metrics.sendBackoffSeconds.(*countingCounter).count
+ require.Greater(t, backoff, float64(0), "backoff should be recorded
between retry attempts")
+}
+
+func TestRetrySendNonTransientRecordsReason(t *testing.T) {
+ sendErrCap := newErrReasonCapturer()
+ p := &pub{metrics: newPubMetricsWithErrCapture(sendErrCap)}
+ bp := &batchPublisher{pub: p}
+
+ ctx := context.Background()
+ mockStream := NewMockSendClient(ctx)
+ mockStream.SetSendFunc(func(*clusterv1.SendRequest) error {
+ return status.Error(codes.InvalidArgument, "non-transient")
+ })
+
+ err := bp.retrySend(ctx, mockStream, &clusterv1.SendRequest{}, "n1",
"t1")
+ require.Error(t, err)
+
+ require.Equal(t, float64(1), sendErrCap.sum(sendErrReasonNonTransient))
+ require.Equal(t, float64(0),
sendErrCap.sum(sendErrReasonRetryExhausted))
+ require.Equal(t, float64(0),
p.metrics.sendRetryAttempts.(*countingCounter).count)
+}
+
+func TestRetrySendCanceledRecordsReason(t *testing.T) {
+ sendErrCap := newErrReasonCapturer()
+ p := &pub{metrics: newPubMetricsWithErrCapture(sendErrCap)}
+ bp := &batchPublisher{pub: p}
+
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel()
+
+ mockStream := NewMockSendClient(context.Background())
+ mockStream.SetSendFunc(func(*clusterv1.SendRequest) error {
+ return status.Error(codes.Unavailable, "unavailable")
+ })
+
+ err := bp.retrySend(ctx, mockStream, &clusterv1.SendRequest{}, "n1",
"t1")
+ require.ErrorIs(t, err, context.Canceled)
+
+ require.Equal(t, float64(1), sendErrCap.sum(sendErrReasonCanceled))
+}
+
+func TestRetrySendStreamCanceledRecordsReason(t *testing.T) {
+ sendErrCap := newErrReasonCapturer()
+ p := &pub{metrics: newPubMetricsWithErrCapture(sendErrCap)}
+ bp := &batchPublisher{pub: p}
+
+ streamCtx, cancel := context.WithCancel(context.Background())
+ cancel()
+
+ mockStream := NewMockSendClient(streamCtx)
+
+ err := bp.retrySend(context.Background(), mockStream,
&clusterv1.SendRequest{}, "n1", "t1")
+ require.ErrorIs(t, err, context.Canceled)
+
+ require.Equal(t, float64(1),
sendErrCap.sum(sendErrReasonStreamCanceled))
+}
+
+func TestListenBatchResponseRecordsRecvError(t *testing.T) {
+ sendErrCap := newErrReasonCapturer()
+ pm := newPubMetricsWithErrCapture(sendErrCap)
+ p := newPubWithConnMgrForMetrics(t, pm)
+ bp := &batchPublisher{pub: p}
+
+ ctx := context.Background()
+ mockStream := NewMockSendClient(ctx)
+ mockStream.SetRecvFunc(func() (*clusterv1.SendResponse, error) {
+ return nil, status.Error(codes.Unavailable, "recv failed")
+ })
+
+ bc := make(chan batchEvent, 1)
+ bp.listenBatchResponse(ctx, mockStream, func() {}, bc, "node-a",
"topic-a")
+
+ require.Equal(t, float64(1), sendErrCap.sum(sendErrReasonRecvError))
+}
+
+func TestListenBatchResponseRecvNonFailoverStillRecordsRecvError(t *testing.T)
{
+ sendErrCap := newErrReasonCapturer()
+ pm := newPubMetricsWithErrCapture(sendErrCap)
+ p := newPubWithConnMgrForMetrics(t, pm)
+ bp := &batchPublisher{pub: p}
+
+ ctx := context.Background()
+ mockStream := NewMockSendClient(ctx)
+ mockStream.SetRecvFunc(func() (*clusterv1.SendResponse, error) {
+ return nil, status.Error(codes.InvalidArgument, "bad")
+ })
+
+ bc := make(chan batchEvent, 1)
+ bp.listenBatchResponse(ctx, mockStream, func() {}, bc, "node-a",
"topic-a")
+
+ require.Equal(t, float64(1), sendErrCap.sum(sendErrReasonRecvError))
+}
+
+func TestListenBatchResponseServerRejectedWithoutFailover(t *testing.T) {
+ sendErrCap := newErrReasonCapturer()
+ pm := newPubMetricsWithErrCapture(sendErrCap)
+ p := newPubWithConnMgrForMetrics(t, pm)
+ bp := &batchPublisher{pub: p}
+
+ ctx := context.Background()
+ mockStream := NewMockSendClient(ctx)
+ mockStream.SetRecvFunc(func() (*clusterv1.SendResponse, error) {
+ return &clusterv1.SendResponse{
+ Error: "rejected",
+ Status: modelv1.Status_STATUS_INTERNAL_ERROR,
+ }, nil
+ })
+
+ bc := make(chan batchEvent, 1)
+ bp.listenBatchResponse(ctx, mockStream, func() {}, bc, "node-a",
"topic-a")
+
+ require.Equal(t, float64(1),
sendErrCap.sum(sendErrReasonServerRejected))
+ require.Equal(t, float64(0), sendErrCap.sum(sendErrReasonRecvError))
+
+ // Non-failover server rejections are now surfaced to the caller via
batchEvent
+ // so that send_err_total{reason="server_rejected"} stays consistent
with visible errors.
+ // The event must carry the rejection but must NOT trigger a
circuit-breaker failover.
+ select {
+ case evt, ok := <-bc:
+ require.True(t, ok, "expected a batchEvent for non-failover
server rejection")
+ require.Equal(t, "node-a", evt.n)
+ require.NotNil(t, evt.e)
+ require.Equal(t, modelv1.Status_STATUS_INTERNAL_ERROR,
evt.e.Status())
+ default:
+ t.Fatal("expected batchEvent for server_rejected but channel
was empty")
+ }
+}
+
+func TestListenBatchResponseDiskFullSendsFailoverEvent(t *testing.T) {
+ sendErrCap := newErrReasonCapturer()
+ pm := newPubMetricsWithErrCapture(sendErrCap)
+ p := newPubWithConnMgrForMetrics(t, pm)
+ bp := &batchPublisher{pub: p}
+
+ ctx := context.Background()
+ mockStream := NewMockSendClient(ctx)
+ mockStream.SetRecvFunc(func() (*clusterv1.SendResponse, error) {
+ return &clusterv1.SendResponse{
+ Error: "disk full",
+ Status: modelv1.Status_STATUS_DISK_FULL,
+ }, nil
+ })
+
+ bc := make(chan batchEvent, 1)
+ bp.listenBatchResponse(ctx, mockStream, func() {}, bc, "node-a",
"topic-a")
+
+ require.Equal(t, float64(1),
sendErrCap.sum(sendErrReasonServerRejected))
+
+ select {
+ case evt := <-bc:
+ require.Equal(t, "node-a", evt.n)
+ require.NotNil(t, evt.e)
+ require.Equal(t, modelv1.Status_STATUS_DISK_FULL,
evt.e.Status())
+ default:
+ t.Fatal("expected failover batchEvent on disk full response")
+ }
+}
+
+func TestCloseDecrementsInflightStreams(t *testing.T) {
+ streamGauge := &valGauge{}
+ pm := &pubMetrics{
+ sendSuccessTotal: &countingCounter{},
+ sendErrTotal: newErrReasonCapturer(),
+ sendBytesTotal: &countingCounter{},
+ sendDurationSeconds: &noopHistogram{},
+ sendRetryAttempts: &countingCounter{},
+ sendRetryExhausted: &countingCounter{},
+ sendBackoffSeconds: &countingCounter{},
+ inflightStreams: streamGauge,
+ inflightRequests: &noopGauge{},
+ }
+ p := newPubWithConnMgrForMetrics(t, pm)
+
+ ctx := context.Background()
+ mockStream := NewMockSendClient(ctx)
+ doneCh := make(chan struct{})
+ close(doneCh)
+
+ bp := p.NewBatchPublisher(time.Second).(*batchPublisher)
+ bp.streams["node-x"] = writeStream{
+ client: mockStream,
+ ctxDoneCh: doneCh,
+ }
+
+ streamGauge.Add(1, "node-x")
+
+ _, closeErr := bp.Close()
+ require.NoError(t, closeErr)
+ require.Equal(t, float64(0), streamGauge.value(), "Close should
decrement inflight_streams once per open stream")
+}
+
+// TestPublishInflightRequestsBalancedWhenStreamPreexists checks that Publish
increments then decrements
+// inflight_requests (via defer) when reusing an existing stream, without
requiring GetClient / new stream setup.
+func TestPublishInflightRequestsBalancedWhenStreamPreexists(t *testing.T) {
+ inflightReq := newInflightKeyedGauge()
+ sendErrCap := newErrReasonCapturer()
+ attempts := &countingCounter{}
+ pm := &pubMetrics{
+ sendSuccessTotal: attempts,
+ sendErrTotal: sendErrCap,
+ sendBytesTotal: &countingCounter{},
+ sendDurationSeconds: &noopHistogram{},
+ sendRetryAttempts: &countingCounter{},
+ sendRetryExhausted: &countingCounter{},
+ sendBackoffSeconds: &countingCounter{},
+ inflightStreams: &noopGauge{},
+ inflightRequests: inflightReq,
+ }
+ p := newPubWithConnMgrForMetrics(t, pm)
+
+ ctx := context.Background()
+ mockStream := NewMockSendClient(ctx)
+ mockStream.SetSendFunc(func(*clusterv1.SendRequest) error {
+ return nil
+ })
+
+ doneCh := make(chan struct{})
+ close(doneCh)
+
+ const nodeName = "node-a"
+ topic := data.TopicMeasureWrite
+ topicStr := topic.String()
+
+ bp := p.NewBatchPublisher(10 * time.Second).(*batchPublisher)
+ bp.streams[nodeName] = writeStream{
+ client: mockStream,
+ ctxDoneCh: doneCh,
+ }
+
+ msg := bus.NewMessageWithNode(1, nodeName, []byte("payload"))
+
+ _, publishErr := bp.Publish(ctx, topic, msg)
+ require.NoError(t, publishErr)
+
+ require.Equal(t, float64(1), attempts.count, "successful retrySend
should record send_success_total")
+ require.Equal(t, float64(0), inflightReq.net(topicStr, nodeName),
+ "inflight_requests defer must balance +1/-1 for topic/node")
+ require.Equal(t, float64(0),
sendErrCap.sum(sendErrReasonRetryExhausted))
+}
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index 1217009b4..d552bce77 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -42,14 +42,18 @@ import (
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/grpchelper"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/meter"
"github.com/apache/skywalking-banyandb/pkg/run"
pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
)
+var queuePubScope = observability.RootScope.SubScope("queue_pub")
+
// ChunkedSyncClientConfig configures chunked sync client behavior.
type ChunkedSyncClientConfig struct {
ChunkSize uint32 // Size of each chunk in bytes
@@ -71,6 +75,7 @@ type pub struct {
metadata metadata.Repo
handlers map[bus.Topic]schema.EventHandler
log *logger.Logger
+ metrics *pubMetrics
connMgr *grpchelper.ConnManager[*client]
closer *run.Closer
writableProbe map[string]map[string]struct{}
@@ -83,6 +88,34 @@ type pub struct {
tlsEnabled bool
}
+type pubMetrics struct {
+ sendSuccessTotal meter.Counter
+ sendErrTotal meter.Counter
+ sendBytesTotal meter.Counter
+ sendDurationSeconds meter.Histogram
+
+ sendRetryAttempts meter.Counter
+ sendRetryExhausted meter.Counter
+ sendBackoffSeconds meter.Counter
+ inflightStreams meter.Gauge
+ inflightRequests meter.Gauge
+}
+
+func newPubMetrics(factory observability.Factory) *pubMetrics {
+ return &pubMetrics{
+ sendSuccessTotal: factory.NewCounter("send_success_total",
"topic", "node"),
+ sendErrTotal: factory.NewCounter("send_err_total",
"topic", "node", "reason"),
+ sendBytesTotal: factory.NewCounter("send_bytes_total",
"topic", "node"),
+ sendDurationSeconds:
factory.NewHistogram("send_duration_seconds", meter.DefBuckets, "topic",
"node", "result"),
+
+ sendRetryAttempts:
factory.NewCounter("send_retry_attempts_total", "topic", "node"),
+ sendRetryExhausted:
factory.NewCounter("send_retry_exhausted_total", "topic", "node"),
+ sendBackoffSeconds:
factory.NewCounter("send_backoff_seconds_total", "topic", "node"),
+ inflightStreams: factory.NewGauge("inflight_streams",
"node"),
+ inflightRequests: factory.NewGauge("inflight_requests",
"topic", "node"),
+ }
+}
+
// AddressOf implements grpchelper.ConnectionHandler.
func (p *pub) AddressOf(node *databasev1.Node) string {
return node.GrpcAddress
@@ -387,6 +420,18 @@ func (p *pub) PreRun(context.Context) error {
p.log = logger.GetLogger("server-queue-pub-" + p.prefix)
+ if p.metrics == nil && p.metadata != nil {
+ if svc, ok := p.metadata.(metadata.Service); ok {
+ if omr := svc.MetricsRegistry(); omr != nil {
+ p.metrics =
newPubMetrics(omr.With(queuePubScope))
+ } else {
+ p.log.Warn().Msg("queue_pub metrics disabled:
MetricsRegistry returned nil")
+ }
+ } else {
+ p.log.Warn().Msg("queue_pub metrics disabled: metadata
does not implement metadata.Service")
+ }
+ }
+
// Initialize connection manager with the pub as the handler
p.connMgr =
grpchelper.NewConnManager(grpchelper.ConnManagerConfig[*client]{
//nolint:contextcheck // health check runs in background goroutine
Handler: p,
diff --git a/banyand/queue/pub/retry_test.go b/banyand/queue/pub/retry_test.go
index b073c3470..8293f90ee 100644
--- a/banyand/queue/pub/retry_test.go
+++ b/banyand/queue/pub/retry_test.go
@@ -109,7 +109,7 @@ func TestRetrySendSuccess(t *testing.T) {
bp := &batchPublisher{}
req := &clusterv1.SendRequest{}
- err := bp.retrySend(ctx, mockStream, req, "test-node")
+ err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
assert.NoError(t, err, "successful send should not return error")
}
@@ -131,7 +131,7 @@ func TestRetrySendTransientErrorWithRecovery(t *testing.T) {
bp := &batchPublisher{}
req := &clusterv1.SendRequest{}
- err := bp.retrySend(ctx, mockStream, req, "test-node")
+ err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
assert.NoError(t, err, "should succeed after retry")
}
@@ -149,7 +149,7 @@ func TestRetrySendNonTransientError(t *testing.T) {
bp := &batchPublisher{}
req := &clusterv1.SendRequest{}
- err := bp.retrySend(ctx, mockStream, req, "test-node")
+ err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
assert.Error(t, err, "non-transient error should be returned
immediately")
assert.Equal(t, nonTransientErr, err, "should return the original
error")
@@ -168,7 +168,7 @@ func TestRetrySendExhaustedRetries(t *testing.T) {
bp := &batchPublisher{}
req := &clusterv1.SendRequest{}
- err := bp.retrySend(ctx, mockStream, req, "test-node")
+ err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
assert.Error(t, err, "should return error after exhausting retries")
assert.Contains(t, err.Error(), "retry exhausted", "error should
indicate retry exhaustion")
@@ -190,7 +190,7 @@ func TestRetrySendContextCancellation(t *testing.T) {
bp := &batchPublisher{}
req := &clusterv1.SendRequest{}
- err := bp.retrySend(ctx, mockStream, req, "test-node")
+ err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
assert.Error(t, err, "should return error when context is canceled")
assert.Equal(t, context.Canceled, err, "should return context
cancellation error")
@@ -208,7 +208,7 @@ func TestRetrySendStreamContextDone(t *testing.T) {
bp := &batchPublisher{}
req := &clusterv1.SendRequest{}
- err := bp.retrySend(ctx, mockStream, req, "test-node")
+ err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
assert.Error(t, err, "should return error when stream context is done")
assert.Equal(t, context.Canceled, err, "should return stream context
cancellation error")
@@ -228,7 +228,7 @@ func TestRetrySendPerAttemptTimeout(t *testing.T) {
req := &clusterv1.SendRequest{}
start := time.Now()
- _ = bp.retrySend(ctx, mockStream, req, "test-node")
+ _ = bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
duration := time.Since(start)
// Should timeout quickly due to per-attempt timeout, not wait for the
full operation
@@ -259,7 +259,7 @@ func TestRetrySendBackoffTiming(t *testing.T) {
req := &clusterv1.SendRequest{}
start := time.Now()
- err := bp.retrySend(ctx, mockStream, req, "test-node")
+ err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
duration := time.Since(start)
assert.NoError(t, err, "should eventually succeed")
@@ -302,7 +302,7 @@ func TestRetrySendConcurrency(t *testing.T) {
bp := &batchPublisher{}
req := &clusterv1.SendRequest{}
- err := bp.retrySend(ctx, mockStream, req,
fmt.Sprintf("test-node-%d", id))
+ err := bp.retrySend(ctx, mockStream, req,
fmt.Sprintf("test-node-%d", id), "test-topic")
errors <- err
}(i)
}
diff --git a/docs/operation/grafana-cluster.json
b/docs/operation/grafana-cluster.json
index 6c2df9309..e9b58581a 100644
--- a/docs/operation/grafana-cluster.json
+++ b/docs/operation/grafana-cluster.json
@@ -2524,6 +2524,449 @@
],
"title": "Total Documents",
"type": "timeseries"
+ },
+ {
+ "collapsed": false,
+ "gridPos": {
+ "h": 1,
+ "w": 24,
+ "x": 0,
+ "y": 64
+ },
+ "id": 30,
+ "panels": [],
+ "title": "Liaison internal queue ($pod)",
+ "type": "row"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "description": "Internal queue_sub chunked sync pressure: active
sessions and reorder-buffer depth (labels use topic, not session).",
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisBorderShow": false,
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "barWidthFactor": 0.6,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "insertNulls": false,
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": null
+ }
+ ]
+ },
+ "unit": "short"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 0,
+ "y": 65
+ },
+ "id": 31,
+ "options": {
+ "legend": {
+ "calcs": [
+ "lastNotNull",
+ "mean",
+ "max"
+ ],
+ "displayMode": "table",
+ "placement": "bottom",
+ "showLegend": true,
+ "sortBy": "Last *",
+ "sortDesc": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "expr":
"sum(banyandb_queue_sub_chunked_sync_active_sessions{job=~\"$job\",pod=~\"$pod\"})
by (topic)",
+ "hide": false,
+ "instant": false,
+ "legendFormat": "active_sessions {{topic}}",
+ "range": true,
+ "refId": "A"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "expr":
"sum(banyandb_queue_sub_chunk_reorder_buffered_chunks{job=~\"$job\",pod=~\"$pod\"})
by (topic)",
+ "hide": false,
+ "instant": false,
+ "legendFormat": "reorder_buffered {{topic}}",
+ "range": true,
+ "refId": "B"
+ }
+ ],
+ "title": "queue_sub: sync sessions & reorder buffer",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "description": "Chunked sync abort rate by topic and reason (switch,
stream_error, ctx_done, eof).",
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisBorderShow": false,
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "barWidthFactor": 0.6,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "insertNulls": false,
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": null
+ }
+ ]
+ },
+ "unit": "ops"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 12,
+ "y": 65
+ },
+ "id": 32,
+ "options": {
+ "legend": {
+ "calcs": [
+ "lastNotNull",
+ "mean",
+ "max"
+ ],
+ "displayMode": "table",
+ "placement": "bottom",
+ "showLegend": true,
+ "sortBy": "Last *",
+ "sortDesc": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "expr":
"sum(rate(banyandb_queue_sub_chunked_sync_aborted_total{job=~\"$job\",pod=~\"$pod\"}[$__rate_interval]))
by (topic,reason)",
+ "hide": false,
+ "instant": false,
+ "legendFormat": "{{topic}} {{reason}}",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "queue_sub: chunked sync abort rate",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "description": "queue_pub successful stream sends per second
(tier-1/tier-2 batch client).",
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisBorderShow": false,
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "barWidthFactor": 0.6,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "insertNulls": false,
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": null
+ }
+ ]
+ },
+ "unit": "ops"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 0,
+ "y": 73
+ },
+ "id": 33,
+ "options": {
+ "legend": {
+ "calcs": [
+ "lastNotNull",
+ "mean",
+ "max"
+ ],
+ "displayMode": "table",
+ "placement": "bottom",
+ "showLegend": true,
+ "sortBy": "Last *",
+ "sortDesc": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "expr":
"sum(rate(banyandb_queue_pub_send_success_total{job=~\"$job\",pod=~\"$pod\"}[$__rate_interval]))
by (topic)",
+ "hide": false,
+ "instant": false,
+ "legendFormat": "{{topic}}",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "queue_pub: send success rate",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "description": "queue_pub errors and exhausted retries (see
docs/operation/observability.md for reason values).",
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisBorderShow": false,
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "barWidthFactor": 0.6,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "insertNulls": false,
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": null
+ }
+ ]
+ },
+ "unit": "ops"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 12,
+ "y": 73
+ },
+ "id": 34,
+ "options": {
+ "legend": {
+ "calcs": [
+ "lastNotNull",
+ "mean",
+ "max"
+ ],
+ "displayMode": "table",
+ "placement": "bottom",
+ "showLegend": true,
+ "sortBy": "Last *",
+ "sortDesc": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "expr":
"sum(rate(banyandb_queue_pub_send_err_total{job=~\"$job\",pod=~\"$pod\"}[$__rate_interval]))
by (reason)",
+ "hide": false,
+ "instant": false,
+ "legendFormat": "err {{reason}}",
+ "range": true,
+ "refId": "A"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "expr":
"sum(rate(banyandb_queue_pub_send_retry_exhausted_total{job=~\"$job\",pod=~\"$pod\"}[$__rate_interval]))
by (topic,node)",
+ "hide": false,
+ "instant": false,
+ "legendFormat": "retry_exhausted {{topic}} {{node}}",
+ "range": true,
+ "refId": "B"
+ }
+ ],
+ "title": "queue_pub: errors & retry exhausted",
+ "type": "timeseries"
}
],
"schemaVersion": 39,
@@ -2587,6 +3030,6 @@
"timezone": "browser",
"title": "BanyanDB Cluster",
"uid": "ddy81kbj931mof",
- "version": 19,
+ "version": 20,
"weekStart": ""
}
\ No newline at end of file
diff --git a/docs/operation/observability.md b/docs/operation/observability.md
index a8935c748..559ea053d 100644
--- a/docs/operation/observability.md
+++ b/docs/operation/observability.md
@@ -258,6 +258,64 @@ If the value is too large, it may indicate that too many
data points are being i
**Expression**:
`sum(banyandb_stream_tst_inverted_index_total_doc_count{job=~\"$job\",instance=~\"$instance\"})
by (group)`
+### Liaison internal queue (`queue_sub` / `queue_pub`)
+
+Liaison nodes run an internal gRPC **queue server** (`server-queue-sub`, wired
via `sub.NewServerWithPorts` in `pkg/cmdsetup/liaison.go`) and **queue
clients** (`server-queue-pub`) for tier-1/tier-2 pipelines. Prometheus metrics
use the namespaces `banyandb_queue_sub_*` and `banyandb_queue_pub_*` (built
from `observability.RootScope` + `queue_sub` / `queue_pub` sub-scopes). Data
nodes may expose the same metric families where the corresponding services run.
+
+#### `queue_sub` — inbound server (including chunked sync)
+
+| Metric (suffix after `banyandb_queue_sub_`) | Type | Labels | Meaning |
+| --- | --- | --- | --- |
+| `total_started`, `total_finished`, `total_err`, `total_latency` | Counter |
`topic` | Legacy per-topic stream handler lifecycle. |
+| `total_msg_received`, `total_msg_received_err`, `total_msg_sent`,
`total_msg_sent_err` | Counter | `topic` | Per-topic message I/O counts and
errors (the `*_err` counters are included in the aggregate error rate below). |
+| `out_of_order_chunks_received`, `chunks_buffered` | Counter | `topic` |
Chunk reordering: out-of-order arrivals and buffer events (**`topic` only**,
not per session). |
+| `buffer_timeouts`, `large_gaps_rejected`, `buffer_capacity_exceeded`,
`finish_sync_err` | Counter | `topic` | Reorder buffer pressure and sync
completion issues. |
+| `chunked_sync_active_sessions` | Gauge | `topic` | In-flight chunked sync
sessions per topic. |
+| `chunk_reorder_buffered_chunks` | Gauge | `topic` | Chunks waiting in the
reorder buffer. |
+| `chunked_sync_aborted_total` | Counter | `topic`, `reason` | Aborted
sessions; `reason` is one of `switch`, `stream_error`, `ctx_done`, `eof`. |
+| `chunked_sync_failed_parts_total` | Counter | `topic` | Parts incomplete
when a sync completes. |
+| `chunked_sync_total_bytes_received` | Counter | `topic` | Bytes received for
completed syncs. |
+| `chunked_sync_duration_seconds` | Histogram | `topic` | Wall-clock duration
of completed syncs. |
+
+**Troubleshooting:** rising `chunk_reorder_buffered_chunks` or
`buffer_timeouts` suggests sustained out-of-order or slow consumers. Spikes in
`chunked_sync_aborted_total` with `reason=switch` often correlate with
topic/hand-off changes; `stream_error` / `ctx_done` / `eof` point to RPC
lifecycle issues. Use `chunked_sync_failed_parts_total` and the duration
histogram to separate partial completion from healthy throughput.
+
+#### `queue_pub` — outbound batch client
+
+| Metric (suffix after `banyandb_queue_pub_`) | Type | Labels | Meaning |
+| --- | --- | --- | --- |
+| `send_success_total` | Counter | `topic`, `node` | Successful `Send` on the
client stream (local write, not end-to-end ack). |
+| `send_bytes_total` | Counter | `topic`, `node` | Payload bytes on successful
`Send`. |
+| `send_duration_seconds` | Histogram | `topic`, `node`, `result` | Time spent
in the send path including retries. `result` is one of `success`,
`non_transient`, `canceled`, `stream_canceled`, `retry_exhausted`; filter to
`result="success"` (and optionally `retry_exhausted`) when isolating end-to-end
send latency. |
+| `send_err_total` | Counter | `topic`, `node`, `reason` | Send/recv side
errors; `reason` includes `non_transient`, `canceled`, `stream_canceled`,
`retry_exhausted`, `recv_error`, `server_rejected`. |
+| `send_retry_attempts_total`, `send_retry_exhausted_total`,
`send_backoff_seconds_total` | Counter | `topic`, `node` | Retry/backoff
behavior before giving up. |
+| `inflight_streams` | Gauge | `node` | Open send streams per downstream node.
|
+| `inflight_requests` | Gauge | `topic`, `node` | In-flight batch send
operations. |
+
+**Troubleshooting:** correlate `send_retry_exhausted_total` and
`send_err_total{reason="retry_exhausted"}` with upstream pressure. `recv_error`
vs `server_rejected` separates transport failures from application-level
`SendResponse` errors. Sustained high `inflight_requests` or `inflight_streams`
may indicate slow or unavailable data nodes.
+
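+`queue_pub` metrics are only registered when `metadata` implements
`metadata.Service` and `MetricsRegistry()` is non-nil (e.g. after
`SetMetricsRegistry` in bootstrap); if `metadata` is set but either condition
fails, `PreRun` logs a warning and leaves the metrics disabled.
`NewWithoutMetadata()` leaves `queue_pub` metrics disabled without a warning.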
+
+#### Example PromQL snippets
+
+Saturation and rate queries (using the `$job` / `$pod` variables from the cluster dashboard):
+
+- **Chunked sync sessions:**
`sum(banyandb_queue_sub_chunked_sync_active_sessions{job=~"$job",pod=~"$pod"})
by (topic)`
+- **Reorder buffer depth:**
`sum(banyandb_queue_sub_chunk_reorder_buffered_chunks{job=~"$job",pod=~"$pod"})
by (topic)`
+- **Chunked sync abort rate:**
`sum(rate(banyandb_queue_sub_chunked_sync_aborted_total{job=~"$job",pod=~"$pod"}[$__rate_interval]))
by (topic,reason)`
+- **Publisher success rate:**
`sum(rate(banyandb_queue_pub_send_success_total{job=~"$job",pod=~"$pod"}[$__rate_interval]))
by (topic)`
+- **Publisher errors by reason:**
`sum(rate(banyandb_queue_pub_send_err_total{job=~"$job",pod=~"$pod"}[$__rate_interval]))
by (reason)`
+
+**Suggested alerts (tune thresholds per cluster):**
+
+- Non-zero sustained `rate(banyandb_queue_pub_send_retry_exhausted_total[5m])`
on liaison.
+- `chunk_reorder_buffered_chunks` or `chunked_sync_active_sessions` above an
environment-specific ceiling for a single `topic`.
+
+#### Aggregate pipeline error rate (optional)
+
+To combine legacy queue stream errors with publisher-side failures (scaled to
errors per minute, as elsewhere in this doc):
+
+**Expression**:
`sum(rate(banyandb_queue_sub_total_msg_sent_err{job=~"$job",instance=~"$instance"}[$__rate_interval])*60)
+
sum(rate(banyandb_queue_sub_total_msg_received_err{job=~"$job",instance=~"$instance"}[$__rate_interval])*60)
+
sum(rate(banyandb_queue_pub_send_err_total{job=~"$job",instance=~"$instance"}[$__rate_interval])*60)`
+
## Metrics Providers
BanyanDB has built-in support for metrics collection. Currently, there are two
supported metrics provider: `prometheus` and `native`. These can be enabled
through `observability-modes` flag, allowing you to activate one or both of
them.
@@ -294,7 +352,7 @@ scrape_configs:
#### Grafana Dashboard
-Check out the [BanyanDB Cluster Dashboard](grafana-cluster.json) for
monitoring BanyanDB metrics.
+Check out the [BanyanDB Cluster Dashboard](grafana-cluster.json) for
monitoring BanyanDB metrics. The dashboard includes a **Liaison internal
queue** row (`queue_sub` saturation / aborts and `queue_pub` send and error
rates); metric names and PromQL examples are documented in the **Liaison
internal queue** subsection above.
### Native