hanahmily commented on code in PR #1113:
URL: 
https://github.com/apache/skywalking-banyandb/pull/1113#discussion_r3199922137


##########
banyand/queue/pub/batch.go:
##########
@@ -162,48 +180,65 @@ func (bp *batchPublisher) Publish(ctx context.Context, 
topic bus.Topic, messages
                        client:    stream,
                        ctxDoneCh: streamCtx.Done(),
                }
+               if bp.hasMetrics() {
+                       bp.pub.metrics.inflightStreams.Add(1, node)

Review Comment:
   **Bug — `cancel()` leak when `Send()` fails (just above this hunk, lines 
171-178):**
   
   ```go
   streamCtx, cancel := context.WithTimeout(ctx, bp.timeout)
   // this assignment is for getting around the go vet lint
   deferFn := cancel
   stream, errCreateStream := nodeClient.client.Send(streamCtx)
   if errCreateStream != nil {
       err = multierr.Append(err, fmt.Errorf("failed to get stream for node %s: 
%w", node, errCreateStream))
       continue
   }
   ```
   
   The `deferFn := cancel` aliasing only suppresses the `lostcancel` lint — 
neither `cancel()` nor `deferFn()` is invoked when `Send()` fails, so 
`streamCtx` and its timer stay alive until `bp.timeout` (15 s by default) 
expires. Under repeated transient failures this accumulates pending timers and 
context resources, one set per failed stream creation.
   
   ```go
   if errCreateStream != nil {
       cancel()
       err = multierr.Append(err, fmt.Errorf("failed to get stream for node %s: 
%w", node, errCreateStream))
       continue
   }
   ```
   
   (Anchored on the newly-added `inflightStreams.Add(1, node)` line because the 
cancel block sits just outside the diff hunk.)



##########
banyand/queue/pub/batch.go:
##########
@@ -162,48 +180,65 @@ func (bp *batchPublisher) Publish(ctx context.Context, 
topic bus.Topic, messages
                        client:    stream,
                        ctxDoneCh: streamCtx.Done(),
                }
+               if bp.hasMetrics() {
+                       bp.pub.metrics.inflightStreams.Add(1, node)
+               }
                bp.f.events = append(bp.f.events, make(chan batchEvent))
                _ = sendData()
                nodeName := node
-               go func(s clusterv1.Service_SendClient, deferFn func(), bc chan 
batchEvent, curNode string) {
-                       defer func() {
-                               close(bc)
-                               deferFn()
-                       }()
-                       select {
-                       case <-ctx.Done():
-                               return
-                       default:
-                       }
-                       resp, errRecv := s.Recv()
-                       if errRecv != nil {
-                               if grpchelper.IsFailoverError(errRecv) {
-                                       // Record circuit breaker failure 
before creating failover event
-                                       bp.pub.connMgr.RecordFailure(curNode, 
errRecv)
-                                       bc <- batchEvent{n: curNode, e: 
common.NewErrorWithStatus(modelv1.Status_STATUS_INTERNAL_ERROR, 
errRecv.Error())}
-                               }
-                               return
-                       }
-                       if resp == nil {
-                               return
-                       }
-                       if resp.Error == "" {
-                               return
-                       }
-                       if isFailoverStatus(resp.Status) {
-                               ce := common.NewErrorWithStatus(resp.Status, 
resp.Error)
-                               // Record circuit breaker failure before 
creating failover event
-                               bp.pub.connMgr.RecordFailure(curNode, ce)
-                               bc <- batchEvent{n: curNode, e: ce}
-                       }
-               }(stream, deferFn, bp.f.events[len(bp.f.events)-1], nodeName)
+               go bp.listenBatchResponse(ctx, stream, deferFn, 
bp.f.events[len(bp.f.events)-1], nodeName, topicStr)
        }
        return nil, err
 }
 
+func (bp *batchPublisher) hasMetrics() bool {
+       return bp.pub != nil && bp.pub.metrics != nil
+}
+
+// listenBatchResponse receives the server response and records failover 
events and end-to-end failure metrics.
+func (bp *batchPublisher) listenBatchResponse(ctx context.Context, s 
clusterv1.Service_SendClient, deferFn func(), bc chan batchEvent, curNode, 
topic string) {
+       defer func() {
+               close(bc)
+               deferFn()
+       }()
+       select {
+       case <-ctx.Done():
+               return
+       default:
+       }
+
+       resp, errRecv := s.Recv()
+       if errRecv != nil {
+               if bp.hasMetrics() {
+                       bp.pub.metrics.sendErrTotal.Inc(1, topic, curNode, 
sendErrReasonRecvError)
+               }
+               if grpchelper.IsFailoverError(errRecv) {
+                       // Record circuit breaker failure before creating 
failover event
+                       bp.pub.connMgr.RecordFailure(curNode, errRecv)
+                       bc <- batchEvent{n: curNode, e: 
common.NewErrorWithStatus(modelv1.Status_STATUS_INTERNAL_ERROR, 
errRecv.Error())}
+               }
+               return
+       }
+       if resp == nil || resp.Error == "" {
+               return
+       }
+       if bp.hasMetrics() {
+               bp.pub.metrics.sendErrTotal.Inc(1, topic, curNode, 
sendErrReasonServerRejected)

Review Comment:
   **Bug — `server_rejected` is counted but never propagated to the caller.**
   
   For non-failover statuses, this code increments 
`send_err_total{reason="server_rejected"}` but does **not** emit a 
`batchEvent`, so `Close()`'s `cee` map never sees it. Operators will see the 
counter rise on dashboards while the publishing call site silently treats the 
batch as successful — a worse failure mode than before, because the new metric 
advertises a problem that the code intentionally hides.
   
   Two options:
   
   1. **Surface the rejection** by emitting a `batchEvent` for any non-empty 
`resp.Error`, not just failover statuses:
      ```go
      if bp.hasMetrics() {
          bp.pub.metrics.sendErrTotal.Inc(1, topic, curNode, 
sendErrReasonServerRejected)
      }
      ce := common.NewErrorWithStatus(resp.Status, resp.Error)
      if isFailoverStatus(resp.Status) {
          bp.pub.connMgr.RecordFailure(curNode, ce)
      }
      bc <- batchEvent{n: curNode, e: ce}
      ```
   2. Keep current behavior but document it: add a Go doc comment on 
`listenBatchResponse` and a note in `observability.md` that `server_rejected` 
is observability-only and the publisher will not retry/failover on non-failover 
statuses.
   
   Option 1 is preferred — leaving the metric divergent from the visible error 
surface will lead to confused on-call investigations.



##########
docs/operation/observability.md:
##########
@@ -258,6 +258,64 @@ If the value is too large, it may indicate that too many 
data points are being i
 
 **Expression**: 
`sum(banyandb_stream_tst_inverted_index_total_doc_count{job=~\"$job\",instance=~\"$instance\"})
 by (group)`
 
+### Liaison internal queue (`queue_sub` / `queue_pub`)
+
+Liaison nodes run an internal gRPC **queue server** (`server-queue-sub`, wired 
via `sub.NewServerWithPorts` in `pkg/cmdsetup/liaison.go`) and **queue 
clients** (`server-queue-pub`) for tier-1/tier-2 pipelines. Prometheus metrics 
use the namespaces `banyandb_queue_sub_*` and `banyandb_queue_pub_*` (built 
from `observability.RootScope` + `queue_sub` / `queue_pub` sub-scopes). Data 
nodes may expose the same metric families where the corresponding services run.
+
+#### `queue_sub` — inbound server (including chunked sync)
+
+| Metric (suffix after `banyandb_queue_sub_`) | Type | Labels | Meaning |
+| --- | --- | --- | --- |
+| `total_started`, `total_finished`, `total_err`, `total_latency` | Counter | 
`topic` | Legacy per-topic stream handler lifecycle. |
+| `total_msg_received`, `total_msg_received_err`, `total_msg_sent`, 
`total_msg_sent_err` | Counter | `topic` | Per-topic message I/O errors 
(included in high-level error rates below). |
+| `out_of_order_chunks_received`, `chunks_buffered` | Counter | `topic` | 
Chunk reordering: out-of-order arrivals and buffer events (**`topic` only**, 
not per session). |
+| `buffer_timeouts`, `large_gaps_rejected`, `buffer_capacity_exceeded`, 
`finish_sync_err` | Counter | `topic` | Reorder buffer pressure and sync 
completion issues. |
+| `chunked_sync_active_sessions` | Gauge | `topic` | In-flight chunked sync 
sessions per topic. |
+| `chunk_reorder_buffered_chunks` | Gauge | `topic` | Chunks waiting in the 
reorder buffer. |
+| `chunked_sync_aborted_total` | Counter | `topic`, `reason` | Aborted 
sessions; `reason` is one of `switch`, `stream_error`, `ctx_done`, `eof`. |
+| `chunked_sync_failed_parts_total` | Counter | `topic` | Parts incomplete 
when a sync completes. |
+| `chunked_sync_total_bytes_received` | Counter | `topic` | Bytes received for 
completed syncs. |
+| `chunked_sync_duration_seconds` | Histogram | `topic` | Wall-clock duration 
of completed syncs. |
+
+**Troubleshooting:** rising `chunk_reorder_buffered_chunks` or 
`buffer_timeouts` suggests sustained out-of-order or slow consumers. Spikes in 
`chunked_sync_aborted_total` with `reason=switch` often correlate with 
topic/hand-off changes; `stream_error` / `ctx_done` / `eof` point to RPC 
lifecycle issues. Use `chunked_sync_failed_parts_total` and the duration 
histogram to separate partial completion from healthy throughput.
+
+#### `queue_pub` — outbound batch client
+
+| Metric (suffix after `banyandb_queue_pub_`) | Type | Labels | Meaning |
+| --- | --- | --- | --- |
+| `send_success_total` | Counter | `topic`, `node` | Successful `Send` on the 
client stream (local write, not end-to-end ack). |
+| `send_bytes_total` | Counter | `topic`, `node` | Payload bytes on successful 
`Send`. |
+| `send_duration_seconds` | Histogram | `topic`, `node` | Time spent in the 
send path including retries. |

Review Comment:
   Paired with the `send_duration_seconds` issue in `batch.go` — this row is 
technically true but operationally misleading because the histogram includes 
fail-fast paths (non-transient rejections, ctx-canceled, stream-canceled).
   
   If the unconditional observe is kept, please append a clarifier, e.g.:
   
   > Time spent in the send path including retries. **Recorded for both 
successful and failed attempts**; filter with `send_err_total` (or use a 
`result` label, if added) when isolating success latency.
   
   If you split the histogram by `result` label, update the Labels column to `` 
`topic`, `node`, `result` ``.



##########
banyand/queue/pub/batch.go:
##########
@@ -280,8 +315,14 @@ func isFailoverStatus(s modelv1.Status) bool {
 }
 
 // retrySend implements bounded retries for client streaming sends with 
exponential backoff and jitter.
-func (bp *batchPublisher) retrySend(ctx context.Context, stream 
clusterv1.Service_SendClient, r *clusterv1.SendRequest, node string) error {
+func (bp *batchPublisher) retrySend(ctx context.Context, stream 
clusterv1.Service_SendClient, r *clusterv1.SendRequest, node string, topic 
string) error {
        var lastErr error
+       if bp.hasMetrics() {
+               start := time.Now()
+               defer func() {
+                       
bp.pub.metrics.sendDurationSeconds.Observe(time.Since(start).Seconds(), topic, 
node)

Review Comment:
   **Bug — `send_duration_seconds` observed on every outcome skews the latency 
signal.**
   
   This defer records duration for **every** return path — including 
microsecond-fast `InvalidArgument` rejections (`return sendErr` after the 
`IsTransientError` check), context cancellations, and stream-canceled returns. 
Fail-fast errors will therefore skew the histogram toward its lowest buckets, 
diluting the signal operators rely on when watching p99 send latency.
   
   Two viable fixes:
   
   - **Add a `result` label** so success and each failure mode are 
distinguishable:
     ```go
     factory.NewHistogram("send_duration_seconds", meter.DefBuckets, "topic", 
"node", "result")
     ```
     Then `Observe(..., topic, node, "success" | "non_transient" | 
"retry_exhausted" | "canceled")` at each return path.
   - **Or only observe on success and retry-exhausted paths** — the two 
outcomes where end-to-end send latency is operationally meaningful.
   
   Whichever path you take, please also update the doc table in 
`docs/operation/observability.md` (see paired comment on the 
`send_duration_seconds` row).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to