hanahmily commented on code in PR #1113:
URL:
https://github.com/apache/skywalking-banyandb/pull/1113#discussion_r3199922137
##########
banyand/queue/pub/batch.go:
##########
@@ -162,48 +180,65 @@ func (bp *batchPublisher) Publish(ctx context.Context,
topic bus.Topic, messages
client: stream,
ctxDoneCh: streamCtx.Done(),
}
+ if bp.hasMetrics() {
+ bp.pub.metrics.inflightStreams.Add(1, node)
Review Comment:
**Bug — `cancel()` leak when `Send()` fails (just above this hunk, lines
171-178):**
```go
streamCtx, cancel := context.WithTimeout(ctx, bp.timeout)
// this assignment is for getting around the go vet lint
deferFn := cancel
stream, errCreateStream := nodeClient.client.Send(streamCtx)
if errCreateStream != nil {
	err = multierr.Append(err, fmt.Errorf("failed to get stream for node %s: %w", node, errCreateStream))
	continue
}
```
The `deferFn := cancel` aliasing only suppresses the `lostcancel` lint.
Neither `cancel()` nor `deferFn()` is invoked when `Send()` fails, so
`streamCtx` and its timer stay alive until `bp.timeout` (15 s by default)
expires. Under repeated transient failures, every dropped stream leaves a
pending timer and a child context registered with the parent until the
deadline fires. Calling `cancel()` before `continue` releases them immediately:
```go
if errCreateStream != nil {
	cancel()
	err = multierr.Append(err, fmt.Errorf("failed to get stream for node %s: %w", node, errCreateStream))
	continue
}
```
(Anchored on the newly-added `inflightStreams.Add(1, node)` line because the
cancel block sits just outside the diff hunk.)
##########
banyand/queue/pub/batch.go:
##########
@@ -162,48 +180,65 @@ func (bp *batchPublisher) Publish(ctx context.Context,
topic bus.Topic, messages
client: stream,
ctxDoneCh: streamCtx.Done(),
}
+ if bp.hasMetrics() {
+ bp.pub.metrics.inflightStreams.Add(1, node)
+ }
bp.f.events = append(bp.f.events, make(chan batchEvent))
_ = sendData()
nodeName := node
- go func(s clusterv1.Service_SendClient, deferFn func(), bc chan batchEvent, curNode string) {
- defer func() {
- close(bc)
- deferFn()
- }()
- select {
- case <-ctx.Done():
- return
- default:
- }
- resp, errRecv := s.Recv()
- if errRecv != nil {
- if grpchelper.IsFailoverError(errRecv) {
- // Record circuit breaker failure before creating failover event
- bp.pub.connMgr.RecordFailure(curNode, errRecv)
- bc <- batchEvent{n: curNode, e: common.NewErrorWithStatus(modelv1.Status_STATUS_INTERNAL_ERROR, errRecv.Error())}
- }
- return
- }
- if resp == nil {
- return
- }
- if resp.Error == "" {
- return
- }
- if isFailoverStatus(resp.Status) {
- ce := common.NewErrorWithStatus(resp.Status, resp.Error)
- // Record circuit breaker failure before creating failover event
- bp.pub.connMgr.RecordFailure(curNode, ce)
- bc <- batchEvent{n: curNode, e: ce}
- }
- }(stream, deferFn, bp.f.events[len(bp.f.events)-1], nodeName)
+ go bp.listenBatchResponse(ctx, stream, deferFn, bp.f.events[len(bp.f.events)-1], nodeName, topicStr)
}
return nil, err
}
+func (bp *batchPublisher) hasMetrics() bool {
+ return bp.pub != nil && bp.pub.metrics != nil
+}
+
+// listenBatchResponse receives the server response and records failover events and end-to-end failure metrics.
+func (bp *batchPublisher) listenBatchResponse(ctx context.Context, s clusterv1.Service_SendClient, deferFn func(), bc chan batchEvent, curNode, topic string) {
+ defer func() {
+ close(bc)
+ deferFn()
+ }()
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+
+ resp, errRecv := s.Recv()
+ if errRecv != nil {
+ if bp.hasMetrics() {
+ bp.pub.metrics.sendErrTotal.Inc(1, topic, curNode, sendErrReasonRecvError)
+ }
+ if grpchelper.IsFailoverError(errRecv) {
+ // Record circuit breaker failure before creating failover event
+ bp.pub.connMgr.RecordFailure(curNode, errRecv)
+ bc <- batchEvent{n: curNode, e: common.NewErrorWithStatus(modelv1.Status_STATUS_INTERNAL_ERROR, errRecv.Error())}
+ }
+ return
+ }
+ if resp == nil || resp.Error == "" {
+ return
+ }
+ if bp.hasMetrics() {
+ bp.pub.metrics.sendErrTotal.Inc(1, topic, curNode, sendErrReasonServerRejected)
Review Comment:
**Bug — `server_rejected` is counted but never propagated to the caller.**
For non-failover statuses, this code increments
`send_err_total{reason="server_rejected"}` but does **not** emit a
`batchEvent`, so `Close()`'s `cee` map never sees it. Operators will see the
counter rise on dashboards while the publishing call site silently treats the
batch as successful — a worse failure mode than before, because the new metric
advertises a problem that the code intentionally hides.
Two options:
1. **Surface the rejection** by emitting a `batchEvent` for any non-empty
`resp.Error`, not just failover statuses:
```go
if bp.hasMetrics() {
	bp.pub.metrics.sendErrTotal.Inc(1, topic, curNode, sendErrReasonServerRejected)
}
ce := common.NewErrorWithStatus(resp.Status, resp.Error)
if isFailoverStatus(resp.Status) {
	bp.pub.connMgr.RecordFailure(curNode, ce)
}
bc <- batchEvent{n: curNode, e: ce}
```
2. Keep current behavior but document it: add a Go doc comment on
`listenBatchResponse` and a note in `observability.md` that `server_rejected`
is observability-only and the publisher will not retry/failover on non-failover
statuses.
Option 1 is preferred — leaving the metric divergent from the visible error
surface will lead to confused on-call investigations.
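To make the intended behavior of Option 1 concrete, here is a small
standalone sketch. All types and names (`response`, `event`, `breaker`,
`isFailover`, `handleResponse`) are hypothetical stand-ins, not the
publisher's real API, and the failover classification is an assumption made
only for the example. The point is that every counted rejection also produces
an event, while the circuit breaker is still touched only for failover
statuses:

```go
package main

import "fmt"

// response, event and breaker are hypothetical stand-ins for the server
// response, batchEvent and connection manager used by the publisher.
type response struct {
	Status string
	Error  string
}

type event struct {
	Node string
	Err  error
}

type breaker struct{ failures map[string]int }

func (b *breaker) RecordFailure(node string) { b.failures[node]++ }

// isFailover is an assumed classification for this sketch only.
func isFailover(status string) bool { return status == "INTERNAL_ERROR" }

// handleResponse counts a rejection and always surfaces it as an event, so
// the metric and the caller-visible error cannot diverge. Only failover
// statuses are reported to the circuit breaker.
func handleResponse(resp *response, node string, rejected map[string]int, b *breaker, events chan<- event) {
	if resp == nil || resp.Error == "" {
		return
	}
	rejected[node]++
	err := fmt.Errorf("%s: %s", resp.Status, resp.Error)
	if isFailover(resp.Status) {
		b.RecordFailure(node)
	}
	events <- event{Node: node, Err: err}
}

func main() {
	events := make(chan event, 2) // buffered so the sketch needs no reader goroutine
	rejected := map[string]int{}
	b := &breaker{failures: map[string]int{}}

	handleResponse(&response{Status: "INVALID_ARGUMENT", Error: "bad schema"}, "node-1", rejected, b, events)
	handleResponse(&response{Status: "INTERNAL_ERROR", Error: "disk full"}, "node-2", rejected, b, events)
	close(events)

	for e := range events {
		fmt.Println(e.Node, e.Err)
	}
	fmt.Println("rejected:", rejected, "breaker:", b.failures)
}
```

Note that the real `bc` channel is unbuffered, so the send only completes once
`Close()` drains it; the buffer above exists purely to keep the sketch short.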
##########
docs/operation/observability.md:
##########
@@ -258,6 +258,64 @@ If the value is too large, it may indicate that too many
data points are being i
**Expression**:
`sum(banyandb_stream_tst_inverted_index_total_doc_count{job=~\"$job\",instance=~\"$instance\"})
by (group)`
+### Liaison internal queue (`queue_sub` / `queue_pub`)
+
+Liaison nodes run an internal gRPC **queue server** (`server-queue-sub`, wired via `sub.NewServerWithPorts` in `pkg/cmdsetup/liaison.go`) and **queue clients** (`server-queue-pub`) for tier-1/tier-2 pipelines. Prometheus metrics use the namespaces `banyandb_queue_sub_*` and `banyandb_queue_pub_*` (built from `observability.RootScope` + `queue_sub` / `queue_pub` sub-scopes). Data nodes may expose the same metric families where the corresponding services run.
+
+#### `queue_sub` — inbound server (including chunked sync)
+
+| Metric (suffix after `banyandb_queue_sub_`) | Type | Labels | Meaning |
+| --- | --- | --- | --- |
+| `total_started`, `total_finished`, `total_err`, `total_latency` | Counter | `topic` | Legacy per-topic stream handler lifecycle. |
+| `total_msg_received`, `total_msg_received_err`, `total_msg_sent`, `total_msg_sent_err` | Counter | `topic` | Per-topic message I/O errors (included in high-level error rates below). |
+| `out_of_order_chunks_received`, `chunks_buffered` | Counter | `topic` | Chunk reordering: out-of-order arrivals and buffer events (**`topic` only**, not per session). |
+| `buffer_timeouts`, `large_gaps_rejected`, `buffer_capacity_exceeded`, `finish_sync_err` | Counter | `topic` | Reorder buffer pressure and sync completion issues. |
+| `chunked_sync_active_sessions` | Gauge | `topic` | In-flight chunked sync sessions per topic. |
+| `chunk_reorder_buffered_chunks` | Gauge | `topic` | Chunks waiting in the reorder buffer. |
+| `chunked_sync_aborted_total` | Counter | `topic`, `reason` | Aborted sessions; `reason` is one of `switch`, `stream_error`, `ctx_done`, `eof`. |
+| `chunked_sync_failed_parts_total` | Counter | `topic` | Parts incomplete when a sync completes. |
+| `chunked_sync_total_bytes_received` | Counter | `topic` | Bytes received for completed syncs. |
+| `chunked_sync_duration_seconds` | Histogram | `topic` | Wall-clock duration of completed syncs. |
+
+**Troubleshooting:** rising `chunk_reorder_buffered_chunks` or `buffer_timeouts` suggests sustained out-of-order or slow consumers. Spikes in `chunked_sync_aborted_total` with `reason=switch` often correlate with topic/hand-off changes; `stream_error` / `ctx_done` / `eof` point to RPC lifecycle issues. Use `chunked_sync_failed_parts_total` and the duration histogram to separate partial completion from healthy throughput.
+
+#### `queue_pub` — outbound batch client
+
+| Metric (suffix after `banyandb_queue_pub_`) | Type | Labels | Meaning |
+| --- | --- | --- | --- |
+| `send_success_total` | Counter | `topic`, `node` | Successful `Send` on the client stream (local write, not end-to-end ack). |
+| `send_bytes_total` | Counter | `topic`, `node` | Payload bytes on successful `Send`. |
+| `send_duration_seconds` | Histogram | `topic`, `node` | Time spent in the send path including retries. |
Review Comment:
Paired with the `send_duration_seconds` issue in `batch.go` — this row is
technically true but operationally misleading because the histogram includes
fail-fast paths (non-transient rejections, ctx-canceled, stream-canceled).
If the unconditional observe is kept, please append a clarifier, e.g.:
> Time spent in the send path including retries. **Recorded for both
successful and failed attempts**; filter with `send_err_total` (or use a
`result` label, if added) when isolating success latency.
If you split the histogram by a `result` label, update the Labels column to
`topic`, `node`, `result`.
##########
banyand/queue/pub/batch.go:
##########
@@ -280,8 +315,14 @@ func isFailoverStatus(s modelv1.Status) bool {
}
// retrySend implements bounded retries for client streaming sends with exponential backoff and jitter.
-func (bp *batchPublisher) retrySend(ctx context.Context, stream clusterv1.Service_SendClient, r *clusterv1.SendRequest, node string) error {
+func (bp *batchPublisher) retrySend(ctx context.Context, stream clusterv1.Service_SendClient, r *clusterv1.SendRequest, node string, topic string) error {
var lastErr error
+ if bp.hasMetrics() {
+ start := time.Now()
+ defer func() {
+ bp.pub.metrics.sendDurationSeconds.Observe(time.Since(start).Seconds(), topic, node)
Review Comment:
**Bug — `send_duration_seconds` observed on every outcome skews the latency
signal.**
This defer records duration for **every** return path, including
microsecond-fast `InvalidArgument` rejections (`return sendErr` after the
`IsTransientError` check), context cancellations, and stream-canceled returns.
Fail-fast errors pull the distribution toward low values, which dilutes the
signal operators rely on when watching p99 send latency.
Two viable fixes:
- **Add a `result` label** so success and each failure mode are
distinguishable:
```go
factory.NewHistogram("send_duration_seconds", meter.DefBuckets, "topic", "node", "result")
```
Then `Observe(..., topic, node, "success" | "non_transient" |
"retry_exhausted" | "canceled")` at each return path.
- **Or only observe on success and retry-exhausted paths** — the two
outcomes where end-to-end send latency is operationally meaningful.
Whichever path you take, please also update the doc table in
`docs/operation/observability.md` (see paired comment on the
`send_duration_seconds` row).