This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 08566550b feat(queue): redesign pub/sub metrics around operation,
group, and node topology (#1161)
08566550b is described below
commit 08566550bbb38c228eb7040d5087a61360d6f4be
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat Jun 6 16:25:34 2026 +0800
feat(queue): redesign pub/sub metrics around operation, group, and node
topology (#1161)
* feat(queue): redesign pub/sub metrics around operation, group, and node
topology
Collapse the queue_pub/queue_sub metric sprawl into a uniform base set:
total_started, total_finished, total_latency (now a histogram), and total_err,
plus file-sync-only sent_bytes (pub) / received_bytes (sub). Re-label them by
operation (batch-write/file-sync/query/control), group, and the remote endpoint
(remote_node/remote_role/remote_tier). remote_node equals the BanyanDB node
metadata.name, so the liaison<->data (hot/warm/cold) call graph can be drawn by
joining /cluster/topology with the metrics.
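As a rough illustration of the operation label, the sketch below maps a topic to one of the four operation values and falls back to control for anything unmapped, mirroring the fallback in OperationOf from api/data/data.go. It is a simplified stand-in, not the committed code: topics are plain strings here instead of bus.Topic, and every topic name other than stream-part-sync and measure-part-sync is a hypothetical placeholder.

```go
package main

import "fmt"

// Operation label values, as listed in the commit message.
const (
	opFileSync   = "file-sync"
	opBatchWrite = "batch-write"
	opQuery      = "query"
	opControl    = "control"
)

// topicOperations is a string-keyed stand-in for the real topic table.
// Only the two *-part-sync names appear in the proto comments; the
// remaining keys are hypothetical and exist for illustration only.
var topicOperations = map[string]string{
	"stream-part-sync":  opFileSync,
	"measure-part-sync": opFileSync,
	"measure-write":     opBatchWrite, // hypothetical topic name
	"measure-query":     opQuery,      // hypothetical topic name
	"property-update":   opControl,    // hypothetical topic name
}

// operationOf returns the operation label for a topic and treats any
// unmapped topic as a control operation.
func operationOf(topic string) string {
	if op, ok := topicOperations[topic]; ok {
		return op
	}
	return opControl
}

func main() {
	for _, topic := range []string{"measure-write", "stream-part-sync", "no-such-topic"} {
		fmt.Printf("%s -> %s\n", topic, operationOf(topic))
	}
}
```

Defaulting to control keeps the label set closed: a topic added later without a table entry still lands in one of the four known operation values instead of producing a new series.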
The sender's identity rides the wire via new cluster.v1.SendRequest fields
(group, sender_node/sender_role/sender_tier, stamped on the first frame of a
stream) and SyncMetadata (sender_*), so receivers can label flows by sender;
pub-side remote_role/remote_tier are resolved from the connection registry.
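The first-frame stamping can be pictured with the minimal sketch below. It assumes a simplified frame type in place of cluster.v1.SendRequest and an in-memory stream that always succeeds; the names frame, identity, and stream are hypothetical, and the committed code additionally flips the flag only after a send succeeds.

```go
package main

import "fmt"

// frame is a hypothetical stand-in for cluster.v1.SendRequest, reduced
// to the fields discussed in the commit message.
type frame struct {
	Group      string
	SenderNode string
	SenderRole string
	SenderTier string
}

// identity describes the sending node (hypothetical helper type).
type identity struct {
	Node, Role, Tier string
}

// stream records whether its first frame has already gone out.
type stream struct {
	firstFrameSent bool
	sent           []frame
}

// send stamps the sender identity on the first frame only and clears
// the sender fields on every later frame of the same stream.
func (s *stream) send(self identity, f frame) {
	if !s.firstFrameSent {
		f.SenderNode, f.SenderRole, f.SenderTier = self.Node, self.Role, self.Tier
	} else {
		f.SenderNode, f.SenderRole, f.SenderTier = "", "", ""
	}
	s.sent = append(s.sent, f)
	// In this sketch every send succeeds, so the flag flips right away.
	s.firstFrameSent = true
}

func main() {
	s := &stream{}
	self := identity{Node: "liaison-0", Role: "liaison"}
	s.send(self, frame{Group: "sw_metric"})
	s.send(self, frame{Group: "sw_metric"})
	for i, f := range s.sent {
		fmt.Printf("frame %d: group=%q sender_node=%q sender_role=%q\n", i, f.Group, f.SenderNode, f.SenderRole)
	}
}
```

Sending the identity once per stream keeps per-message overhead low; the receiver is expected to remember the values from the first frame and reuse them when labeling later frames.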
BREAKING CHANGE: removes the previous queue_* metric and label names
(*_total_msg_*, queue_pub_send_*, inflight/retry/backoff gauges,
chunked_sync_*/chunk_reorder_*, and the topic label); update
dashboards/alerts.
Signed-off-by: Hongtao Gao <[email protected]>
---
CHANGES.md | 1 +
api/data/data.go | 114 ++++++++++
api/proto/banyandb/cluster/v1/rpc.proto | 14 ++
banyand/queue/pub/batch.go | 133 +++++++-----
banyand/queue/pub/chunked_sync.go | 93 ++++++--
banyand/queue/pub/metrics_test.go | 296 +++++++++-----------------
banyand/queue/pub/pub.go | 136 +++++++++---
banyand/queue/pub/pub_test.go | 5 +
banyand/queue/pub/retry_test.go | 18 +-
banyand/queue/sub/chunked_sync.go | 217 +++++++++----------
banyand/queue/sub/chunked_sync_test.go | 145 ++++++-------
banyand/queue/sub/helpers.go | 41 ++--
banyand/queue/sub/server.go | 79 +------
banyand/queue/sub/server_metrics_test.go | 249 ++++++++++++----------
banyand/queue/sub/sub.go | 352 +++++++++++++++++++------------
docs/api-reference.md | 7 +
pkg/cmdsetup/liaison.go | 9 +
17 files changed, 1063 insertions(+), 846 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 79c40d6a6..2f4c650bf 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -5,6 +5,7 @@ Release Notes.
## 0.11.0
### Features
+- Redesign the queue (`queue_pub`/`queue_sub`) metrics around a uniform model:
keep only `total_started`, `total_finished`, `total_latency` (now a histogram)
and `total_err`, plus file-sync-only `sent_bytes` (pub) / `received_bytes`
(sub). Replace the `topic` label with `operation`
(`batch-write`/`file-sync`/`query`/`control`) and `group`, add an `error_type`
label on `total_err`, and add remote-endpoint labels
(`remote_node`/`remote_role`/`remote_tier`) so the liaison↔data (hot/warm/col
[...]
- Vectorized measure query path is now enabled by default. The columnar
pipeline replaces per-row protobuf serialization in `NewMIterator`, cutting
allocations and ns/op for scan-heavy measure queries; gRPC wire format
(`*measurev1.InternalDataPoint`) is byte-identical. Single-node coverage is
complete: scan, GroupBy+Agg via `BatchAggregation`, scalar reduce (`Agg`
without `GroupBy`), raw `GroupBy` (without `Agg`), implicit projection coverage
for GroupBy/Agg fields, `TopN`/`BottomN`, `o [...]
- Add validation to ensure Measure's ShardingKey contains all Entity tags to
guarantee entity locality.
- Organize access logs under a dedicated "accesslog" subdirectory to improve
log organization and separation from other application data.
diff --git a/api/data/data.go b/api/data/data.go
index 822ea1401..b11baea5c 100644
--- a/api/data/data.go
+++ b/api/data/data.go
@@ -243,3 +243,117 @@ var (
// TopicCommon is the common topic for data transmission.
TopicCommon = bus.Topic{}
)
+
+const (
+ // OperationFileSyncValue is the operation label for file-sync (chunked sync) operations.
+ OperationFileSyncValue = "file-sync"
+ // OperationBatchWriteValue is the operation label for batch-write operations.
+ OperationBatchWriteValue = "batch-write"
+ // OperationQueryValue is the operation label for query operations.
+ OperationQueryValue = "query"
+ // OperationControlValue is the operation label for control operations.
+ OperationControlValue = "control"
+)
+
+var operationMap = map[bus.Topic]string{
+ TopicStreamPartSync: OperationFileSyncValue,
+ TopicMeasurePartSync: OperationFileSyncValue,
+ TopicTracePartSync: OperationFileSyncValue,
+ TopicStreamSeriesSync: OperationFileSyncValue,
+ TopicMeasureSeriesSync: OperationFileSyncValue,
+ TopicTraceSeriesSync: OperationFileSyncValue,
+ TopicStreamElementIndexSync: OperationFileSyncValue,
+ TopicStreamWrite: OperationBatchWriteValue,
+ TopicMeasureWrite: OperationBatchWriteValue,
+ TopicTraceWrite: OperationBatchWriteValue,
+ TopicMeasureSeriesIndexInsert: OperationBatchWriteValue,
+ TopicMeasureSeriesIndexUpdate: OperationBatchWriteValue,
+ TopicStreamSeriesIndexWrite: OperationBatchWriteValue,
+ TopicStreamLocalIndexWrite: OperationBatchWriteValue,
+ TopicTraceSidxSeriesWrite: OperationBatchWriteValue,
+ TopicStreamQuery: OperationQueryValue,
+ TopicMeasureQuery: OperationQueryValue,
+ TopicInternalMeasureQuery: OperationQueryValue,
+ TopicTopNQuery: OperationQueryValue,
+ TopicTraceQuery: OperationQueryValue,
+ TopicPropertyUpdate: OperationControlValue,
+ TopicPropertyDelete: OperationControlValue,
+ TopicPropertyQuery: OperationControlValue,
+ TopicPropertyRepair: OperationControlValue,
+ TopicMeasureCollectDataInfo: OperationControlValue,
+ TopicMeasureCollectLiaisonInfo: OperationControlValue,
+ TopicStreamCollectDataInfo: OperationControlValue,
+ TopicStreamCollectLiaisonInfo: OperationControlValue,
+ TopicTraceCollectDataInfo: OperationControlValue,
+ TopicTraceCollectLiaisonInfo: OperationControlValue,
+ TopicMeasureDropGroup: OperationControlValue,
+ TopicStreamDropGroup: OperationControlValue,
+ TopicTraceDropGroup: OperationControlValue,
+}
+
+// OperationOf returns the operation label string for the given bus topic.
+func OperationOf(topic bus.Topic) string {
+ if op, ok := operationMap[topic]; ok {
+ return op
+ }
+ return OperationControlValue
+}
+
+// GroupFromMessageData extracts the business group string from the pre-marshal request proto (pub side).
+func GroupFromMessageData(_ bus.Topic, payload any) string {
+ switch v := payload.(type) {
+ case *measurev1.InternalWriteRequest:
+ return groupFromWriteMetadata(v.GetRequest().GetMetadata())
+ case *streamv1.InternalWriteRequest:
+ return groupFromWriteMetadata(v.GetRequest().GetMetadata())
+ case *tracev1.InternalWriteRequest:
+ return groupFromWriteMetadata(v.GetRequest().GetMetadata())
+ case *measurev1.QueryRequest:
+ return firstGroup(v.GetGroups())
+ case *measurev1.InternalQueryRequest:
+ return firstGroup(v.GetRequest().GetGroups())
+ case *measurev1.TopNRequest:
+ return firstGroup(v.GetGroups())
+ case *streamv1.QueryRequest:
+ return firstGroup(v.GetGroups())
+ case *tracev1.QueryRequest:
+ return firstGroup(v.GetGroups())
+ case *propertyv1.InternalUpdateRequest:
+ return groupFromProperty(v.GetProperty())
+ case *propertyv1.InternalRepairRequest:
+ return groupFromProperty(v.GetProperty())
+ case *propertyv1.QueryRequest:
+ return firstGroup(v.GetGroups())
+ case *databasev1.GroupRegistryServiceDeleteRequest:
+ return v.GetGroup()
+ }
+ return ""
+}
+
+type groupGetter interface {
+ GetGroup() string
+}
+
+func groupFromWriteMetadata(md groupGetter) string {
+ if md == nil {
+ return ""
+ }
+ return md.GetGroup()
+}
+
+func groupFromProperty(p *propertyv1.Property) string {
+ if p == nil {
+ return ""
+ }
+ if md := p.GetMetadata(); md != nil {
+ return md.GetGroup()
+ }
+ return ""
+}
+
+func firstGroup(groups []string) string {
+ if len(groups) > 0 {
+ return groups[0]
+ }
+ return ""
+}
diff --git a/api/proto/banyandb/cluster/v1/rpc.proto
b/api/proto/banyandb/cluster/v1/rpc.proto
index f71d89bcb..aec1b9963 100644
--- a/api/proto/banyandb/cluster/v1/rpc.proto
+++ b/api/proto/banyandb/cluster/v1/rpc.proto
@@ -54,6 +54,14 @@ message SendRequest {
bool batch_mod = 4;
// version_info contains version information
VersionInfo version_info = 5;
+ // group is the business group associated with this message.
+ string group = 6;
+ // sender_node is the BanyanDB node name of the sender; set only on the first message of a stream.
+ string sender_node = 7;
+ // sender_role is the role of the sender node; set only on the first message of a stream.
+ string sender_role = 8;
+ // sender_tier is the storage tier label of the sender node; set only on the first message of a stream.
+ string sender_tier = 9;
}
message SendResponse {
@@ -121,6 +129,12 @@ message SyncMetadata {
string topic = 4; // Sync topic (stream-part-sync or measure-part-sync).
int64 timestamp = 6; // Timestamp when sync started.
uint32 total_parts = 7; // Total number of parts being synced.
+ // sender_node is the BanyanDB node name of the sender.
+ string sender_node = 8;
+ // sender_role is the role of the sender node.
+ string sender_role = 9;
+ // sender_tier is the storage tier label of the sender node.
+ string sender_tier = 10;
}
// SyncCompletion contains completion information for the sync operation.
diff --git a/banyand/queue/pub/batch.go b/banyand/queue/pub/batch.go
index b183663f5..7ca830b92 100644
--- a/banyand/queue/pub/batch.go
+++ b/banyand/queue/pub/batch.go
@@ -26,7 +26,7 @@ import (
"go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/api/common"
- "github.com/apache/skywalking-banyandb/api/data"
+ apidata "github.com/apache/skywalking-banyandb/api/data"
clusterv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/queue"
@@ -50,13 +50,12 @@ const (
// Remote-side failures (observed after the frame was written to the
stream).
sendErrReasonRecvError = "recv_error" // s.Recv returned an
error (connection/protocol layer).
sendErrReasonServerRejected = "server_rejected" // Server responded
with a non-empty Error (includes failover statuses).
-
- sendResultSuccess = "success"
)
type writeStream struct {
- client clusterv1.Service_SendClient
- ctxDoneCh <-chan struct{}
+ client clusterv1.Service_SendClient
+ ctxDoneCh <-chan struct{}
+ firstFrameSent bool // false until the first frame is sent; the first frame carries the sender_* identity labels
}
type batchPublisher struct {
@@ -97,22 +96,13 @@ func (bp *batchPublisher) Publish(ctx context.Context,
topic bus.Topic, messages
continue
}
- topicStr := topic.String()
sendData := func() (success bool) {
if stream, ok := bp.streams[node]; ok {
- hasMetrics := bp.hasMetrics()
defer func() {
if !success {
delete(bp.streams, node)
- if hasMetrics {
-
bp.pub.metrics.inflightStreams.Add(-1, node)
- }
}
}()
- if hasMetrics {
- bp.pub.metrics.inflightRequests.Add(1,
topicStr, node)
- defer
bp.pub.metrics.inflightRequests.Add(-1, topicStr, node)
- }
select {
case <-ctx.Done():
return false
@@ -120,13 +110,28 @@ func (bp *batchPublisher) Publish(ctx context.Context,
topic bus.Topic, messages
return false
default:
}
- errSend := bp.retrySend(ctx, stream.client, r,
node, topicStr)
+ // Stamp sender identity on the first frame of each stream.
+ if !stream.firstFrameSent {
+ r.SenderNode = bp.pub.selfNode
+ r.SenderRole = bp.pub.selfRole
+ r.SenderTier = bp.pub.selfTier
+ } else {
+ r.SenderNode = ""
+ r.SenderRole = ""
+ r.SenderTier = ""
+ }
+ errSend := bp.retrySend(ctx, stream.client, r,
node)
if errSend != nil {
err = multierr.Append(err,
fmt.Errorf("failed to send message to node %s: %w", node, errSend))
// Record failure for circuit breaker
(only for transient/internal errors)
bp.pub.connMgr.RecordFailure(node,
errSend)
return false
}
+ if !stream.firstFrameSent {
+ ws := bp.streams[node]
+ ws.firstFrameSent = true
+ bp.streams[node] = ws
+ }
// Record success for circuit breaker
bp.pub.connMgr.RecordSuccess(node)
return true
@@ -187,9 +192,6 @@ func (bp *batchPublisher) Publish(ctx context.Context,
topic bus.Topic, messages
client: stream,
ctxDoneCh: streamCtx.Done(),
}
- if bp.hasMetrics() {
- bp.pub.metrics.inflightStreams.Add(1, node)
- }
bp.f.events = append(bp.f.events, make(chan batchEvent))
_ = sendData()
nodeName := node
@@ -197,7 +199,7 @@ func (bp *batchPublisher) Publish(ctx context.Context,
topic bus.Topic, messages
recvDeferFn := deferFn
recvBC := bp.f.events[len(bp.f.events)-1]
run.Go(ctx, "batch-stream-recv", bp.pub.log, func(runCtx
context.Context) {
- bp.listenBatchResponse(runCtx, recvStream, recvDeferFn,
recvBC, nodeName, topicStr)
+ bp.listenBatchResponse(runCtx, recvStream, recvDeferFn,
recvBC, nodeName)
})
}
return nil, err
@@ -208,7 +210,7 @@ func (bp *batchPublisher) hasMetrics() bool {
}
// listenBatchResponse receives the server response and records failover
events and end-to-end failure metrics.
-func (bp *batchPublisher) listenBatchResponse(ctx context.Context, s
clusterv1.Service_SendClient, deferFn func(), bc chan batchEvent, curNode,
topic string) {
+func (bp *batchPublisher) listenBatchResponse(ctx context.Context, s
clusterv1.Service_SendClient, deferFn func(), bc chan batchEvent, curNode
string) {
defer func() {
close(bc)
deferFn()
@@ -219,15 +221,28 @@ func (bp *batchPublisher) listenBatchResponse(ctx
context.Context, s clusterv1.S
default:
}
+ var topic bus.Topic
+ if bp.topic != nil {
+ topic = *bp.topic
+ }
+ operation := apidata.OperationOf(topic)
+ var info nodeInfo
+ if bp.pub != nil {
+ info = bp.pub.getNodeInfo(curNode)
+ }
+
resp, errRecv := s.Recv()
if errRecv != nil {
if bp.hasMetrics() {
- bp.pub.metrics.sendErrTotal.Inc(1, topic, curNode,
sendErrReasonRecvError)
+ bp.pub.metrics.totalErr.Inc(1, operation, "", curNode,
info.role, info.tier, sendErrReasonRecvError)
}
if grpchelper.IsFailoverError(errRecv) {
// Record circuit breaker failure before creating
failover event
bp.pub.connMgr.RecordFailure(curNode, errRecv)
- bc <- batchEvent{n: curNode, e:
common.NewErrorWithStatus(modelv1.Status_STATUS_INTERNAL_ERROR,
errRecv.Error())}
+ select {
+ case bc <- batchEvent{n: curNode, e:
common.NewErrorWithStatus(modelv1.Status_STATUS_INTERNAL_ERROR,
errRecv.Error())}:
+ case <-ctx.Done():
+ }
}
return
}
@@ -235,7 +250,7 @@ func (bp *batchPublisher) listenBatchResponse(ctx
context.Context, s clusterv1.S
return
}
if bp.hasMetrics() {
- bp.pub.metrics.sendErrTotal.Inc(1, topic, curNode,
sendErrReasonServerRejected)
+ bp.pub.metrics.totalErr.Inc(1, operation, "", curNode,
info.role, info.tier, sendErrReasonServerRejected)
}
ce := common.NewErrorWithStatus(resp.Status, resp.Error)
// Only failover statuses trigger circuit-breaker accounting; other
server-side
@@ -245,17 +260,17 @@ func (bp *batchPublisher) listenBatchResponse(ctx
context.Context, s clusterv1.S
bp.pub.connMgr.RecordFailure(curNode, ce)
}
// Always surface a batchEvent for any non-empty resp.Error so that
Close() exposes
- // the rejection to the caller. Otherwise the
send_err_total{reason="server_rejected"}
- // counter would rise on dashboards while callers silently treat the
batch as successful.
- bc <- batchEvent{n: curNode, e: ce}
+ // the rejection to the caller.
+ select {
+ case bc <- batchEvent{n: curNode, e: ce}:
+ case <-ctx.Done():
+ }
}
func (bp *batchPublisher) Close() (cee map[string]*common.Error, err error) {
- for nodeName, stream := range bp.streams {
+ for nodeName := range bp.streams {
+ stream := bp.streams[nodeName]
err = multierr.Append(err, stream.client.CloseSend())
- if bp.hasMetrics() {
- bp.pub.metrics.inflightStreams.Add(-1, nodeName)
- }
}
for i := range bp.streams {
<-bp.streams[i].ctxDoneCh
@@ -271,7 +286,7 @@ func (bp *batchPublisher) Close() (cee
map[string]*common.Error, err error) {
// Record circuit breaker failure before
failover
bp.pub.connMgr.RecordFailure(n, e.e)
if bp.topic == nil {
- bp.pub.failover(ctx, n, e.e,
data.TopicCommon)
+ bp.pub.failover(ctx, n, e.e,
apidata.TopicCommon)
continue
}
bp.pub.failover(ctx, n, e.e, *bp.topic)
@@ -332,12 +347,24 @@ func isFailoverStatus(s modelv1.Status) bool {
}
// retrySend implements bounded retries for client streaming sends with
exponential backoff and jitter.
-func (bp *batchPublisher) retrySend(ctx context.Context, stream
clusterv1.Service_SendClient, r *clusterv1.SendRequest, node string, topic
string) error {
+func (bp *batchPublisher) retrySend(ctx context.Context, stream
clusterv1.Service_SendClient, r *clusterv1.SendRequest, node string) error {
var lastErr error
start := time.Now()
- observeDuration := func(result string) {
+
+ var topic bus.Topic
+ if bp.topic != nil {
+ topic = *bp.topic
+ }
+ operation := apidata.OperationOf(topic)
+ var info nodeInfo
+ if bp.pub != nil {
+ info = bp.pub.getNodeInfo(node)
+ }
+ group := r.GetGroup()
+
+ observeLatency := func() {
if bp.hasMetrics() {
-
bp.pub.metrics.sendDurationSeconds.Observe(time.Since(start).Seconds(), topic,
node, result)
+
bp.pub.metrics.totalLatency.Observe(time.Since(start).Seconds(), operation,
group, node, info.role, info.tier)
}
}
@@ -350,16 +377,16 @@ func (bp *batchPublisher) retrySend(ctx context.Context,
stream clusterv1.Servic
case <-ctx.Done():
cancel()
if bp.hasMetrics() {
- bp.pub.metrics.sendErrTotal.Inc(1, topic, node,
sendErrReasonCanceled)
+ bp.pub.metrics.totalErr.Inc(1, operation,
group, node, info.role, info.tier, sendErrReasonCanceled)
}
- observeDuration(sendErrReasonCanceled)
+ observeLatency()
return ctx.Err()
case <-stream.Context().Done():
cancel()
if bp.hasMetrics() {
- bp.pub.metrics.sendErrTotal.Inc(1, topic, node,
sendErrReasonStreamCanceled)
+ bp.pub.metrics.totalErr.Inc(1, operation,
group, node, info.role, info.tier, sendErrReasonStreamCanceled)
}
- observeDuration(sendErrReasonStreamCanceled)
+ observeLatency()
return stream.Context().Err()
case <-attemptCtx.Done():
cancel()
@@ -377,11 +404,11 @@ func (bp *batchPublisher) retrySend(ctx context.Context,
stream clusterv1.Servic
if sendErr == nil {
if bp.hasMetrics() {
- bp.pub.metrics.sendSuccessTotal.Inc(1, topic,
node)
-
bp.pub.metrics.sendBytesTotal.Inc(float64(len(r.Body)), topic, node)
+ bp.pub.metrics.totalStarted.Inc(1, operation,
group, node, info.role, info.tier)
+ bp.pub.metrics.totalFinished.Inc(1, operation,
group, node, info.role, info.tier)
}
// Success writing to the local stream; end-to-end ack
is observed in listenBatchResponse.
- observeDuration(sendResultSuccess)
+ observeLatency()
return nil
}
@@ -391,16 +418,12 @@ func (bp *batchPublisher) retrySend(ctx context.Context,
stream clusterv1.Servic
if !grpchelper.IsTransientError(sendErr) {
// Non-transient error, don't retry
if bp.hasMetrics() {
- bp.pub.metrics.sendErrTotal.Inc(1, topic, node,
sendErrReasonNonTransient)
+ bp.pub.metrics.totalErr.Inc(1, operation,
group, node, info.role, info.tier, sendErrReasonNonTransient)
}
- observeDuration(sendErrReasonNonTransient)
+ observeLatency()
return sendErr
}
- if bp.hasMetrics() {
- bp.pub.metrics.sendRetryAttempts.Inc(1, topic, node)
- }
-
// If this was the last attempt, don't sleep
if attempt >= defaultMaxRetries {
break
@@ -413,29 +436,25 @@ func (bp *batchPublisher) retrySend(ctx context.Context,
stream clusterv1.Servic
select {
case <-time.After(backoff):
// Continue to next attempt
- if bp.hasMetrics() {
-
bp.pub.metrics.sendBackoffSeconds.Inc(backoff.Seconds(), topic, node)
- }
case <-ctx.Done():
if bp.hasMetrics() {
- bp.pub.metrics.sendErrTotal.Inc(1, topic, node,
sendErrReasonCanceled)
+ bp.pub.metrics.totalErr.Inc(1, operation,
group, node, info.role, info.tier, sendErrReasonCanceled)
}
- observeDuration(sendErrReasonCanceled)
+ observeLatency()
return ctx.Err()
case <-stream.Context().Done():
if bp.hasMetrics() {
- bp.pub.metrics.sendErrTotal.Inc(1, topic, node,
sendErrReasonStreamCanceled)
+ bp.pub.metrics.totalErr.Inc(1, operation,
group, node, info.role, info.tier, sendErrReasonStreamCanceled)
}
- observeDuration(sendErrReasonStreamCanceled)
+ observeLatency()
return stream.Context().Err()
}
}
// All retries exhausted
if bp.hasMetrics() {
- bp.pub.metrics.sendRetryExhausted.Inc(1, topic, node)
- bp.pub.metrics.sendErrTotal.Inc(1, topic, node,
sendErrReasonRetryExhausted)
+ bp.pub.metrics.totalErr.Inc(1, operation, group, node,
info.role, info.tier, sendErrReasonRetryExhausted)
}
- observeDuration(sendErrReasonRetryExhausted)
+ observeLatency()
return fmt.Errorf("retry exhausted for node %s after %d attempts, last
error: %w", node, defaultMaxRetries+1, lastErr)
}
diff --git a/banyand/queue/pub/chunked_sync.go
b/banyand/queue/pub/chunked_sync.go
index 1a2cd3f30..f0da29073 100644
--- a/banyand/queue/pub/chunked_sync.go
+++ b/banyand/queue/pub/chunked_sync.go
@@ -28,6 +28,7 @@ import (
"google.golang.org/grpc"
+ apidata "github.com/apache/skywalking-banyandb/api/data"
apiversion "github.com/apache/skywalking-banyandb/api/proto/banyandb"
clusterv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
@@ -43,12 +44,18 @@ const (
)
type chunkedSyncClient struct {
- client clusterv1.ServiceClient
- conn *grpc.ClientConn
- log *logger.Logger
- config *ChunkedSyncClientConfig
- node string
- chunkSize uint32
+ client clusterv1.ServiceClient
+ conn *grpc.ClientConn
+ log *logger.Logger
+ metrics *pubMetrics
+ config *ChunkedSyncClientConfig
+ selfNode string
+ selfRole string
+ selfTier string
+ node string
+ remoteRole string
+ remoteTier string
+ chunkSize uint32
}
// SyncStreamingParts implements queue.ChunkedSyncClient with streaming
support.
@@ -99,22 +106,43 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx
context.Context, parts []queu
}()
startTime := time.Now()
+ group := parts[0].Group
+
+ topicStr := parts[0].Topic
+ operation := apidata.OperationFileSyncValue
+
+ if c.metrics != nil {
+ c.metrics.totalStarted.Inc(1, operation, group, c.node,
c.remoteRole, c.remoteTier)
+ }
metadata := &clusterv1.SyncMetadata{
- Group: parts[0].Group,
+ Group: group,
ShardId: parts[0].ShardID,
- Topic: parts[0].Topic,
+ Topic: topicStr,
Timestamp: startTime.UnixMilli(),
TotalParts: uint32(len(parts)),
+ SenderNode: c.selfNode,
+ SenderRole: c.selfRole,
+ SenderTier: c.selfTier,
}
var totalBytesSent uint64
- totalChunks, failedParts, err := c.streamPartsAsChunks(stream,
sessionID, metadata, parts, &totalBytesSent)
- if err != nil {
- return nil, fmt.Errorf("failed to stream parts: %w", err)
+ totalChunks, failedParts, streamErr := c.streamPartsAsChunks(stream,
sessionID, metadata, parts, &totalBytesSent)
+ if streamErr != nil {
+ errType := classifyChunkedSyncPubErr(streamErr)
+ if c.metrics != nil {
+ c.metrics.totalErr.Inc(1, operation, group, c.node,
c.remoteRole, c.remoteTier, errType)
+ }
+ return nil, fmt.Errorf("failed to stream parts: %w", streamErr)
}
if totalChunks == 0 && len(failedParts) == 0 {
+ duration := time.Since(startTime)
+ if c.metrics != nil {
+ c.metrics.totalFinished.Inc(1, operation, group,
c.node, c.remoteRole, c.remoteTier)
+ c.metrics.totalLatency.Observe(duration.Seconds(),
operation, group, c.node, c.remoteRole, c.remoteTier)
+ c.metrics.sentBytes.Inc(float64(totalBytesSent),
operation, group, c.node, c.remoteRole, c.remoteTier)
+ }
return &queue.SyncResult{
Success: true,
SessionID: sessionID,
@@ -124,12 +152,15 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx
context.Context, parts []queu
var finalResp *clusterv1.SyncPartResponse
for {
- resp, err := stream.Recv()
- if errors.Is(err, io.EOF) {
+ resp, recvErr := stream.Recv()
+ if errors.Is(recvErr, io.EOF) {
break
}
- if err != nil {
- return nil, fmt.Errorf("failed to receive final
response: %w", err)
+ if recvErr != nil {
+ if c.metrics != nil {
+ c.metrics.totalErr.Inc(1, operation, group,
c.node, c.remoteRole, c.remoteTier, "recv_error")
+ }
+ return nil, fmt.Errorf("failed to receive final
response: %w", recvErr)
}
finalResp = resp
if resp.GetSyncResult() != nil {
@@ -148,6 +179,18 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx
context.Context, parts []queu
success = true
}
+ if success {
+ if c.metrics != nil {
+ c.metrics.totalFinished.Inc(1, operation, group,
c.node, c.remoteRole, c.remoteTier)
+ c.metrics.totalLatency.Observe(duration.Seconds(),
operation, group, c.node, c.remoteRole, c.remoteTier)
+ c.metrics.sentBytes.Inc(float64(totalBytesSent),
operation, group, c.node, c.remoteRole, c.remoteTier)
+ }
+ } else {
+ if c.metrics != nil {
+ c.metrics.totalErr.Inc(1, operation, group, c.node,
c.remoteRole, c.remoteTier, "completion_error")
+ }
+ }
+
return &queue.SyncResult{
Success: success,
SessionID: sessionID,
@@ -492,3 +535,23 @@ func (c *chunkedSyncClient) handleOutOfOrderResponse(resp
*clusterv1.SyncPartRes
func generateSessionID() string {
return fmt.Sprintf("sync-%d", time.Now().UnixNano())
}
+
+// classifyChunkedSyncPubErr maps a chunked-sync send error to an error_type label value.
+func classifyChunkedSyncPubErr(err error) string {
+ if err == nil {
+ return ""
+ }
+ msg := err.Error()
+ switch {
+ case strings.Contains(msg, "checksum mismatch"):
+ return "checksum_mismatch"
+ case strings.Contains(msg, "out of order"):
+ return "out_of_order"
+ case strings.Contains(msg, "session") && strings.Contains(msg, "not found"):
+ return "session_not_found"
+ case strings.Contains(msg, "receive") || strings.Contains(msg, "recv"):
+ return "recv_error"
+ default:
+ return "stream_error"
+ }
+}
diff --git a/banyand/queue/pub/metrics_test.go
b/banyand/queue/pub/metrics_test.go
index b326bf88a..c088628cb 100644
--- a/banyand/queue/pub/metrics_test.go
+++ b/banyand/queue/pub/metrics_test.go
@@ -19,8 +19,6 @@ package pub
import (
"context"
- "strings"
- "sync"
"testing"
"time"
@@ -71,129 +69,55 @@ func (*countingCounter) Delete(_ ...string) bool {
return true
}
-// errReasonCapturer records send_err_total increments keyed by the `reason`
label (third label).
-type errReasonCapturer struct {
+// Label order: operation, group, remote_node, remote_role, remote_tier, error_type.
+type errReasonCapturerImpl struct {
byReason map[string]float64
- mu sync.Mutex
}
-func newErrReasonCapturer() *errReasonCapturer {
- return &errReasonCapturer{byReason: make(map[string]float64)}
+func newErrReasonCapturer() *errReasonCapturerImpl {
+ return &errReasonCapturerImpl{byReason: make(map[string]float64)}
}
-func (c *errReasonCapturer) Inc(delta float64, labels ...string) {
- c.mu.Lock()
- defer c.mu.Unlock()
- if len(labels) < 3 {
+func (c *errReasonCapturerImpl) Inc(delta float64, labels ...string) {
+ // error_type is the last label (index 5 for totalErr)
+ if len(labels) < 1 {
return
}
- reason := labels[2]
- c.byReason[reason] += delta
+ errorType := labels[len(labels)-1]
+ c.byReason[errorType] += delta
}
-func (c *errReasonCapturer) Delete(_ ...string) bool {
+func (c *errReasonCapturerImpl) Delete(_ ...string) bool {
return true
}
-func (c *errReasonCapturer) sum(reason string) float64 {
- c.mu.Lock()
- defer c.mu.Unlock()
+func (c *errReasonCapturerImpl) sum(reason string) float64 {
return c.byReason[reason]
}
-type noopGauge struct{}
-
-func (*noopGauge) Set(_ float64, _ ...string) {}
-func (*noopGauge) Add(_ float64, _ ...string) {}
-func (*noopGauge) Delete(_ ...string) bool { return true }
-
-type valGauge struct {
- mu sync.Mutex
- val float64
-}
-
-func (g *valGauge) Add(delta float64, _ ...string) {
- g.mu.Lock()
- g.val += delta
- g.mu.Unlock()
-}
-
-func (g *valGauge) Set(v float64, _ ...string) {
- g.mu.Lock()
- g.val = v
- g.mu.Unlock()
-}
-
-func (*valGauge) Delete(_ ...string) bool { return true }
-
-func (g *valGauge) value() float64 {
- g.mu.Lock()
- defer g.mu.Unlock()
- return g.val
-}
-
-// inflightKeyedGauge tracks inflight_requests Add deltas per (topic, node)
label pair.
-type inflightKeyedGauge struct {
- vals map[string]float64
- mu sync.Mutex
-}
-
-func newInflightKeyedGauge() *inflightKeyedGauge {
- return &inflightKeyedGauge{vals: make(map[string]float64)}
-}
-
-func inflightReqKey(topic, node string) string {
- return strings.Join([]string{topic, node}, "|")
-}
-
-func (g *inflightKeyedGauge) Add(delta float64, labels ...string) {
- if len(labels) < 2 {
- return
- }
- k := inflightReqKey(labels[0], labels[1])
- g.mu.Lock()
- g.vals[k] += delta
- g.mu.Unlock()
-}
-
-func (*inflightKeyedGauge) Set(_ float64, _ ...string) {}
-
-func (*inflightKeyedGauge) Delete(_ ...string) bool {
- return true
-}
-
-func (g *inflightKeyedGauge) net(topic, node string) float64 {
- g.mu.Lock()
- defer g.mu.Unlock()
- return g.vals[inflightReqKey(topic, node)]
-}
-
type noopHistogram struct{}
func (*noopHistogram) Observe(_ float64, _ ...string) {}
func (*noopHistogram) Delete(_ ...string) bool { return true }
-func newPubMetricsWithErrCapture(sendErr *errReasonCapturer) *pubMetrics {
+func newPubMetricsWithErrCapture(totalErr *errReasonCapturerImpl) *pubMetrics
{ //nolint:exhaustruct
return &pubMetrics{
- sendSuccessTotal: &countingCounter{},
- sendErrTotal: sendErr,
- sendBytesTotal: &countingCounter{},
- sendDurationSeconds: &noopHistogram{},
- sendRetryAttempts: &countingCounter{},
- sendRetryExhausted: &countingCounter{},
- sendBackoffSeconds: &countingCounter{},
- inflightStreams: &noopGauge{},
- inflightRequests: &noopGauge{},
+ totalStarted: &countingCounter{},
+ totalFinished: &countingCounter{},
+ totalLatency: &noopHistogram{},
+ totalErr: totalErr,
+ sentBytes: &countingCounter{},
}
}
func newPubWithConnMgrForMetrics(t *testing.T, pm *pubMetrics) *pub {
t.Helper()
- p := &pub{
- handlers: make(map[bus.Topic]schema.EventHandler),
- log: logger.GetLogger("queue-pub-metrics-test"),
- metrics: pm,
- closer: run.NewCloser(1),
+ p := &pub{ //nolint:exhaustruct
+ handlers: make(map[bus.Topic]schema.EventHandler),
+ log: logger.GetLogger("queue-pub-metrics-test"),
+ metrics: pm,
+ closer: run.NewCloser(1),
+ nodeCache: make(map[string]nodeInfo),
}
p.connMgr =
grpchelper.NewConnManager(grpchelper.ConnManagerConfig[*client]{
Handler: p,
@@ -204,26 +128,24 @@ func newPubWithConnMgrForMetrics(t *testing.T, pm
*pubMetrics) *pub {
return p
}
-// TestRetryMetrics ensures that retry-related metrics are updated when
retrySend
-// observes retryable errors and eventually exhausts retries.
+// TestRetryMetrics verifies that retry-exhausted and error-type metrics are
updated when
+// retrySend observes retryable errors and eventually exhausts retries.
func TestRetryMetrics(t *testing.T) {
sendErrCap := newErrReasonCapturer()
- p := &pub{
+ p := &pub{ //nolint:exhaustruct
metrics: &pubMetrics{
- sendRetryAttempts: &countingCounter{},
- sendRetryExhausted: &countingCounter{},
- sendErrTotal: sendErrCap,
- sendBackoffSeconds: &countingCounter{},
- sendSuccessTotal: &countingCounter{},
- sendBytesTotal: &countingCounter{},
- sendDurationSeconds: &noopHistogram{},
- inflightStreams: &noopGauge{},
- inflightRequests: &noopGauge{},
+ totalStarted: &countingCounter{},
+ totalFinished: &countingCounter{},
+ totalLatency: &noopHistogram{},
+ totalErr: sendErrCap,
+ sentBytes: &countingCounter{},
},
+ nodeCache: make(map[string]nodeInfo),
}
bp := &batchPublisher{
- pub: p,
+ pub: p,
+ topic: topicPtr(data.TopicMeasureWrite),
}
ctx := context.Background()
@@ -243,28 +165,28 @@ func TestRetryMetrics(t *testing.T) {
}
const nodeName = "test-node"
- const topicStr = "test-topic"
- retryErr := bp.retrySend(ctx, client, req, nodeName, topicStr)
+ retryErr := bp.retrySend(ctx, client, req, nodeName)
require.Error(t, retryErr)
- retries := p.metrics.sendRetryAttempts.(*countingCounter).count
- require.Equal(t, float64(4), retries, "one retry counter per transient failure before exhaustion")
-
- exhausted := p.metrics.sendRetryExhausted.(*countingCounter).count
- require.Equal(t, float64(1), exhausted)
-
require.Equal(t, float64(1), sendErrCap.sum(sendErrReasonRetryExhausted))
require.Equal(t, float64(0), sendErrCap.sum(sendErrReasonNonTransient))
+}
- backoff := p.metrics.sendBackoffSeconds.(*countingCounter).count
- require.Greater(t, backoff, float64(0), "backoff should be recorded between retry attempts")
+func topicPtr(t bus.Topic) *bus.Topic {
+ return &t
}
func TestRetrySendNonTransientRecordsReason(t *testing.T) {
sendErrCap := newErrReasonCapturer()
- p := &pub{metrics: newPubMetricsWithErrCapture(sendErrCap)}
- bp := &batchPublisher{pub: p}
+ p := &pub{ //nolint:exhaustruct
+ metrics: newPubMetricsWithErrCapture(sendErrCap),
+ nodeCache: make(map[string]nodeInfo),
+ }
+ bp := &batchPublisher{
+ pub: p,
+ topic: topicPtr(data.TopicMeasureWrite),
+ }
ctx := context.Background()
mockStream := NewMockSendClient(ctx)
@@ -272,18 +194,23 @@ func TestRetrySendNonTransientRecordsReason(t *testing.T)
{
return status.Error(codes.InvalidArgument, "non-transient")
})
- err := bp.retrySend(ctx, mockStream, &clusterv1.SendRequest{}, "n1",
"t1")
+ err := bp.retrySend(ctx, mockStream, &clusterv1.SendRequest{}, "n1")
require.Error(t, err)
require.Equal(t, float64(1), sendErrCap.sum(sendErrReasonNonTransient))
require.Equal(t, float64(0),
sendErrCap.sum(sendErrReasonRetryExhausted))
- require.Equal(t, float64(0),
p.metrics.sendRetryAttempts.(*countingCounter).count)
}
func TestRetrySendCanceledRecordsReason(t *testing.T) {
sendErrCap := newErrReasonCapturer()
- p := &pub{metrics: newPubMetricsWithErrCapture(sendErrCap)}
- bp := &batchPublisher{pub: p}
+ p := &pub{ //nolint:exhaustruct
+ metrics: newPubMetricsWithErrCapture(sendErrCap),
+ nodeCache: make(map[string]nodeInfo),
+ }
+ bp := &batchPublisher{
+ pub: p,
+ topic: topicPtr(data.TopicMeasureWrite),
+ }
ctx, cancel := context.WithCancel(context.Background())
cancel()
@@ -293,7 +220,7 @@ func TestRetrySendCanceledRecordsReason(t *testing.T) {
return status.Error(codes.Unavailable, "unavailable")
})
- err := bp.retrySend(ctx, mockStream, &clusterv1.SendRequest{}, "n1",
"t1")
+ err := bp.retrySend(ctx, mockStream, &clusterv1.SendRequest{}, "n1")
require.ErrorIs(t, err, context.Canceled)
require.Equal(t, float64(1), sendErrCap.sum(sendErrReasonCanceled))
@@ -301,15 +228,21 @@ func TestRetrySendCanceledRecordsReason(t *testing.T) {
func TestRetrySendStreamCanceledRecordsReason(t *testing.T) {
sendErrCap := newErrReasonCapturer()
- p := &pub{metrics: newPubMetricsWithErrCapture(sendErrCap)}
- bp := &batchPublisher{pub: p}
+ p := &pub{ //nolint:exhaustruct
+ metrics: newPubMetricsWithErrCapture(sendErrCap),
+ nodeCache: make(map[string]nodeInfo),
+ }
+ bp := &batchPublisher{
+ pub: p,
+ topic: topicPtr(data.TopicMeasureWrite),
+ }
streamCtx, cancel := context.WithCancel(context.Background())
cancel()
mockStream := NewMockSendClient(streamCtx)
- err := bp.retrySend(context.Background(), mockStream,
&clusterv1.SendRequest{}, "n1", "t1")
+ err := bp.retrySend(context.Background(), mockStream,
&clusterv1.SendRequest{}, "n1")
require.ErrorIs(t, err, context.Canceled)
require.Equal(t, float64(1),
sendErrCap.sum(sendErrReasonStreamCanceled))
@@ -319,7 +252,10 @@ func TestListenBatchResponseRecordsRecvError(t *testing.T)
{
sendErrCap := newErrReasonCapturer()
pm := newPubMetricsWithErrCapture(sendErrCap)
p := newPubWithConnMgrForMetrics(t, pm)
- bp := &batchPublisher{pub: p}
+ bp := &batchPublisher{
+ pub: p,
+ topic: topicPtr(data.TopicMeasureWrite),
+ }
ctx := context.Background()
mockStream := NewMockSendClient(ctx)
@@ -328,7 +264,7 @@ func TestListenBatchResponseRecordsRecvError(t *testing.T) {
})
bc := make(chan batchEvent, 1)
- bp.listenBatchResponse(ctx, mockStream, func() {}, bc, "node-a",
"topic-a")
+ bp.listenBatchResponse(ctx, mockStream, func() {}, bc, "node-a")
require.Equal(t, float64(1), sendErrCap.sum(sendErrReasonRecvError))
}
@@ -337,7 +273,10 @@ func
TestListenBatchResponseRecvNonFailoverStillRecordsRecvError(t *testing.T) {
sendErrCap := newErrReasonCapturer()
pm := newPubMetricsWithErrCapture(sendErrCap)
p := newPubWithConnMgrForMetrics(t, pm)
- bp := &batchPublisher{pub: p}
+ bp := &batchPublisher{
+ pub: p,
+ topic: topicPtr(data.TopicMeasureWrite),
+ }
ctx := context.Background()
mockStream := NewMockSendClient(ctx)
@@ -346,7 +285,7 @@ func
TestListenBatchResponseRecvNonFailoverStillRecordsRecvError(t *testing.T) {
})
bc := make(chan batchEvent, 1)
- bp.listenBatchResponse(ctx, mockStream, func() {}, bc, "node-a",
"topic-a")
+ bp.listenBatchResponse(ctx, mockStream, func() {}, bc, "node-a")
require.Equal(t, float64(1), sendErrCap.sum(sendErrReasonRecvError))
}
@@ -355,7 +294,10 @@ func
TestListenBatchResponseServerRejectedWithoutFailover(t *testing.T) {
sendErrCap := newErrReasonCapturer()
pm := newPubMetricsWithErrCapture(sendErrCap)
p := newPubWithConnMgrForMetrics(t, pm)
- bp := &batchPublisher{pub: p}
+ bp := &batchPublisher{
+ pub: p,
+ topic: topicPtr(data.TopicMeasureWrite),
+ }
ctx := context.Background()
mockStream := NewMockSendClient(ctx)
@@ -367,14 +309,12 @@ func
TestListenBatchResponseServerRejectedWithoutFailover(t *testing.T) {
})
bc := make(chan batchEvent, 1)
- bp.listenBatchResponse(ctx, mockStream, func() {}, bc, "node-a",
"topic-a")
+ bp.listenBatchResponse(ctx, mockStream, func() {}, bc, "node-a")
require.Equal(t, float64(1),
sendErrCap.sum(sendErrReasonServerRejected))
require.Equal(t, float64(0), sendErrCap.sum(sendErrReasonRecvError))
- // Non-failover server rejections are now surfaced to the caller via batchEvent
- // so that send_err_total{reason="server_rejected"} stays consistent with visible errors.
- // The event must carry the rejection but must NOT trigger a circuit-breaker failover.
+ // Non-failover server rejections are now surfaced to the caller via batchEvent.
select {
case evt, ok := <-bc:
require.True(t, ok, "expected a batchEvent for non-failover
server rejection")
@@ -390,7 +330,10 @@ func TestListenBatchResponseDiskFullSendsFailoverEvent(t
*testing.T) {
sendErrCap := newErrReasonCapturer()
pm := newPubMetricsWithErrCapture(sendErrCap)
p := newPubWithConnMgrForMetrics(t, pm)
- bp := &batchPublisher{pub: p}
+ bp := &batchPublisher{
+ pub: p,
+ topic: topicPtr(data.TopicMeasureWrite),
+ }
ctx := context.Background()
mockStream := NewMockSendClient(ctx)
@@ -402,7 +345,7 @@ func TestListenBatchResponseDiskFullSendsFailoverEvent(t
*testing.T) {
})
bc := make(chan batchEvent, 1)
- bp.listenBatchResponse(ctx, mockStream, func() {}, bc, "node-a",
"topic-a")
+ bp.listenBatchResponse(ctx, mockStream, func() {}, bc, "node-a")
require.Equal(t, float64(1),
sendErrCap.sum(sendErrReasonServerRejected))
@@ -416,55 +359,17 @@ func TestListenBatchResponseDiskFullSendsFailoverEvent(t
*testing.T) {
}
}
-func TestCloseDecrementsInflightStreams(t *testing.T) {
- streamGauge := &valGauge{}
- pm := &pubMetrics{
- sendSuccessTotal: &countingCounter{},
- sendErrTotal: newErrReasonCapturer(),
- sendBytesTotal: &countingCounter{},
- sendDurationSeconds: &noopHistogram{},
- sendRetryAttempts: &countingCounter{},
- sendRetryExhausted: &countingCounter{},
- sendBackoffSeconds: &countingCounter{},
- inflightStreams: streamGauge,
- inflightRequests: &noopGauge{},
- }
- p := newPubWithConnMgrForMetrics(t, pm)
-
- ctx := context.Background()
- mockStream := NewMockSendClient(ctx)
- doneCh := make(chan struct{})
- close(doneCh)
-
- bp := p.NewBatchPublisher(time.Second).(*batchPublisher)
- bp.streams["node-x"] = writeStream{
- client: mockStream,
- ctxDoneCh: doneCh,
- }
-
- streamGauge.Add(1, "node-x")
-
- _, closeErr := bp.Close()
- require.NoError(t, closeErr)
- require.Equal(t, float64(0), streamGauge.value(), "Close should
decrement inflight_streams once per open stream")
-}
-
-// TestPublishInflightRequestsBalancedWhenStreamPreexists checks that Publish increments then decrements
-// inflight_requests (via defer) when reusing an existing stream, without requiring GetClient / new stream setup.
-func TestPublishInflightRequestsBalancedWhenStreamPreexists(t *testing.T) {
- inflightReq := newInflightKeyedGauge()
- sendErrCap := newErrReasonCapturer()
- attempts := &countingCounter{}
- pm := &pubMetrics{
- sendSuccessTotal: attempts,
- sendErrTotal: sendErrCap,
- sendBytesTotal: &countingCounter{},
- sendDurationSeconds: &noopHistogram{},
- sendRetryAttempts: &countingCounter{},
- sendRetryExhausted: &countingCounter{},
- sendBackoffSeconds: &countingCounter{},
- inflightStreams: &noopGauge{},
- inflightRequests: inflightReq,
+// TestPublishRecordsStartedAndFinished verifies that a successful retrySend increments
+// totalStarted and totalFinished with matching label counts.
+func TestPublishRecordsStartedAndFinished(t *testing.T) {
+ started := &countingCounter{}
+ finished := &countingCounter{}
+ pm := &pubMetrics{ //nolint:exhaustruct
+ totalStarted: started,
+ totalFinished: finished,
+ totalLatency: &noopHistogram{},
+ totalErr: newErrReasonCapturer(),
+ sentBytes: &countingCounter{},
}
p := newPubWithConnMgrForMetrics(t, pm)
@@ -479,7 +384,6 @@ func
TestPublishInflightRequestsBalancedWhenStreamPreexists(t *testing.T) {
const nodeName = "node-a"
topic := data.TopicMeasureWrite
- topicStr := topic.String()
bp := p.NewBatchPublisher(10 * time.Second).(*batchPublisher)
bp.streams[nodeName] = writeStream{
@@ -492,8 +396,6 @@ func
TestPublishInflightRequestsBalancedWhenStreamPreexists(t *testing.T) {
_, publishErr := bp.Publish(ctx, topic, msg)
require.NoError(t, publishErr)
- require.Equal(t, float64(1), attempts.count, "successful retrySend should record send_success_total")
- require.Equal(t, float64(0), inflightReq.net(topicStr, nodeName),
- "inflight_requests defer must balance +1/-1 for topic/node")
- require.Equal(t, float64(0), sendErrCap.sum(sendErrReasonRetryExhausted))
+ require.Equal(t, float64(1), started.count, "totalStarted must be 1 on success")
+ require.Equal(t, float64(1), finished.count, "totalFinished must be 1 on success")
}
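
The tests in this file replace the pubMetrics fields with in-memory fakes (countingCounter, the error-reason capturer) whose definitions fall outside the hunks shown. A minimal sketch of how such fakes might look follows. It assumes a simplified Counter interface with Inc(delta, labelValues...), which is a hypothetical stand-in and may not match the real meter.Counter, and it assumes the error type is the last label value, as the total_err label list suggests. The "retry_exhausted" string is an illustrative value, not taken from the source.

```go
package main

import (
	"fmt"
	"sync"
)

// Counter is a hypothetical stand-in for the meter counter interface.
type Counter interface {
	Inc(delta float64, labelValues ...string)
}

// countingCounter sums every increment regardless of label values.
type countingCounter struct {
	mu    sync.Mutex
	count float64
}

func (c *countingCounter) Inc(delta float64, _ ...string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.count += delta
}

// errReasonCapturer groups increments by the last label value,
// which this sketch assumes to be the error type.
type errReasonCapturer struct {
	mu       sync.Mutex
	byReason map[string]float64
}

func newErrReasonCapturer() *errReasonCapturer {
	return &errReasonCapturer{byReason: make(map[string]float64)}
}

func (e *errReasonCapturer) Inc(delta float64, labelValues ...string) {
	if len(labelValues) == 0 {
		return
	}
	e.mu.Lock()
	defer e.mu.Unlock()
	e.byReason[labelValues[len(labelValues)-1]] += delta
}

// sum returns the total recorded for one error type.
func (e *errReasonCapturer) sum(reason string) float64 {
	e.mu.Lock()
	defer e.mu.Unlock()
	return e.byReason[reason]
}

func main() {
	var started Counter = &countingCounter{}
	errs := newErrReasonCapturer()
	// Label order follows the diff: operation, group, remote_node, remote_role, remote_tier.
	started.Inc(1, "batch-write", "g1", "node-a", "data", "hot")
	errs.Inc(1, "batch-write", "g1", "node-a", "data", "hot", "retry_exhausted")
	fmt.Println(started.(*countingCounter).count, errs.sum("retry_exhausted"))
}
```

Keying the capturer on the last label value keeps assertions such as sum(reason) independent of how many topology labels precede the error type.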
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index 4fab5b8b4..28acdb747 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -34,7 +34,7 @@ import (
"google.golang.org/protobuf/proto"
"github.com/apache/skywalking-banyandb/api/common"
- "github.com/apache/skywalking-banyandb/api/data"
+ apidata "github.com/apache/skywalking-banyandb/api/data"
apiversion "github.com/apache/skywalking-banyandb/api/proto/banyandb"
clusterv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -79,40 +79,42 @@ type pub struct {
connMgr *grpchelper.ConnManager[*client]
closer *run.Closer
writableProbe map[string]map[string]struct{}
+ nodeCache map[string]nodeInfo
caCertPath string
caCertReloader *pkgtls.Reloader
prefix string
retryPolicy string
+ selfNode string
+ selfRole string
+ selfTier string
allowedRoles []databasev1.Role
writableProbeMu sync.Mutex
+ nodeCacheMu sync.RWMutex
tlsEnabled bool
}
-type pubMetrics struct {
- sendSuccessTotal meter.Counter
- sendErrTotal meter.Counter
- sendBytesTotal meter.Counter
- sendDurationSeconds meter.Histogram
+// nodeInfo caches the resolved role and tier for a remote node.
+type nodeInfo struct {
+ role string
+ tier string
+}
- sendRetryAttempts meter.Counter
- sendRetryExhausted meter.Counter
- sendBackoffSeconds meter.Counter
- inflightStreams meter.Gauge
- inflightRequests meter.Gauge
+type pubMetrics struct {
+ totalStarted meter.Counter
+ totalFinished meter.Counter
+ totalLatency meter.Histogram
+ totalErr meter.Counter
+ sentBytes meter.Counter
}
func newPubMetrics(factory observability.Factory) *pubMetrics {
+ labels := []string{"operation", "group", "remote_node", "remote_role", "remote_tier"}
return &pubMetrics{
- sendSuccessTotal: factory.NewCounter("send_success_total", "topic", "node"),
- sendErrTotal: factory.NewCounter("send_err_total", "topic", "node", "reason"),
- sendBytesTotal: factory.NewCounter("send_bytes_total", "topic", "node"),
- sendDurationSeconds: factory.NewHistogram("send_duration_seconds", meter.DefBuckets, "topic", "node", "result"),
-
- sendRetryAttempts: factory.NewCounter("send_retry_attempts_total", "topic", "node"),
- sendRetryExhausted: factory.NewCounter("send_retry_exhausted_total", "topic", "node"),
- sendBackoffSeconds: factory.NewCounter("send_backoff_seconds_total", "topic", "node"),
- inflightStreams: factory.NewGauge("inflight_streams", "node"),
- inflightRequests: factory.NewGauge("inflight_requests", "topic", "node"),
+ totalStarted: factory.NewCounter("total_started", labels...),
+ totalFinished: factory.NewCounter("total_finished", labels...),
+ totalLatency: factory.NewHistogram("total_latency", meter.DefBuckets, labels...),
+ totalErr: factory.NewCounter("total_err", append(labels, "error_type")...),
+ sentBytes: factory.NewCounter("sent_bytes", labels...),
}
}
@@ -143,10 +145,16 @@ func (p *pub) NewClient(conn *grpc.ClientConn, node
*databasev1.Node) (*client,
}
// OnActive implements grpchelper.ConnectionHandler.
-func (p *pub) OnActive(_ string, c *client) {
+func (p *pub) OnActive(name string, c *client) {
for _, h := range p.handlers {
h.OnAddOrUpdate(c.md)
}
+ if node, ok := c.md.Spec.(*databasev1.Node); ok {
+ info := resolveNodeInfo(node)
+ p.nodeCacheMu.Lock()
+ p.nodeCache[name] = info
+ p.nodeCacheMu.Unlock()
+ }
}
// OnInactive implements grpchelper.ConnectionHandler.
@@ -157,6 +165,49 @@ func (p *pub) OnInactive(name string, c *client) {
p.writableProbeMu.Lock()
delete(p.writableProbe, name)
p.writableProbeMu.Unlock()
+ p.nodeCacheMu.Lock()
+ delete(p.nodeCache, name)
+ p.nodeCacheMu.Unlock()
+}
+
+// resolveNodeInfo extracts role and tier from a Node spec.
+func resolveNodeInfo(node *databasev1.Node) nodeInfo {
+ info := nodeInfo{}
+ for _, r := range node.GetRoles() {
+ switch r {
+ case databasev1.Role_ROLE_DATA:
+ info.role = "data"
+ case databasev1.Role_ROLE_LIAISON:
+ info.role = "liaison"
+ default:
+ // ROLE_UNSPECIFIED, ROLE_META and any future roles are not queue peers.
+ }
+ if info.role != "" {
+ break
+ }
+ }
+ if node.GetLabels() != nil {
+ info.tier = node.GetLabels()["type"]
+ }
+ return info
+}
+
+// getNodeInfo returns the cached nodeInfo for the named node.
+func (p *pub) getNodeInfo(name string) nodeInfo {
+ if p == nil {
+ return nodeInfo{}
+ }
+ p.nodeCacheMu.RLock()
+ info := p.nodeCache[name]
+ p.nodeCacheMu.RUnlock()
+ return info
+}
+
+// SetSelfNode sets the publisher's own node identity for sender stamping.
+func (p *pub) SetSelfNode(name, role, tier string) {
+ p.selfNode = name
+ p.selfRole = role
+ p.selfTier = tier
}
func (p *pub) FlagSet() *run.FlagSet {
@@ -327,6 +378,12 @@ func (p *pub) publish(timeout time.Duration, topic
bus.Topic, messages ...bus.Me
if errSend != nil {
return multierr.Append(err, fmt.Errorf("failed to
marshal message[%d]: %w", m.ID(), errSend))
}
+ // Stamp sender identity so the receiver can label
query/control queue
+ // metrics by sender (topology). publish opens one stream per
message, so
+ // every request is the first frame of its stream — the sub
captures it there.
+ r.SenderNode = p.selfNode
+ r.SenderRole = p.selfRole
+ r.SenderTier = p.selfTier
node := m.Node()
execErr := p.connMgr.Execute(node, func(c *client) error {
ctx, cancel :=
context.WithTimeout(context.Background(), timeout)
@@ -390,12 +447,14 @@ func New(metadata metadata.Repo, roles
...databasev1.Role) queue.Client {
allowedRoles: roles,
prefix: strBuilder.String(),
writableProbe: make(map[string]map[string]struct{}),
+ nodeCache: make(map[string]nodeInfo),
retryPolicy: retryPolicy,
}
return p
}
// NewWithoutMetadata returns a new queue client without metadata, defaulting to data nodes.
+// sender_* fields are left empty for this lifecycle-tool publisher.
func NewWithoutMetadata() queue.Client {
p := New(nil, databasev1.Role_ROLE_DATA)
pp := p.(*pub)
@@ -465,15 +524,16 @@ func messageToRequest(topic bus.Topic, m bus.Message)
(*clusterv1.SendRequest, e
},
}
- switch data := m.Data().(type) {
+ switch msgData := m.Data().(type) {
case proto.Message:
- messageData, err := proto.Marshal(data)
+ messageData, err := proto.Marshal(msgData)
if err != nil {
return nil, fmt.Errorf("failed to marshal message %T:
%w", m, err)
}
r.Body = messageData
+ r.Group = apidata.GroupFromMessageData(topic, msgData)
case []byte:
- r.Body = data
+ r.Body = msgData
default:
return nil, fmt.Errorf("invalid message type %T", m.Data())
}
@@ -519,7 +579,7 @@ func (l *future) Get() (bus.Message, error) {
if resp.Body == nil {
return bus.NewMessageWithNode(bus.MessageID(resp.MessageId), n,
nil), nil
}
- if codec, ok := data.TopicResponseMap[t]; ok {
+ if codec, ok := apidata.TopicResponseMap[t]; ok {
m, decodeErr := codec.Unmarshal(resp.Body)
if decodeErr != nil {
return bus.Message{}, decodeErr
@@ -594,13 +654,25 @@ func (p *pub) NewChunkedSyncClientWithConfig(node string,
config *ChunkedSyncCli
if config.ChunkSize == 0 {
config.ChunkSize = defaultChunkSize
}
+ // Resolve the target node's role/tier so file-sync metrics carry the remote
+ // service-level topology labels (the connMgr client already holds the Node).
+ var info nodeInfo
+ if n, ok := c.md.Spec.(*databasev1.Node); ok {
+ info = resolveNodeInfo(n)
+ }
return &chunkedSyncClient{
- client: c.client,
- conn: c.conn,
- node: node,
- log: p.log,
- chunkSize: config.ChunkSize,
- config: config,
+ client: c.client,
+ conn: c.conn,
+ node: node,
+ log: p.log,
+ metrics: p.metrics,
+ selfNode: p.selfNode,
+ selfRole: p.selfRole,
+ selfTier: p.selfTier,
+ remoteRole: info.role,
+ remoteTier: info.tier,
+ chunkSize: config.ChunkSize,
+ config: config,
}, nil
}
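
The pub.go changes above resolve a remote node's role and tier once, when its connection becomes active, and cache the result behind an RWMutex so the metric paths can read it cheaply. A self-contained sketch of that pattern is given here under stated assumptions: Role, Node, and nodeCache are simplified, hypothetical stand-ins for the databasev1 types and the fields on pub, and only the resolution logic (first data or liaison role wins, tier read from the "type" label) is taken from the diff.

```go
package main

import (
	"fmt"
	"sync"
)

// Role and Node are simplified stand-ins for the databasev1 types.
type Role int

const (
	RoleUnspecified Role = iota
	RoleMeta
	RoleData
	RoleLiaison
)

type Node struct {
	Roles  []Role
	Labels map[string]string
}

type nodeInfo struct {
	role string
	tier string
}

// resolveNodeInfo picks the first data/liaison role and reads the tier
// from the "type" label, mirroring the logic in the diff.
func resolveNodeInfo(n *Node) nodeInfo {
	var info nodeInfo
	for _, r := range n.Roles {
		switch r {
		case RoleData:
			info.role = "data"
		case RoleLiaison:
			info.role = "liaison"
		}
		if info.role != "" {
			break // exits the for loop; the switch has already closed
		}
	}
	info.tier = n.Labels["type"] // indexing a nil map yields ""
	return info
}

// nodeCache guards the name -> nodeInfo map; writes happen on connection
// state changes, reads happen on every metric update.
type nodeCache struct {
	mu    sync.RWMutex
	nodes map[string]nodeInfo
}

func newNodeCache() *nodeCache {
	return &nodeCache{nodes: make(map[string]nodeInfo)}
}

func (c *nodeCache) onActive(name string, n *Node) {
	info := resolveNodeInfo(n)
	c.mu.Lock()
	c.nodes[name] = info
	c.mu.Unlock()
}

func (c *nodeCache) onInactive(name string) {
	c.mu.Lock()
	delete(c.nodes, name)
	c.mu.Unlock()
}

// get returns the zero nodeInfo for unknown nodes, so labels fall back to "".
func (c *nodeCache) get(name string) nodeInfo {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.nodes[name]
}

func main() {
	cache := newNodeCache()
	cache.onActive("node-a", &Node{
		Roles:  []Role{RoleMeta, RoleData},
		Labels: map[string]string{"type": "hot"},
	})
	fmt.Printf("%+v\n", cache.get("node-a"))
	cache.onInactive("node-a")
	fmt.Printf("%+v\n", cache.get("node-a"))
}
```

Returning the zero value for a missing entry means a metric emitted after a node goes inactive still gets a valid, if empty, remote_role and remote_tier rather than failing.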
diff --git a/banyand/queue/pub/pub_test.go b/banyand/queue/pub/pub_test.go
index 680f3eee6..5568cf7d4 100644
--- a/banyand/queue/pub/pub_test.go
+++ b/banyand/queue/pub/pub_test.go
@@ -171,6 +171,11 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
p.OnAddOrUpdate(node1)
node2 := getDataNode("node2", addr2)
p.OnAddOrUpdate(node2)
+ // Wait for both nodes to finish connecting before publishing; otherwise the
+ // first Publish can race the async connection setup and fail to get a client.
+ gomega.Eventually(func() int {
+ return p.connMgr.ActiveCount()
+ }, flags.EventuallyTimeout).Should(gomega.Equal(2))
bp := p.NewBatchPublisher(15 * time.Second)
ctx := context.TODO()
diff --git a/banyand/queue/pub/retry_test.go b/banyand/queue/pub/retry_test.go
index 8293f90ee..b073c3470 100644
--- a/banyand/queue/pub/retry_test.go
+++ b/banyand/queue/pub/retry_test.go
@@ -109,7 +109,7 @@ func TestRetrySendSuccess(t *testing.T) {
bp := &batchPublisher{}
req := &clusterv1.SendRequest{}
- err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
+ err := bp.retrySend(ctx, mockStream, req, "test-node")
assert.NoError(t, err, "successful send should not return error")
}
@@ -131,7 +131,7 @@ func TestRetrySendTransientErrorWithRecovery(t *testing.T) {
bp := &batchPublisher{}
req := &clusterv1.SendRequest{}
- err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
+ err := bp.retrySend(ctx, mockStream, req, "test-node")
assert.NoError(t, err, "should succeed after retry")
}
@@ -149,7 +149,7 @@ func TestRetrySendNonTransientError(t *testing.T) {
bp := &batchPublisher{}
req := &clusterv1.SendRequest{}
- err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
+ err := bp.retrySend(ctx, mockStream, req, "test-node")
assert.Error(t, err, "non-transient error should be returned
immediately")
assert.Equal(t, nonTransientErr, err, "should return the original
error")
@@ -168,7 +168,7 @@ func TestRetrySendExhaustedRetries(t *testing.T) {
bp := &batchPublisher{}
req := &clusterv1.SendRequest{}
- err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
+ err := bp.retrySend(ctx, mockStream, req, "test-node")
assert.Error(t, err, "should return error after exhausting retries")
assert.Contains(t, err.Error(), "retry exhausted", "error should
indicate retry exhaustion")
@@ -190,7 +190,7 @@ func TestRetrySendContextCancellation(t *testing.T) {
bp := &batchPublisher{}
req := &clusterv1.SendRequest{}
- err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
+ err := bp.retrySend(ctx, mockStream, req, "test-node")
assert.Error(t, err, "should return error when context is canceled")
assert.Equal(t, context.Canceled, err, "should return context
cancellation error")
@@ -208,7 +208,7 @@ func TestRetrySendStreamContextDone(t *testing.T) {
bp := &batchPublisher{}
req := &clusterv1.SendRequest{}
- err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
+ err := bp.retrySend(ctx, mockStream, req, "test-node")
assert.Error(t, err, "should return error when stream context is done")
assert.Equal(t, context.Canceled, err, "should return stream context
cancellation error")
@@ -228,7 +228,7 @@ func TestRetrySendPerAttemptTimeout(t *testing.T) {
req := &clusterv1.SendRequest{}
start := time.Now()
- _ = bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
+ _ = bp.retrySend(ctx, mockStream, req, "test-node")
duration := time.Since(start)
// Should timeout quickly due to per-attempt timeout, not wait for the
full operation
@@ -259,7 +259,7 @@ func TestRetrySendBackoffTiming(t *testing.T) {
req := &clusterv1.SendRequest{}
start := time.Now()
- err := bp.retrySend(ctx, mockStream, req, "test-node", "test-topic")
+ err := bp.retrySend(ctx, mockStream, req, "test-node")
duration := time.Since(start)
assert.NoError(t, err, "should eventually succeed")
@@ -302,7 +302,7 @@ func TestRetrySendConcurrency(t *testing.T) {
bp := &batchPublisher{}
req := &clusterv1.SendRequest{}
- err := bp.retrySend(ctx, mockStream, req,
fmt.Sprintf("test-node-%d", id), "test-topic")
+ err := bp.retrySend(ctx, mockStream, req,
fmt.Sprintf("test-node-%d", id))
errors <- err
}(i)
}
diff --git a/banyand/queue/sub/chunked_sync.go
b/banyand/queue/sub/chunked_sync.go
index 4deb88e59..1ed1be1fa 100644
--- a/banyand/queue/sub/chunked_sync.go
+++ b/banyand/queue/sub/chunked_sync.go
@@ -103,17 +103,16 @@ type chunkBuffer struct {
}
type syncSession struct {
- startTime time.Time
- metadata *clusterv1.SyncMetadata
- partsProgress map[int]*partProgress
- partCtx *queue.ChunkedSyncPartContext
- chunkBuffer *chunkBuffer
- sessionID string
- errorMsg string
- totalReceived uint64
- chunksReceived uint32
- abortedRecorded bool
- completed bool
+ startTime time.Time
+ metadata *clusterv1.SyncMetadata
+ partsProgress map[int]*partProgress
+ partCtx *queue.ChunkedSyncPartContext
+ chunkBuffer *chunkBuffer
+ sessionID string
+ errorMsg string
+ totalReceived uint64
+ chunksReceived uint32
+ completed bool
}
type partProgress struct {
@@ -122,54 +121,15 @@ type partProgress struct {
completed bool
}
-const (
- abortedReasonSwitch = "switch"
- abortedReasonStreamError = "stream_error"
- abortedReasonCtxDone = "ctx_done"
- abortedReasonEOF = "eof"
-)
-
-// releaseMetrics releases the gauges held by this session.
-// Idempotent: safe to call from both handleCompletion and the SyncPart defer.
-func (s *syncSession) releaseMetrics(m *metrics) {
- if m == nil || s.completed {
- return
- }
- m.activeSyncSessions.Add(-1, s.metadata.Topic)
- if s.chunkBuffer != nil {
- if n := len(s.chunkBuffer.chunks); n > 0 {
- m.reorderBuffered.Add(-float64(n), s.metadata.Topic)
- }
- }
- s.completed = true
-}
-
-func (s *syncSession) recordAborted(m *metrics, reason string) {
- if m == nil || s.abortedRecorded || s.completed {
- return
- }
- if reason == "" {
- return
- }
- // chunkedSyncAbortedTotal is always initialized in production; tests that wire partial metrics must set it if they trigger abort paths.
- m.chunkedSyncAbortedTotal.Inc(1, s.metadata.Topic, reason)
- s.abortedRecorded = true
-}
-
// SyncPart implements clusterv1.ChunkedSyncServiceServer.
func (s *server) SyncPart(stream clusterv1.ChunkedSyncService_SyncPartServer)
error {
ctx := stream.Context()
var currentSession *syncSession
var sessionID string
- var abortReason string
defer func() {
if currentSession == nil {
return
}
- if !currentSession.completed {
- currentSession.recordAborted(s.metrics, abortReason)
- }
- currentSession.releaseMetrics(s.metrics)
if currentSession.partCtx != nil {
if closeErr := currentSession.partCtx.Close(); closeErr
!= nil {
s.log.Error().Err(closeErr).Str("session_id",
currentSession.sessionID).Msg("failed to close session partCtx")
@@ -180,20 +140,29 @@ func (s *server) SyncPart(stream
clusterv1.ChunkedSyncService_SyncPartServer) er
for {
select {
case <-ctx.Done():
- abortReason = abortedReasonCtxDone
+ if s.metrics != nil && currentSession != nil && !currentSession.completed {
+ op, grp, sn, sr, st := s.resolveSessionLabels(currentSession)
+ s.metrics.totalErr.Inc(1, op, grp, sn, sr, st, "ctx_done")
+ }
return ctx.Err()
default:
}
- req, err := stream.Recv()
- if errors.Is(err, io.EOF) {
- abortReason = abortedReasonEOF
+ req, recvErr := stream.Recv()
+ if errors.Is(recvErr, io.EOF) {
+ if s.metrics != nil && currentSession != nil && !currentSession.completed {
+ op, grp, sn, sr, st := s.resolveSessionLabels(currentSession)
+ s.metrics.totalErr.Inc(1, op, grp, sn, sr, st, "eof")
+ }
break
}
- if err != nil {
- abortReason = abortedReasonStreamError
- s.log.Error().Err(err).Msg("failed to receive chunk")
- return err
+ if recvErr != nil {
+ if s.metrics != nil && currentSession != nil && !currentSession.completed {
+ op, grp, sn, sr, st := s.resolveSessionLabels(currentSession)
+ s.metrics.totalErr.Inc(1, op, grp, sn, sr, st, "stream_error")
+ }
+ s.log.Error().Err(recvErr).Msg("failed to receive chunk")
+ return recvErr
}
sessionID = req.SessionId
@@ -212,15 +181,27 @@ func (s *server) SyncPart(stream
clusterv1.ChunkedSyncService_SyncPartServer) er
return s.handleCompletion(stream, currentSession, req)
}
- if err := s.processChunk(stream, currentSession, req); err !=
nil {
- s.log.Error().Err(err).Str("session_id",
sessionID).Msg("failed to process chunk")
- return err
+ if processErr := s.processChunk(stream, currentSession, req);
processErr != nil {
+ s.log.Error().Err(processErr).Str("session_id",
sessionID).Msg("failed to process chunk")
+ return processErr
}
}
return nil
}
+// resolveSessionLabels returns the label tuple for the current session.
+func (s *server) resolveSessionLabels(session *syncSession) (operation, group, senderNode, senderRole, senderTier string) {
+ operation = data.OperationFileSyncValue
+ if session.metadata != nil {
+ group = session.metadata.GetGroup()
+ senderNode = session.metadata.GetSenderNode()
+ senderRole = session.metadata.GetSenderRole()
+ senderTier = session.metadata.GetSenderTier()
+ }
+ return
+}
+
func (s *server) startOrSwitchSession(sessionID string, req
*clusterv1.SyncPartRequest, previousSession *syncSession) *syncSession {
if previousSession != nil {
s.cleanupPreviousSession(previousSession)
@@ -234,7 +215,8 @@ func (s *server) startOrSwitchSession(sessionID string, req
*clusterv1.SyncPartR
partsProgress: make(map[int]*partProgress),
}
if s.metrics != nil {
- s.metrics.activeSyncSessions.Add(1, newSession.metadata.Topic)
+ op, grp, sn, sr, st := s.resolveSessionLabels(newSession)
+ s.metrics.totalStarted.Inc(1, op, grp, sn, sr, st)
}
if dl := s.log.Debug(); dl.Enabled() {
dl.Str("session_id", sessionID).
@@ -247,16 +229,22 @@ func (s *server) startOrSwitchSession(sessionID string,
req *clusterv1.SyncPartR
func (s *server) cleanupPreviousSession(previousSession *syncSession) {
if !previousSession.completed {
- previousSession.recordAborted(s.metrics, abortedReasonSwitch)
+ if s.metrics != nil {
+ op, grp, sn, sr, st :=
s.resolveSessionLabels(previousSession)
+ s.metrics.totalErr.Inc(1, op, grp, sn, sr, st, "switch")
+ }
+ previousSession.completed = true
}
- previousSession.releaseMetrics(s.metrics)
if previousSession.partCtx == nil {
return
}
if previousSession.partCtx.Handler != nil {
if finishErr := previousSession.partCtx.Handler.FinishSync();
finishErr != nil {
- s.updateChunkOrderMetrics("finish_sync_err",
previousSession.metadata.Topic)
+ if s.metrics != nil {
+ op, grp, sn, sr, st :=
s.resolveSessionLabels(previousSession)
+ s.metrics.totalErr.Inc(1, op, grp, sn, sr, st,
"finish_sync_err")
+ }
s.log.Error().Err(finishErr).Str("session_id",
previousSession.sessionID).Msg("failed to finish sync for previous session")
}
}
@@ -268,8 +256,8 @@ func (s *server) cleanupPreviousSession(previousSession
*syncSession) {
func (s *server) processChunk(stream
clusterv1.ChunkedSyncService_SyncPartServer, session *syncSession, req
*clusterv1.SyncPartRequest) error {
// Check version compatibility on every chunk
if req.VersionInfo != nil {
- versionCompatibility, status :=
checkSyncVersionCompatibility(req.VersionInfo)
- if status != clusterv1.SyncStatus_SYNC_STATUS_CHUNK_RECEIVED {
+ versionCompatibility, syncStatus :=
checkSyncVersionCompatibility(req.VersionInfo)
+ if syncStatus !=
clusterv1.SyncStatus_SYNC_STATUS_CHUNK_RECEIVED {
s.log.Warn().
Str("session_id", req.SessionId).
Str("client_api_version",
req.VersionInfo.ApiVersion).
@@ -277,7 +265,7 @@ func (s *server) processChunk(stream
clusterv1.ChunkedSyncService_SyncPartServer
Str("reason", versionCompatibility.Reason).
Msg("sync version compatibility check failed")
- return s.sendResponse(stream, req, status,
versionCompatibility.Reason, versionCompatibility)
+ return s.sendResponse(stream, req, syncStatus,
versionCompatibility.Reason, versionCompatibility)
}
}
@@ -315,14 +303,14 @@ func (s *server) processChunkSequential(stream
clusterv1.ChunkedSyncService_Sync
func (s *server) processChunkWithReordering(stream
clusterv1.ChunkedSyncService_SyncPartServer, session *syncSession, req
*clusterv1.SyncPartRequest) error {
buffer := session.chunkBuffer
// must check buffer timeout before refreshing lastActivity, otherwise
it will never timeout.
- if err := s.checkBufferTimeout(session); err != nil {
- return err
+ if checkErr := s.checkBufferTimeout(session); checkErr != nil {
+ return checkErr
}
buffer.lastActivity = time.Now()
if req.ChunkIndex == buffer.expectedIndex {
- if err := s.processExpectedChunk(stream, session, req); err !=
nil {
- return err
+ if processErr := s.processExpectedChunk(stream, session, req);
processErr != nil {
+ return processErr
}
buffer.expectedIndex++
@@ -331,7 +319,6 @@ func (s *server) processChunkWithReordering(stream
clusterv1.ChunkedSyncService_
if req.ChunkIndex > buffer.expectedIndex {
gap := req.ChunkIndex - buffer.expectedIndex
- s.updateChunkOrderMetrics("out_of_order_received",
session.metadata.Topic)
if gap > s.maxChunkGapSize {
errMsg := fmt.Sprintf("chunk gap too large: expected
%d, got %d (gap: %d > max: %d)",
@@ -342,7 +329,10 @@ func (s *server) processChunkWithReordering(stream
clusterv1.ChunkedSyncService_
Uint32("gap", gap).
Uint32("max_gap", s.maxChunkGapSize).
Msg("chunk gap too large, rejecting")
- s.updateChunkOrderMetrics("gap_too_large",
session.metadata.Topic)
+ if s.metrics != nil {
+ op, grp, sn, sr, st :=
s.resolveSessionLabels(session)
+ s.metrics.totalErr.Inc(1, op, grp, sn, sr, st,
"gap_too_large")
+ }
return s.sendResponse(stream, req,
clusterv1.SyncStatus_SYNC_STATUS_CHUNK_OUT_OF_ORDER, errMsg, nil)
}
@@ -353,7 +343,10 @@ func (s *server) processChunkWithReordering(stream
clusterv1.ChunkedSyncService_
Uint32("buffer_size",
uint32(len(buffer.chunks))).
Uint32("max_buffer_size", buffer.maxBufferSize).
Msg("chunk buffer full, rejecting chunk")
- s.updateChunkOrderMetrics("buffer_full",
session.metadata.Topic)
+ if s.metrics != nil {
+ op, grp, sn, sr, st :=
s.resolveSessionLabels(session)
+ s.metrics.totalErr.Inc(1, op, grp, sn, sr, st,
"buffer_full")
+ }
return s.sendResponse(stream, req,
clusterv1.SyncStatus_SYNC_STATUS_CHUNK_OUT_OF_ORDER, errMsg, nil)
}
@@ -366,10 +359,6 @@ func (s *server) processChunkWithReordering(stream
clusterv1.ChunkedSyncService_
Uint32("buffered_chunks",
uint32(len(buffer.chunks))).
Msg("buffered out-of-order chunk")
}
- s.updateChunkOrderMetrics("chunk_buffered",
session.metadata.Topic)
- if s.metrics != nil {
- s.metrics.reorderBuffered.Add(1, session.metadata.Topic)
- }
return s.sendResponse(stream, req,
clusterv1.SyncStatus_SYNC_STATUS_CHUNK_RECEIVED,
fmt.Sprintf("chunk %d buffered (waiting for %d)",
req.ChunkIndex, buffer.expectedIndex), nil)
}
@@ -395,6 +384,10 @@ func (s *server) processExpectedChunk(stream
clusterv1.ChunkedSyncService_SyncPa
errMsg := fmt.Sprintf("chunk %d checksum mismatch: expected %s,
got %s",
req.ChunkIndex, req.ChunkChecksum, calculatedChecksum)
s.log.Warn().Str("session_id", req.SessionId).Msg(errMsg)
+ if s.metrics != nil {
+ op, grp, sn, sr, st := s.resolveSessionLabels(session)
+ s.metrics.totalErr.Inc(1, op, grp, sn, sr, st, "checksum_mismatch")
+ }
return s.sendResponse(stream, req,
clusterv1.SyncStatus_SYNC_STATUS_CHUNK_CHECKSUM_MISMATCH, errMsg, nil)
}
@@ -418,8 +411,8 @@ func (s *server) processExpectedChunk(stream
clusterv1.ChunkedSyncService_SyncPa
session.partCtx.ID != partInfo.Id
if createNewContext && session.partCtx != nil &&
session.partCtx.Handler != nil {
- if err := session.partCtx.Handler.FinishSync(); err !=
nil {
- return fmt.Errorf("failed to complete part %d:
%w", session.partCtx.ID, err)
+ if finishErr := session.partCtx.Handler.FinishSync();
finishErr != nil {
+ return fmt.Errorf("failed to complete part %d:
%w", session.partCtx.ID, finishErr)
}
}
@@ -438,9 +431,9 @@ func (s *server) processExpectedChunk(stream
clusterv1.ChunkedSyncService_SyncPa
MaxKey: partInfo.MaxKey,
PartType: partInfo.PartType,
}
- partHandler, err :=
handler.CreatePartHandler(session.partCtx)
- if err != nil {
- return fmt.Errorf("failed to create part
handler: %w", err)
+ partHandler, createErr :=
handler.CreatePartHandler(session.partCtx)
+ if createErr != nil {
+ return fmt.Errorf("failed to create part
handler: %w", createErr)
}
session.partCtx.Handler = partHandler
} else if session.partCtx.PartType != partInfo.PartType {
@@ -453,18 +446,22 @@ func (s *server) processExpectedChunk(stream
clusterv1.ChunkedSyncService_SyncPa
session.partCtx.MinKey = partInfo.MinKey
session.partCtx.MaxKey = partInfo.MaxKey
session.partCtx.PartType = partInfo.PartType
- if err :=
session.partCtx.Handler.NewPartType(session.partCtx); err != nil {
- return fmt.Errorf("failed to new part type:
%w", err)
+ if newPartErr :=
session.partCtx.Handler.NewPartType(session.partCtx); newPartErr != nil {
+ return fmt.Errorf("failed to new part type:
%w", newPartErr)
}
}
- if err := s.processPart(session, req, partInfo, partIndex,
handler); err != nil {
- s.log.Error().Err(err).
+ if processErr := s.processPart(session, req, partInfo,
partIndex, handler); processErr != nil {
+ s.log.Error().Err(processErr).
Str("session_id", req.SessionId).
Str("topic", session.metadata.Topic).
Int("part_index", partIndex).
Msg("failed to process part")
- return err
+ if s.metrics != nil {
+ op, grp, sn, sr, st := s.resolveSessionLabels(session)
+ s.metrics.totalErr.Inc(1, op, grp, sn, sr, st, "part_failed")
+ }
+ return processErr
}
}
@@ -477,9 +474,6 @@ func (s *server) processBufferedChunks(stream
clusterv1.ChunkedSyncService_SyncP
for {
if chunk, exists := buffer.chunks[buffer.expectedIndex]; exists
{
delete(buffer.chunks, buffer.expectedIndex)
- if s.metrics != nil {
- s.metrics.reorderBuffered.Add(-1,
session.metadata.Topic)
- }
if dl := s.log.Debug(); dl.Enabled() {
dl.Str("session_id", session.sessionID).
@@ -489,8 +483,8 @@ func (s *server) processBufferedChunks(stream
clusterv1.ChunkedSyncService_SyncP
Msg("processing buffered chunk")
}
- if err := s.processExpectedChunk(stream, session,
chunk); err != nil {
- return err
+ if processErr := s.processExpectedChunk(stream,
session, chunk); processErr != nil {
+ return processErr
}
buffer.expectedIndex++
} else {
@@ -514,7 +508,10 @@ func (s *server) checkBufferTimeout(session *syncSession)
error {
missing = append(missing, i)
}
}
- s.updateChunkOrderMetrics("buffer_timeout",
session.metadata.Topic)
+ if s.metrics != nil {
+ op, grp, sn, sr, st := s.resolveSessionLabels(session)
+ s.metrics.totalErr.Inc(1, op, grp, sn, sr, st, "buffer_timeout")
+ }
return fmt.Errorf("buffer timeout: missing chunks %v
after %v",
missing, session.chunkBuffer.bufferTimeout)
}
@@ -565,8 +562,8 @@ func (s *server) processPart(session *syncSession, req
*clusterv1.SyncPartReques
session.partCtx.FileName = fileInfo.Name
session.partCtx.PartType = partInfo.PartType
- if err := handler.HandleFileChunk(session.partCtx, fileChunk);
err != nil {
- return fmt.Errorf("failed to stream file chunk for %s:
%w", fileInfo.Name, err)
+ if handleErr := handler.HandleFileChunk(session.partCtx,
fileChunk); handleErr != nil {
+ return fmt.Errorf("failed to stream file chunk for %s:
%w", fileInfo.Name, handleErr)
}
}
@@ -581,9 +578,13 @@ func (s *server) processPart(session *syncSession, req
*clusterv1.SyncPartReques
}
func (s *server) handleCompletion(stream
clusterv1.ChunkedSyncService_SyncPartServer, session *syncSession, req
*clusterv1.SyncPartRequest) error {
- if session.partCtx.Handler != nil {
- if err := session.partCtx.Handler.FinishSync(); err != nil {
- return fmt.Errorf("failed to complete part %d: %w",
session.partCtx.ID, err)
+ if session.partCtx != nil && session.partCtx.Handler != nil {
+ if finishErr := session.partCtx.Handler.FinishSync(); finishErr
!= nil {
+ if s.metrics != nil {
+ op, grp, sn, sr, st := s.resolveSessionLabels(session)
+ s.metrics.totalErr.Inc(1, op, grp, sn, sr, st, "finish_sync_err")
+ }
+ return fmt.Errorf("failed to complete part %d: %w",
session.partCtx.ID, finishErr)
}
session.partCtx.Handler = nil
}
@@ -613,15 +614,15 @@ func (s *server) handleCompletion(stream
clusterv1.ChunkedSyncService_SyncPartSe
PartsResults: partsResults,
}
- session.releaseMetrics(s.metrics)
+ session.completed = true
if s.metrics != nil {
- topic := session.metadata.Topic
- s.metrics.chunkedSyncTotalBytes.Inc(float64(syncResult.TotalBytesReceived), topic)
- s.metrics.chunkedSyncDurationSecs.Observe(float64(syncResult.DurationMs)/1000.0, topic)
-
+ op, grp, sn, sr, st := s.resolveSessionLabels(session)
+ s.metrics.totalFinished.Inc(1, op, grp, sn, sr, st)
+ s.metrics.totalLatency.Observe(float64(syncResult.DurationMs)/1000.0, op, grp, sn, sr, st)
+ s.metrics.receivedBytes.Inc(float64(syncResult.TotalBytesReceived), op, grp, sn, sr, st)
for _, pr := range partsResults {
if !pr.Success {
- s.metrics.chunkedSyncFailedParts.Inc(1, topic)
+ s.metrics.totalErr.Inc(1, op, grp, sn, sr, st, "part_failed")
}
}
}
@@ -641,14 +642,14 @@ func (s *server) handleCompletion(stream
clusterv1.ChunkedSyncService_SyncPartSe
func (s *server) sendResponse(
stream clusterv1.ChunkedSyncService_SyncPartServer,
req *clusterv1.SyncPartRequest,
- status clusterv1.SyncStatus,
+ syncStatus clusterv1.SyncStatus,
errorMsg string,
result interface{},
) error {
resp := &clusterv1.SyncPartResponse{
SessionId: req.SessionId,
ChunkIndex: req.ChunkIndex,
- Status: status,
+ Status: syncStatus,
Error: errorMsg,
}
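Reviewer note: the reordering path above now reports rejections through a single total_err counter with an error_type label instead of dedicated counters. The following is a minimal, self-contained sketch of that accept/reject/drain flow, not the server's actual code. The reorderBuffer type, its accept method, and the field names are hypothetical; the `>=` buffer-full comparison and the handling of stale chunks are assumptions inferred from the test in this commit.

```go
package main

import "fmt"

// reorderBuffer is a hypothetical, simplified stand-in for the server's chunk buffer.
type reorderBuffer struct {
	chunks        map[uint32][]byte // out-of-order chunks waiting for their turn
	expectedIndex uint32            // next chunk index that can be delivered
	maxGap        uint32            // largest tolerated distance from expectedIndex
	maxBuffered   int               // capacity of the reorder buffer
}

// accept returns the payloads that became deliverable in order, or a non-empty
// error_type ("gap_too_large" or "buffer_full") when the chunk is rejected.
func (b *reorderBuffer) accept(index uint32, payload []byte) (delivered [][]byte, errType string) {
	switch {
	case index == b.expectedIndex:
		delivered = append(delivered, payload)
		b.expectedIndex++
		// Drain buffered chunks that are now contiguous.
		for {
			next, ok := b.chunks[b.expectedIndex]
			if !ok {
				return delivered, ""
			}
			delete(b.chunks, b.expectedIndex)
			delivered = append(delivered, next)
			b.expectedIndex++
		}
	case index > b.expectedIndex:
		if index-b.expectedIndex > b.maxGap {
			return nil, "gap_too_large"
		}
		if len(b.chunks) >= b.maxBuffered { // assumption: reject once capacity is reached
			return nil, "buffer_full"
		}
		b.chunks[index] = payload
		return nil, ""
	default:
		// Stale or duplicate chunk; this sketch simply ignores it.
		return nil, ""
	}
}

func main() {
	b := &reorderBuffer{chunks: map[uint32][]byte{}, maxGap: 10, maxBuffered: 1}
	_, e0 := b.accept(0, []byte("chunk-0"))
	_, e2 := b.accept(2, []byte("chunk-2"))
	_, e3 := b.accept(3, []byte("chunk-3"))
	fmt.Printf("%q %q %q\n", e0, e2, e3)
}
```

The sequence in main mirrors TestChunkOrderingBufferFull: chunk 0 is delivered, chunk 2 fills the one-slot buffer, and chunk 3 is rejected as buffer_full.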
diff --git a/banyand/queue/sub/chunked_sync_test.go
b/banyand/queue/sub/chunked_sync_test.go
index 2ba9889ff..206f93b8c 100644
--- a/banyand/queue/sub/chunked_sync_test.go
+++ b/banyand/queue/sub/chunked_sync_test.go
@@ -211,102 +211,75 @@ func (c *capturingCounter) Delete(_ ...string) bool {
return true
}
-func (c *capturingCounter) uniqueFirstLabelValues() map[string]struct{} {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- m := make(map[string]struct{})
- for _, lv := range c.labelValues {
- if len(lv) > 0 {
- m[lv[0]] = struct{}{}
- }
- }
- return m
-}
-
-func TestChunkOrderingMetricsAreLabeledByTopic_NotSessionID(t *testing.T) {
- // enable reordering, otherwise the chunk-ordering metrics will not be
triggered.
- s := &server{
- log:
logger.GetLogger("test-server-metrics-label"),
+// TestChunkOrderingBufferFull verifies that buffer_full error_type is emitted
when the
+// chunk buffer is full. The new metrics use
(operation,group,remote_node,remote_role,remote_tier,error_type).
+func TestChunkOrderingBufferFull(t *testing.T) {
+ errCap := &capturingCounter{}
+ s := &server{ //nolint:exhaustruct
+ log:
logger.GetLogger("test-server-buffer-full"),
chunkedSyncHandlers:
make(map[bus.Topic]queue.ChunkedSyncHandler),
enableChunkReordering: true,
- maxChunkBufferSize: 10,
- maxChunkGapSize: 5,
+ maxChunkBufferSize: 1,
+ maxChunkGapSize: 10,
+ chunkBufferTimeout: time.Hour,
+ metrics: newTestMetrics(),
}
+ s.metrics.totalErr = errCap
- // handler: avoid "no handler registered" in processExpectedChunk.
mockHandler := &MockChunkedSyncHandler{}
s.chunkedSyncHandlers[data.TopicStreamPartSync] = mockHandler
- outOfOrder := &capturingCounter{}
- buffered := &capturingCounter{}
- testMetrics := newTestMetrics()
- testMetrics.outOfOrderChunksReceived = outOfOrder
- testMetrics.chunksBuffered = buffered
- s.metrics = testMetrics
-
- topic := data.TopicStreamPartSync.String()
-
- drive := func(sessionID string) {
- mockStream := &MockSyncPartStream{}
- session := &syncSession{
- sessionID: sessionID,
- startTime: time.Now(),
- chunksReceived: 0,
- partsProgress: make(map[int]*partProgress),
- metadata: &clusterv1.SyncMetadata{
- Group: "test-group",
- Topic: topic,
- },
- }
-
- // send chunk 0 (establish buffer.expectedIndex=1)
- req0 := &clusterv1.SyncPartRequest{
- SessionId: sessionID,
- ChunkIndex: 0,
- ChunkData: []byte("chunk-0"),
- ChunkChecksum: fmt.Sprintf("%x",
crc32.ChecksumIEEE([]byte("chunk-0"))),
- PartsInfo: []*clusterv1.PartInfo{
- {Id: 1, Files: []*clusterv1.FileInfo{{Name:
"f", Offset: 0, Size: 7}}},
- },
- }
- require.NoError(t, s.processChunk(mockStream, session, req0))
+ mockStream := &MockSyncPartStream{}
+ session := &syncSession{ //nolint:exhaustruct
+ sessionID: "buf-full-sess",
+ startTime: time.Now(),
+ chunksReceived: 0,
+ partsProgress: make(map[int]*partProgress),
+ metadata: &clusterv1.SyncMetadata{
+ Group: "test-group",
+ Topic: data.TopicStreamPartSync.String(),
+ },
+ }
- // send chunk 2 (out-of-order: expected 1 got 2),
- // will trigger out_of_order_received + chunk_buffered.
- req2 := &clusterv1.SyncPartRequest{
- SessionId: sessionID,
- ChunkIndex: 2,
- ChunkData: []byte("chunk-2"),
- ChunkChecksum: fmt.Sprintf("%x",
crc32.ChecksumIEEE([]byte("chunk-2"))),
- PartsInfo: []*clusterv1.PartInfo{
- {Id: 2, Files: []*clusterv1.FileInfo{{Name:
"f", Offset: 0, Size: 7}}},
- },
+ // chunk 0 — processed normally, buffer.expectedIndex becomes 1
+ req0 := &clusterv1.SyncPartRequest{
+ SessionId: "buf-full-sess",
+ ChunkIndex: 0,
+ ChunkData: []byte("chunk-0"),
+ ChunkChecksum: fmt.Sprintf("%x",
crc32.ChecksumIEEE([]byte("chunk-0"))),
+ PartsInfo: []*clusterv1.PartInfo{{Id: 1, Files:
[]*clusterv1.FileInfo{{Name: "f", Offset: 0, Size: 7}}}},
+ }
+ require.NoError(t, s.processChunk(mockStream, session, req0))
+
+ // chunk 2 — out-of-order, fills the buffer (maxBufferSize=1)
+ req2 := &clusterv1.SyncPartRequest{
+ SessionId: "buf-full-sess",
+ ChunkIndex: 2,
+ ChunkData: []byte("chunk-2"),
+ ChunkChecksum: fmt.Sprintf("%x",
crc32.ChecksumIEEE([]byte("chunk-2"))),
+ PartsInfo: []*clusterv1.PartInfo{{Id: 2, Files:
[]*clusterv1.FileInfo{{Name: "f", Offset: 0, Size: 7}}}},
+ }
+ require.NoError(t, s.processChunk(mockStream, session, req2))
+
+ // chunk 3 — buffer is full, should trigger buffer_full error_type
+ req3 := &clusterv1.SyncPartRequest{
+ SessionId: "buf-full-sess",
+ ChunkIndex: 3,
+ ChunkData: []byte("chunk-3"),
+ ChunkChecksum: fmt.Sprintf("%x",
crc32.ChecksumIEEE([]byte("chunk-3"))),
+ PartsInfo: []*clusterv1.PartInfo{{Id: 3, Files:
[]*clusterv1.FileInfo{{Name: "f", Offset: 0, Size: 7}}}},
+ }
+ require.NoError(t, s.processChunk(mockStream, session, req3))
+
+ // error_type label is the last (index 5):
operation,group,remote_node,remote_role,remote_tier,error_type
+ found := false
+ for _, labels := range errCap.labelValues {
+ if len(labels) > 0 && labels[len(labels)-1] == "buffer_full" {
+ found = true
+ break
}
- require.NoError(t, s.processChunk(mockStream, session, req2))
}
-
- drive("test-session-A")
- drive("test-session-B")
-
- // assert: labelValues[0] must be topic; and unique label must be only
one (topic)
- uniqOut := outOfOrder.uniqueFirstLabelValues()
- uniqBuf := buffered.uniqueFirstLabelValues()
-
- assert.Equal(t, 1, len(uniqOut))
- assert.Equal(t, 1, len(uniqBuf))
-
- _, okOut := uniqOut[topic]
- _, okBuf := uniqBuf[topic]
- assert.True(t, okOut, "out_of_order_received label must be topic")
- assert.True(t, okBuf, "chunk_buffered label must be topic")
-
- // assert: never should have session_id as label
- _, bad1 := uniqOut["test-session-A"]
- _, bad2 := uniqOut["test-session-B"]
- _, bad3 := uniqBuf["test-session-A"]
- _, bad4 := uniqBuf["test-session-B"]
- assert.False(t, bad1 || bad2 || bad3 || bad4, "metrics must not be
labeled by session_id")
+ assert.True(t, found, "expected error_type=buffer_full to be recorded")
}
// MockChunkedSyncHandler implements queue.ChunkedSyncHandler for testing.
diff --git a/banyand/queue/sub/helpers.go b/banyand/queue/sub/helpers.go
index ca08bf530..b8b77e93d 100644
--- a/banyand/queue/sub/helpers.go
+++ b/banyand/queue/sub/helpers.go
@@ -29,13 +29,13 @@ import (
"github.com/apache/skywalking-banyandb/pkg/logger"
)
-func (s *server) handleEOF(stream clusterv1.Service_SendServer, topic *bus.Topic, dataCollection []any, writeEntity *clusterv1.SendRequest) {
+func (s *server) handleEOF(stream clusterv1.Service_SendServer, topic *bus.Topic, dataCollection []any, writeEntity *clusterv1.SendRequest, identity *streamIdentity) {
if len(dataCollection) < 1 {
return
}
listeners := s.getListeners(*topic)
if len(listeners) == 0 {
- s.reply(stream, writeEntity, nil, "no listener found")
+ s.replyWithErrType(stream, writeEntity, nil, "no listener found", identity, "no_listener")
return
}
if len(listeners) > 1 {
@@ -43,30 +43,24 @@ func (s *server) handleEOF(stream
clusterv1.Service_SendServer, topic *bus.Topic
}
listener := listeners[0]
if le := listener.CheckHealth(); le != nil {
- s.reply(stream, writeEntity, le, "")
+ s.replyWithErrType(stream, writeEntity, le, "", identity, "handler_error")
return
}
message := listener.Rev(stream.Context(),
bus.NewMessage(bus.MessageID(0), dataCollection))
- var resp *clusterv1.SendResponse
- data := message.Data()
- if data != nil {
- switch d := data.(type) {
- case *common.Error:
- resp = &clusterv1.SendResponse{
- MessageId: writeEntity.MessageId,
- Error: d.Error(),
- Status: d.Status(),
- }
- default:
- resp = &clusterv1.SendResponse{
- MessageId: writeEntity.MessageId,
- }
- }
+ // writeEntity is nil when the stream closed (Recv returned io.EOF), so guard the ID access.
+ var msgID uint64
+ if writeEntity != nil {
+ msgID = writeEntity.MessageId
+ }
+ resp := &clusterv1.SendResponse{MessageId: msgID}
+ if ce, ok := message.Data().(*common.Error); ok {
+ resp.Error = ce.Error()
+ resp.Status = ce.Status()
}
if errSend := stream.Send(resp); errSend != nil {
s.log.Error().Stringer("written",
writeEntity).Err(errSend).Msg("failed to send write response")
- if writeEntity != nil {
- s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic)
+ if s.metrics != nil && writeEntity != nil {
+ s.metrics.totalErr.Inc(1, identity.operation, identity.group, identity.senderNode, identity.senderRole, identity.senderTier, "send_error")
}
}
}
@@ -79,11 +73,12 @@ func (s *server) handleRecvError(err error) error {
return err
}
-func (s *server) handleBatch(dataCollection *[]any, writeEntity *clusterv1.SendRequest, start *time.Time) {
+func (s *server) handleBatch(dataCollection *[]any, writeEntity *clusterv1.SendRequest, start *time.Time, identity *streamIdentity) {
if len(*dataCollection) == 0 {
- s.metrics.totalStarted.Inc(1, writeEntity.Topic)
+ if s.metrics != nil {
+ s.metrics.totalStarted.Inc(1, identity.operation, identity.group, identity.senderNode, identity.senderRole, identity.senderTier)
+ }
*start = time.Now()
}
*dataCollection = append(*dataCollection, writeEntity.Body)
- s.metrics.totalMsgSent.Inc(1, writeEntity.Topic)
}
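Reviewer note: the rewritten handleEOF above relies on two Go behaviours: a nil request pointer must be guarded before its fields are read, and a comma-ok type assertion on a nil interface value yields ok == false rather than panicking. The sketch below illustrates both under simplified assumptions; request, response, handlerError, and buildResponse are hypothetical names and are not part of the BanyanDB API.

```go
package main

import "fmt"

// request and response are hypothetical stand-ins for SendRequest/SendResponse.
type request struct{ MessageID uint64 }

type response struct {
	MessageID uint64
	Error     string
}

// handlerError stands in for a listener result that carries an error.
type handlerError struct{ msg string }

// buildResponse tolerates a nil request and a nil or non-error payload.
func buildResponse(req *request, data any) response {
	var id uint64
	if req != nil { // req may be nil once the stream has closed
		id = req.MessageID
	}
	resp := response{MessageID: id}
	// The comma-ok assertion is false for a nil interface or any other type.
	if he, ok := data.(*handlerError); ok {
		resp.Error = he.msg
	}
	return resp
}

func main() {
	fmt.Printf("%+v\n", buildResponse(nil, nil))
	fmt.Printf("%+v\n", buildResponse(&request{MessageID: 7}, &handlerError{msg: "boom"}))
	fmt.Printf("%+v\n", buildResponse(&request{MessageID: 8}, "plain payload"))
}
```

Building the response once and filling the error fields conditionally removes the type switch with a duplicated default branch that the old code used.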
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index a3a56103f..02450ae37 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -438,81 +438,18 @@ func (s *server) SetNodeSchemaStatusRepo(svc
metadata.Service) {
type metrics struct {
totalStarted meter.Counter
totalFinished meter.Counter
+ totalLatency meter.Histogram
totalErr meter.Counter
- totalLatency meter.Counter
-
- totalMsgReceived meter.Counter
- totalMsgReceivedErr meter.Counter
- totalMsgSent meter.Counter
- totalMsgSentErr meter.Counter
-
- // Chunk ordering metrics
- outOfOrderChunksReceived meter.Counter
- chunksBuffered meter.Counter
- bufferTimeouts meter.Counter
- largeGapsRejected meter.Counter
- bufferCapacityExceeded meter.Counter
- finishSyncErr meter.Counter
-
- // Chunked sync saturation metrics
- activeSyncSessions meter.Gauge
- reorderBuffered meter.Gauge
-
- // Chunked sync outcome metrics
- chunkedSyncAbortedTotal meter.Counter
- chunkedSyncFailedParts meter.Counter
- chunkedSyncTotalBytes meter.Counter
- chunkedSyncDurationSecs meter.Histogram
+ receivedBytes meter.Counter
}
func newMetrics(factory observability.Factory) *metrics {
+ labels := []string{"operation", "group", "remote_node", "remote_role", "remote_tier"}
return &metrics{
- totalStarted: factory.NewCounter("total_started",
"topic"),
- totalFinished: factory.NewCounter("total_finished",
"topic"),
- totalErr: factory.NewCounter("total_err", "topic"),
- totalLatency: factory.NewCounter("total_latency",
"topic"),
- totalMsgReceived: factory.NewCounter("total_msg_received",
"topic"),
- totalMsgReceivedErr:
factory.NewCounter("total_msg_received_err", "topic"),
- totalMsgSent: factory.NewCounter("total_msg_sent",
"topic"),
- totalMsgSentErr: factory.NewCounter("total_msg_sent_err",
"topic"),
-
- // Chunk ordering metrics
- outOfOrderChunksReceived:
factory.NewCounter("out_of_order_chunks_received", "topic"),
- chunksBuffered: factory.NewCounter("chunks_buffered",
"topic"),
- bufferTimeouts: factory.NewCounter("buffer_timeouts",
"topic"),
- largeGapsRejected:
factory.NewCounter("large_gaps_rejected", "topic"),
- bufferCapacityExceeded:
factory.NewCounter("buffer_capacity_exceeded", "topic"),
- finishSyncErr: factory.NewCounter("finish_sync_err",
"topic"),
-
- // Chunked sync saturation metrics
- activeSyncSessions:
factory.NewGauge("chunked_sync_active_sessions", "topic"),
- reorderBuffered:
factory.NewGauge("chunk_reorder_buffered_chunks", "topic"),
-
- // Chunked sync outcome metrics
- chunkedSyncAbortedTotal:
factory.NewCounter("chunked_sync_aborted_total", "topic", "reason"),
- chunkedSyncFailedParts:
factory.NewCounter("chunked_sync_failed_parts_total", "topic"),
- chunkedSyncTotalBytes:
factory.NewCounter("chunked_sync_total_bytes_received", "topic"),
- chunkedSyncDurationSecs:
factory.NewHistogram("chunked_sync_duration_seconds", meter.DefBuckets,
"topic"),
- }
-}
-
-// updateChunkOrderMetrics updates chunk ordering metrics.
-func (s *server) updateChunkOrderMetrics(event, topic string) {
- if s.metrics == nil {
- return // Skip metrics if not initialized (e.g., during tests)
- }
- switch event {
- case "out_of_order_received":
- s.metrics.outOfOrderChunksReceived.Inc(1, topic)
- case "chunk_buffered":
- s.metrics.chunksBuffered.Inc(1, topic)
- case "buffer_timeout":
- s.metrics.bufferTimeouts.Inc(1, topic)
- case "gap_too_large":
- s.metrics.largeGapsRejected.Inc(1, topic)
- case "buffer_full":
- s.metrics.bufferCapacityExceeded.Inc(1, topic)
- case "finish_sync_err":
- s.metrics.finishSyncErr.Inc(1, topic)
+ totalStarted: factory.NewCounter("total_started", labels...),
+ totalFinished: factory.NewCounter("total_finished", labels...),
+ totalLatency: factory.NewHistogram("total_latency", meter.DefBuckets, labels...),
+ totalErr: factory.NewCounter("total_err", append(labels, "error_type")...),
+ receivedBytes: factory.NewCounter("received_bytes", labels...),
}
}
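Reviewer note: newMetrics above shares one base label slice across all metrics and extends it with error_type for total_err only. The sketch below shows that layout with a toy in-memory counter; labeledCounter and its methods are hypothetical and do not reflect the real meter or observability packages. Because the base slice literal has length equal to capacity, append allocates a new backing array and leaves the base slice untouched.

```go
package main

import (
	"fmt"
	"strings"
)

// labeledCounter is a hypothetical in-memory counter keyed by label values.
type labeledCounter struct {
	names  []string
	values map[string]float64
}

func newLabeledCounter(names ...string) *labeledCounter {
	return &labeledCounter{names: names, values: map[string]float64{}}
}

// Inc adds delta to the series identified by labelValues, which must match
// the declared label names one-to-one and in order.
func (c *labeledCounter) Inc(delta float64, labelValues ...string) {
	if len(labelValues) != len(c.names) {
		panic(fmt.Sprintf("expected %d label values, got %d", len(c.names), len(labelValues)))
	}
	c.values[strings.Join(labelValues, "\x00")] += delta
}

// get returns the current value of one series.
func (c *labeledCounter) get(labelValues ...string) float64 {
	return c.values[strings.Join(labelValues, "\x00")]
}

func main() {
	base := []string{"operation", "group", "remote_node", "remote_role", "remote_tier"}
	finished := newLabeledCounter(base...)
	errs := newLabeledCounter(append(base, "error_type")...)

	finished.Inc(1, "file-sync", "g1", "data-0:17912", "data", "hot")
	errs.Inc(1, "file-sync", "g1", "data-0:17912", "data", "hot", "buffer_full")
	errs.Inc(1, "file-sync", "g1", "data-0:17912", "data", "hot", "buffer_full")

	fmt.Println(len(base), len(errs.names))
	fmt.Println(errs.get("file-sync", "g1", "data-0:17912", "data", "hot", "buffer_full"))
}
```

Keeping error_type as the last label is what lets the tests in this commit check labels[len(labels)-1] without knowing the full label order.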
diff --git a/banyand/queue/sub/server_metrics_test.go
b/banyand/queue/sub/server_metrics_test.go
index 121d2f5be..91638af00 100644
--- a/banyand/queue/sub/server_metrics_test.go
+++ b/banyand/queue/sub/server_metrics_test.go
@@ -34,46 +34,20 @@ type noopCounter struct{}
func (*noopCounter) Inc(_ float64, _ ...string) {}
func (*noopCounter) Delete(_ ...string) bool { return true }
-type noopGauge struct{}
-
-func (*noopGauge) Set(_ float64, _ ...string) {}
-func (*noopGauge) Add(_ float64, _ ...string) {}
-func (*noopGauge) Delete(_ ...string) bool { return true }
-
type noopHistogram struct{}
func (*noopHistogram) Observe(_ float64, _ ...string) {}
func (*noopHistogram) Delete(_ ...string) bool { return true }
-func newTestMetrics() *metrics {
+func newTestMetrics() *metrics { //nolint:exhaustruct
c := &noopCounter{}
- g := &noopGauge{}
h := &noopHistogram{}
return &metrics{
totalStarted: c,
totalFinished: c,
+ totalLatency: h,
totalErr: c,
- totalLatency: c,
-
- totalMsgReceived: c,
- totalMsgReceivedErr: c,
- totalMsgSent: c,
- totalMsgSentErr: c,
-
- outOfOrderChunksReceived: c,
- chunksBuffered: c,
- bufferTimeouts: c,
- largeGapsRejected: c,
- bufferCapacityExceeded: c,
- finishSyncErr: c,
-
- activeSyncSessions: g,
- reorderBuffered: g,
-
- chunkedSyncAbortedTotal: c,
- chunkedSyncFailedParts: c,
- chunkedSyncTotalBytes: c,
- chunkedSyncDurationSecs: h,
+ receivedBytes: c,
}
}
@@ -84,16 +58,6 @@ type fakeCounter struct {
func (f *fakeCounter) Inc(delta float64, _ ...string) { f.total += delta }
func (*fakeCounter) Delete(_ ...string) bool { return true }
-type fakeGauge struct {
- value float64
-}
-
-func (g *fakeGauge) Set(v float64, _ ...string) { g.value = v }
-func (g *fakeGauge) Add(delta float64, _ ...string) { g.value += delta }
-func (*fakeGauge) Delete(_ ...string) bool { return true }
-func newFakeGauge() meter.Gauge { return &fakeGauge{} }
-func getGaugeValue(g meter.Gauge) float64 { return
g.(*fakeGauge).value }
-
type fakeHistogram struct {
count int
}
@@ -101,65 +65,53 @@ type fakeHistogram struct {
func (h *fakeHistogram) Observe(_ float64, _ ...string) { h.count++ }
func (*fakeHistogram) Delete(_ ...string) bool { return true }
-func TestReleaseMetricsReleasesGauges(t *testing.T) {
- m := &metrics{
- activeSyncSessions: newFakeGauge(),
- reorderBuffered: newFakeGauge(),
- }
-
- sess := &syncSession{
- sessionID: "s1",
- startTime: time.Now(),
- metadata: &clusterv1.SyncMetadata{Topic:
"v1:stream-part-sync"},
- chunkBuffer: &chunkBuffer{
- chunks: map[uint32]*clusterv1.SyncPartRequest{
- 2: {ChunkIndex: 2},
- 3: {ChunkIndex: 3},
- },
- },
- }
-
- // simulate start increments
- m.activeSyncSessions.Add(1, sess.metadata.Topic)
- m.reorderBuffered.Add(2, sess.metadata.Topic)
-
- sess.releaseMetrics(m)
+// capturingCounterWithLabels records the exact label slices passed to Inc.
+type capturingCounterWithLabels struct {
+ calls [][]string
+}
- require.Equal(t, float64(0), getGaugeValue(m.activeSyncSessions))
- require.Equal(t, float64(0), getGaugeValue(m.reorderBuffered))
- // idempotent
- sess.releaseMetrics(m)
- require.Equal(t, float64(0), getGaugeValue(m.activeSyncSessions))
- require.Equal(t, float64(0), getGaugeValue(m.reorderBuffered))
+func (c *capturingCounterWithLabels) Inc(_ float64, labels ...string) {
+ cp := make([]string, len(labels))
+ copy(cp, labels)
+ c.calls = append(c.calls, cp)
}
-func TestCompletionOutcomeMetricsRecorded(t *testing.T) {
- logInitErr := logger.Init(logger.Logging{
- Env: "dev",
- Level: "info",
- })
+func (*capturingCounterWithLabels) Delete(_ ...string) bool { return true }
+
+// TestFileSyncCompletionMetrics verifies that handleCompletion records
+// totalFinished, totalLatency, receivedBytes with the canonical label order
+// (operation, group, remote_node, remote_role, remote_tier).
+func TestFileSyncCompletionMetrics(t *testing.T) {
+ logInitErr := logger.Init(logger.Logging{Env: "dev", Level: "info"})
require.NoError(t, logInitErr)
- topic := "v1:stream-part-sync"
- totalBytes := float64(10)
+ finished := &fakeCounter{}
+ latency := &fakeHistogram{}
+ receivedB := &fakeCounter{}
+ totalErrC := &fakeCounter{}
- s := &server{
+ s := &server{ //nolint:exhaustruct
log: logger.GetLogger("test-server-completion-metrics"),
metrics: &metrics{
- activeSyncSessions: newFakeGauge(),
- reorderBuffered: newFakeGauge(),
- chunkedSyncAbortedTotal: &fakeCounter{},
- chunkedSyncFailedParts: &fakeCounter{},
- chunkedSyncTotalBytes: &fakeCounter{},
- chunkedSyncDurationSecs: &fakeHistogram{},
+ totalStarted: &fakeCounter{},
+ totalFinished: finished,
+ totalLatency: latency,
+ totalErr: totalErrC,
+ receivedBytes: receivedB,
},
}
- session := &syncSession{
+ session := &syncSession{ //nolint:exhaustruct
sessionID: "s1",
startTime: time.Now().Add(-2 * time.Second),
- metadata: &clusterv1.SyncMetadata{Topic: topic},
- partCtx: &queue.ChunkedSyncPartContext{},
+ metadata: &clusterv1.SyncMetadata{
+ Group: "my-group",
+ Topic: "v1:stream-part-sync",
+ SenderNode: "data-0:17912",
+ SenderRole: "data",
+ SenderTier: "hot",
+ },
+ partCtx: &queue.ChunkedSyncPartContext{},
partsProgress: map[int]*partProgress{
0: {totalBytes: 10, receivedBytes: 10, completed: true},
1: {totalBytes: 10, receivedBytes: 0, completed: false},
@@ -168,9 +120,6 @@ func TestCompletionOutcomeMetricsRecorded(t *testing.T) {
chunksReceived: 2,
}
- // Simulate start increments that would have happened on session
creation.
- s.metrics.activeSyncSessions.Add(1, topic)
-
mockStream := &MockSyncPartStream{}
req := &clusterv1.SyncPartRequest{
SessionId: session.sessionID,
@@ -182,40 +131,122 @@ func TestCompletionOutcomeMetricsRecorded(t *testing.T) {
handleErr := s.handleCompletion(mockStream, session, req)
require.NoError(t, handleErr)
- require.Equal(t, totalBytes,
s.metrics.chunkedSyncTotalBytes.(*fakeCounter).total)
- require.Equal(t, 1,
s.metrics.chunkedSyncDurationSecs.(*fakeHistogram).count)
- require.Equal(t, float64(1),
s.metrics.chunkedSyncFailedParts.(*fakeCounter).total)
- require.Equal(t, float64(0),
getGaugeValue(s.metrics.activeSyncSessions))
+ require.Equal(t, float64(1), finished.total, "totalFinished should be
incremented once")
+ require.Equal(t, 1, latency.count, "totalLatency should be observed
once")
+ require.Equal(t, float64(10), receivedB.total, "receivedBytes should
equal totalReceived")
+ // one failed part
+ require.Equal(t, float64(1), totalErrC.total, "one part_failed error
should be recorded")
require.True(t, session.completed)
}
-func TestCleanupPreviousSessionRecordsAborted(t *testing.T) {
- logInitErr := logger.Init(logger.Logging{
- Env: "dev",
- Level: "info",
- })
+// TestFileSyncStartedMetrics verifies that startOrSwitchSession records
totalStarted.
+func TestFileSyncStartedMetrics(t *testing.T) {
+ logInitErr := logger.Init(logger.Logging{Env: "dev", Level: "info"})
+ require.NoError(t, logInitErr)
+
+ started := &fakeCounter{}
+ s := &server{ //nolint:exhaustruct
+ log: logger.GetLogger("test-server-started-metrics"),
+ metrics: &metrics{
+ totalStarted: started,
+ totalFinished: &fakeCounter{},
+ totalLatency: &fakeHistogram{},
+ totalErr: &fakeCounter{},
+ receivedBytes: &fakeCounter{},
+ },
+ }
+
+ req := &clusterv1.SyncPartRequest{ //nolint:exhaustruct
+ SessionId: "s1",
+ Content: &clusterv1.SyncPartRequest_Metadata{
+ Metadata: &clusterv1.SyncMetadata{
+ Group: "g1",
+ Topic: "v1:stream-part-sync",
+ SenderNode: "liaison-0:17912",
+ SenderRole: "liaison",
+ SenderTier: "",
+ },
+ },
+ }
+ _ = s.startOrSwitchSession("s1", req, nil)
+ require.Equal(t, float64(1), started.total)
+}
+
+// TestFileSyncSwitchRecordsErrType verifies that switching sessions records
error_type=switch.
+func TestFileSyncSwitchRecordsErrType(t *testing.T) {
+ logInitErr := logger.Init(logger.Logging{Env: "dev", Level: "info"})
require.NoError(t, logInitErr)
- topic := "v1:stream-part-sync"
- s := &server{
- log: logger.GetLogger("test-server-switch-abort"),
+ errLabels := &capturingCounterWithLabels{}
+ s := &server{ //nolint:exhaustruct
+ log: logger.GetLogger("test-server-switch"),
metrics: &metrics{
- activeSyncSessions: newFakeGauge(),
- reorderBuffered: newFakeGauge(),
- chunkedSyncAbortedTotal: &fakeCounter{},
+ totalStarted: &fakeCounter{},
+ totalFinished: &fakeCounter{},
+ totalLatency: &fakeHistogram{},
+ totalErr: errLabels,
+ receivedBytes: &fakeCounter{},
},
}
- prev := &syncSession{
+ prev := &syncSession{ //nolint:exhaustruct
sessionID: "s1",
startTime: time.Now().Add(-time.Second),
- metadata: &clusterv1.SyncMetadata{Topic: topic},
+ metadata: &clusterv1.SyncMetadata{
+ Group: "g1",
+ Topic: "v1:stream-part-sync",
+ SenderNode: "data-0:17912",
+ SenderRole: "data",
+ SenderTier: "warm",
+ },
}
- s.metrics.activeSyncSessions.Add(1, topic)
s.cleanupPreviousSession(prev)
- require.True(t, prev.abortedRecorded)
- require.Equal(t, float64(1),
s.metrics.chunkedSyncAbortedTotal.(*fakeCounter).total)
- require.Equal(t, float64(0),
getGaugeValue(s.metrics.activeSyncSessions))
+ require.True(t, prev.completed)
+ require.Len(t, errLabels.calls, 1)
+ lastLabel := errLabels.calls[0]
+ // label order: operation, group, remote_node, remote_role,
remote_tier, error_type
+ require.Equal(t,
meter.Counter(errLabels).(*capturingCounterWithLabels), errLabels)
+ require.Equal(t, "switch", lastLabel[len(lastLabel)-1], "error_type
must be 'switch'")
+}
+
+// TestFileSyncOperationLabel verifies the labels for file-sync sessions use
operation=file-sync.
+func TestFileSyncOperationLabel(t *testing.T) {
+ logInitErr := logger.Init(logger.Logging{Env: "dev", Level: "info"})
+ require.NoError(t, logInitErr)
+
+ started := &capturingCounterWithLabels{}
+ s := &server{ //nolint:exhaustruct
+ log: logger.GetLogger("test-server-op-label"),
+ metrics: &metrics{
+ totalStarted: started,
+ totalFinished: &fakeCounter{},
+ totalLatency: &fakeHistogram{},
+ totalErr: &fakeCounter{},
+ receivedBytes: &fakeCounter{},
+ },
+ }
+
+ req := &clusterv1.SyncPartRequest{ //nolint:exhaustruct
+ SessionId: "s1",
+ Content: &clusterv1.SyncPartRequest_Metadata{
+ Metadata: &clusterv1.SyncMetadata{
+ Group: "grp",
+ Topic: "v1:measure-part-sync",
+ SenderNode: "data-1:17912",
+ SenderRole: "data",
+ SenderTier: "cold",
+ },
+ },
+ }
+ _ = s.startOrSwitchSession("s1", req, nil)
+ require.Len(t, started.calls, 1)
+ labels := started.calls[0]
+ // operation, group, remote_node, remote_role, remote_tier
+ require.Equal(t, "file-sync", labels[0])
+ require.Equal(t, "grp", labels[1])
+ require.Equal(t, "data-1:17912", labels[2])
+ require.Equal(t, "data", labels[3])
+ require.Equal(t, "cold", labels[4])
}
diff --git a/banyand/queue/sub/sub.go b/banyand/queue/sub/sub.go
index d94664312..37307e327 100644
--- a/banyand/queue/sub/sub.go
+++ b/banyand/queue/sub/sub.go
@@ -52,15 +52,10 @@ func checkVersionCompatibility(versionInfo
*clusterv1.VersionInfo) (*clusterv1.V
serverFileFormatVersion := storage.GetCurrentVersion()
compatibleFileFormatVersions := storage.GetCompatibleVersions()
- // Check API version compatibility
apiCompatible := versionInfo.ApiVersion == serverAPIVersion
- // Check file format version compatibility
- fileFormatCompatible := false
- if versionInfo.FileFormatVersion == serverFileFormatVersion {
- fileFormatCompatible = true
- } else {
- // Check if client's file format version is in our compatible
list
+ fileFormatCompatible := versionInfo.FileFormatVersion ==
serverFileFormatVersion
+ if !fileFormatCompatible {
for _, compatVer := range compatibleFileFormatVersions {
if compatVer == versionInfo.FileFormatVersion {
fileFormatCompatible = true
@@ -69,7 +64,7 @@ func checkVersionCompatibility(versionInfo
*clusterv1.VersionInfo) (*clusterv1.V
}
}
- versionCompatibility := &clusterv1.VersionCompatibility{
+ vc := &clusterv1.VersionCompatibility{
ServerApiVersion: serverAPIVersion,
SupportedApiVersions: []string{serverAPIVersion},
ServerFileFormatVersion: serverFileFormatVersion,
@@ -78,37 +73,49 @@ func checkVersionCompatibility(versionInfo
*clusterv1.VersionInfo) (*clusterv1.V
switch {
case !apiCompatible && !fileFormatCompatible:
- versionCompatibility.Supported = false
- versionCompatibility.Reason = fmt.Sprintf("API version %s not supported (server: %s) and file format version %s not compatible (server: %s, supported: %v)",
+ vc.Supported = false
+ vc.Reason = fmt.Sprintf("API version %s not supported (server: %s) and file format version %s not compatible (server: %s, supported: %v)",
versionInfo.ApiVersion, serverAPIVersion,
versionInfo.FileFormatVersion, serverFileFormatVersion,
compatibleFileFormatVersions)
- return versionCompatibility, modelv1.Status_STATUS_VERSION_UNSUPPORTED
+ return vc, modelv1.Status_STATUS_VERSION_UNSUPPORTED
case !apiCompatible:
- versionCompatibility.Supported = false
- versionCompatibility.Reason = fmt.Sprintf("API version %s not supported (server: %s)", versionInfo.ApiVersion, serverAPIVersion)
- return versionCompatibility, modelv1.Status_STATUS_VERSION_UNSUPPORTED
+ vc.Supported = false
+ vc.Reason = fmt.Sprintf("API version %s not supported (server: %s)", versionInfo.ApiVersion, serverAPIVersion)
+ return vc, modelv1.Status_STATUS_VERSION_UNSUPPORTED
case !fileFormatCompatible:
- versionCompatibility.Supported = false
- versionCompatibility.Reason = fmt.Sprintf("File format version %s not compatible (server: %s, supported: %v)",
+ vc.Supported = false
+ vc.Reason = fmt.Sprintf("File format version %s not compatible (server: %s, supported: %v)",
versionInfo.FileFormatVersion, serverFileFormatVersion,
compatibleFileFormatVersions)
- return versionCompatibility, modelv1.Status_STATUS_VERSION_UNSUPPORTED
+ return vc, modelv1.Status_STATUS_VERSION_UNSUPPORTED
}
- versionCompatibility.Supported = true
- versionCompatibility.Reason = "Client version compatible with server"
- return versionCompatibility, modelv1.Status_STATUS_SUCCEED
+ vc.Supported = true
+ vc.Reason = "Client version compatible with server"
+ return vc, modelv1.Status_STATUS_SUCCEED
+}
+
+// streamIdentity captures the per-stream remote sender identity, pinned on the first message.
+type streamIdentity struct {
+ senderNode string
+ senderRole string
+ senderTier string
+ group string
+ operation string
+ pinned bool
}
func (s *server) Send(stream clusterv1.Service_SendServer) error {
ctx := stream.Context()
var topic *bus.Topic
- var m bus.Message
var dataCollection []any
start := time.Now()
+ identity := &streamIdentity{}
+
defer func() {
- if topic != nil {
- s.metrics.totalFinished.Inc(1, topic.String())
- s.metrics.totalLatency.Inc(time.Since(start).Seconds(), topic.String())
+ if topic == nil || !identity.pinned || s.metrics == nil {
+ return
}
+ s.metrics.totalFinished.Inc(1, identity.operation, identity.group, identity.senderNode, identity.senderRole, identity.senderTier)
+ s.metrics.totalLatency.Observe(time.Since(start).Seconds(), identity.operation, identity.group, identity.senderNode, identity.senderRole, identity.senderTier)
}()
for {
select {
@@ -118,132 +125,198 @@ func (s *server) Send(stream clusterv1.Service_SendServer) error {
}
writeEntity, err := stream.Recv()
if errors.Is(err, io.EOF) {
- s.handleEOF(stream, topic, dataCollection, writeEntity)
+ s.handleEOF(stream, topic, dataCollection, writeEntity, identity)
return nil
}
if err != nil {
return s.handleRecvError(err)
}
- // Check version compatibility on first message received
- if writeEntity.VersionInfo != nil {
- versionCompatibility, status := checkVersionCompatibility(writeEntity.VersionInfo)
- if status != modelv1.Status_STATUS_SUCCEED {
- s.log.Warn().
- Str("client_api_version", writeEntity.VersionInfo.ApiVersion).
- Str("client_file_format_version", writeEntity.VersionInfo.FileFormatVersion).
- Str("reason", versionCompatibility.Reason).
- Msg("version compatibility check failed")
-
- if errSend := stream.Send(&clusterv1.SendResponse{
- MessageId: writeEntity.MessageId,
- Status: status,
- Error: versionCompatibility.Reason,
- VersionCompatibility: versionCompatibility,
- }); errSend != nil {
- s.log.Error().Err(errSend).Msg("failed to send version incompatibility response")
- }
- return fmt.Errorf("version incompatibility: %s", versionCompatibility.Reason)
- }
+ if versionErr := s.checkVersionAndReply(stream, writeEntity); versionErr != nil {
+ return versionErr
}
- s.metrics.totalMsgReceived.Inc(1, writeEntity.Topic)
- if writeEntity.Topic != "" && topic == nil {
- t, ok := data.TopicMap[writeEntity.Topic]
- if !ok {
- s.reply(stream, writeEntity, err, "invalid topic")
- continue
- }
- topic = &t
+ topic, err = s.resolveTopic(stream, writeEntity, topic, identity)
+ if err != nil {
+ continue
}
if topic == nil {
- s.reply(stream, writeEntity, err, "topic is empty")
continue
}
- if reqSupplier, ok := data.TopicRequestMap[*topic]; ok {
- req := reqSupplier()
- if req == nil {
- m = bus.NewMessage(bus.MessageID(writeEntity.MessageId), writeEntity.Body)
- } else {
- if errUnmarshal := proto.Unmarshal(writeEntity.Body, req); errUnmarshal != nil {
- s.reply(stream, writeEntity, errUnmarshal, "failed to unmarshal message")
- continue
- }
- m = bus.NewMessage(bus.MessageID(writeEntity.MessageId), req)
- }
- } else {
- s.reply(stream, writeEntity, err, "unknown topic")
+ s.pinIdentity(identity, writeEntity, *topic)
+
+ m, parseErr := s.parseMessage(stream, writeEntity, *topic, identity)
+ if parseErr != nil {
continue
}
+
if writeEntity.BatchMod {
- s.handleBatch(&dataCollection, writeEntity, &start)
+ s.handleBatch(&dataCollection, writeEntity, &start, identity)
continue
}
- s.metrics.totalStarted.Inc(1, writeEntity.Topic)
- listeners := s.getListeners(*topic)
- if len(listeners) == 0 {
- s.reply(stream, writeEntity, err, "no listener found")
- continue
- }
- if len(listeners) > 1 {
- logger.Panicf("multiple listeners found for topic %s", *topic)
+
+ s.dispatchMessage(stream, writeEntity, *topic, m, identity, &start)
+ }
+}
+
+// checkVersionAndReply returns an error if the version is incompatible and sends the rejection response.
+func (s *server) checkVersionAndReply(stream clusterv1.Service_SendServer, writeEntity *clusterv1.SendRequest) error {
+ if writeEntity.VersionInfo == nil {
+ return nil
+ }
+ vc, versionStatus := checkVersionCompatibility(writeEntity.VersionInfo)
+ if versionStatus == modelv1.Status_STATUS_SUCCEED {
+ return nil
+ }
+ s.log.Warn().
+ Str("client_api_version", writeEntity.VersionInfo.ApiVersion).
+ Str("client_file_format_version", writeEntity.VersionInfo.FileFormatVersion).
+ Str("reason", vc.Reason).
+ Msg("version compatibility check failed")
+ if errSend := stream.Send(&clusterv1.SendResponse{
+ MessageId: writeEntity.MessageId,
+ Status: versionStatus,
+ Error: vc.Reason,
+ VersionCompatibility: vc,
+ }); errSend != nil {
+ s.log.Error().Err(errSend).Msg("failed to send version incompatibility response")
+ }
+ return fmt.Errorf("version incompatibility: %s", vc.Reason)
+}
+
+// resolveTopic parses and validates the topic from the write entity.
+// Returns a non-nil error (sentinel) to signal the caller to `continue`.
+func (s *server) resolveTopic(
+ stream clusterv1.Service_SendServer,
+ writeEntity *clusterv1.SendRequest,
+ current *bus.Topic,
+ identity *streamIdentity,
+) (*bus.Topic, error) {
+ if writeEntity.Topic == "" || current != nil {
+ if current == nil {
+ s.replyWithErrType(stream, writeEntity, nil, "topic is empty", identity, "empty_topic")
+ return nil, fmt.Errorf("empty")
}
- listener := listeners[0]
+ return current, nil
+ }
+ t, ok := data.TopicMap[writeEntity.Topic]
+ if !ok {
+ s.replyWithErrType(stream, writeEntity, nil, "invalid topic", identity, "invalid_topic")
+ return nil, fmt.Errorf("invalid")
+ }
+ return &t, nil
+}
- m = listener.Rev(ctx, m)
- if m.Data() == nil {
- if errSend := stream.Send(&clusterv1.SendResponse{
- MessageId: writeEntity.MessageId,
- }); errSend != nil {
- s.log.Error().Stringer("request", writeEntity).Err(errSend).Msg("failed to send empty response")
- s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic)
- continue
+// pinIdentity pins the sender identity on the first message.
+func (s *server) pinIdentity(identity *streamIdentity, writeEntity *clusterv1.SendRequest, topic bus.Topic) {
+ if !identity.pinned {
+ identity.senderNode = writeEntity.GetSenderNode()
+ identity.senderRole = writeEntity.GetSenderRole()
+ identity.senderTier = writeEntity.GetSenderTier()
+ identity.operation = data.OperationOf(topic)
+ identity.group = writeEntity.GetGroup()
+ identity.pinned = true
+ return
+ }
+ if g := writeEntity.GetGroup(); g != "" {
+ identity.group = g
+ }
+}
+
+// parseMessage deserializes the wire body into a bus.Message. Returns a sentinel error to `continue` on failure.
+func (s *server) parseMessage(
+ stream clusterv1.Service_SendServer,
+ writeEntity *clusterv1.SendRequest,
+ topic bus.Topic,
+ identity *streamIdentity,
+) (bus.Message, error) {
+ reqSupplier, ok := data.TopicRequestMap[topic]
+ if !ok {
+ s.replyWithErrType(stream, writeEntity, nil, "unknown topic", identity, "unknown_topic")
+ return bus.Message{}, fmt.Errorf("unknown")
+ }
+ req := reqSupplier()
+ if req == nil {
+ return bus.NewMessage(bus.MessageID(writeEntity.MessageId), writeEntity.Body), nil
+ }
+ if errUnmarshal := proto.Unmarshal(writeEntity.Body, req); errUnmarshal != nil {
+ s.replyWithErrType(stream, writeEntity, errUnmarshal, "failed to unmarshal message", identity, "unmarshal_error")
+ return bus.Message{}, errUnmarshal
+ }
+ return bus.NewMessage(bus.MessageID(writeEntity.MessageId), req), nil
+}
+
+// dispatchMessage invokes the listener and sends back the response.
+func (s *server) dispatchMessage(
+ stream clusterv1.Service_SendServer,
+ writeEntity *clusterv1.SendRequest,
+ topic bus.Topic,
+ m bus.Message,
+ identity *streamIdentity,
+ start *time.Time,
+) {
+ if s.metrics != nil {
+ s.metrics.totalStarted.Inc(1, identity.operation, identity.group, identity.senderNode, identity.senderRole, identity.senderTier)
+ }
+ listeners := s.getListeners(topic)
+ if len(listeners) == 0 {
+ s.replyWithErrType(stream, writeEntity, nil, "no listener found", identity, "no_listener")
+ return
+ }
+ if len(listeners) > 1 {
+ logger.Panicf("multiple listeners found for topic %s", topic)
+ }
+ m = listeners[0].Rev(stream.Context(), m)
+ if m.Data() == nil {
+ if errSend := stream.Send(&clusterv1.SendResponse{MessageId: writeEntity.MessageId}); errSend != nil {
+ s.log.Error().Stringer("request", writeEntity).Err(errSend).Msg("failed to send empty response")
+ if s.metrics != nil {
+ s.metrics.totalErr.Inc(1, identity.operation, identity.group, identity.senderNode, identity.senderRole, identity.senderTier, "send_error")
}
- s.metrics.totalMsgSent.Inc(1, writeEntity.Topic)
- continue
}
- var responseBody []byte
- switch d := m.Data().(type) {
- case proto.Message:
- var marshalErr error
- responseBody, marshalErr = proto.Marshal(d)
- if marshalErr != nil {
- s.reply(stream, writeEntity, marshalErr, "failed to marshal message")
- continue
- }
- case []byte:
- // Topic-AND-process-wire-mode guard: an opaque []byte response body
- // is the raw vec columnar frame and is only valid on
- // TopicInternalMeasureQuery when this process is flag-on. The
- // default: arm still rejects []byte for every other topic/mode.
- if *topic != data.TopicInternalMeasureQuery || !data.MeasureWireModeRaw() {
- s.reply(stream, writeEntity, nil, fmt.Sprintf("invalid response: unexpected raw body on topic %s", *topic))
- continue
- }
- responseBody = d
- case *common.Error:
- select {
- case <-ctx.Done():
- s.metrics.totalMsgReceivedErr.Inc(1, writeEntity.Topic)
- return ctx.Err()
- default:
- }
- s.reply(stream, writeEntity, nil, d.Error())
- continue
- default:
- s.reply(stream, writeEntity, nil, fmt.Sprintf("invalid response: %T", d))
- continue
+ return
+ }
+ responseBody, marshalErr := s.marshalResponse(stream, writeEntity, topic, m, identity)
+ if marshalErr != nil {
+ return
+ }
+ if sendErr := stream.Send(&clusterv1.SendResponse{MessageId: writeEntity.MessageId, Body: responseBody}); sendErr != nil {
+ s.log.Error().Stringer("request", writeEntity).Dur("latency", time.Since(*start)).Err(sendErr).Msg("failed to send query response")
+ if s.metrics != nil {
+ s.metrics.totalErr.Inc(1, identity.operation, identity.group, identity.senderNode, identity.senderRole, identity.senderTier, "send_error")
}
- if err := stream.Send(&clusterv1.SendResponse{
- MessageId: writeEntity.MessageId,
- Body: responseBody,
- }); err != nil {
- s.log.Error().Stringer("request", writeEntity).Dur("latency", time.Since(start)).Err(err).Msg("failed to send query response")
- s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic)
- continue
+ }
+}
+
+// marshalResponse converts a bus.Message data to a wire body. Returns sentinel error to stop dispatch on failure.
+func (s *server) marshalResponse(
+ stream clusterv1.Service_SendServer,
+ writeEntity *clusterv1.SendRequest,
+ topic bus.Topic,
+ m bus.Message,
+ identity *streamIdentity,
+) ([]byte, error) {
+ switch d := m.Data().(type) {
+ case proto.Message:
+ body, marshalErr := proto.Marshal(d)
+ if marshalErr != nil {
+ s.replyWithErrType(stream, writeEntity, marshalErr, "failed to marshal message", identity, "marshal_error")
+ return nil, marshalErr
+ }
+ return body, nil
+ case []byte:
+ if topic != data.TopicInternalMeasureQuery || !data.MeasureWireModeRaw() {
+ s.replyWithErrType(stream, writeEntity, nil, fmt.Sprintf("invalid response: unexpected raw body on topic %s", topic), identity, "marshal_error")
+ return nil, fmt.Errorf("invalid raw body")
}
- s.metrics.totalMsgSent.Inc(1, writeEntity.Topic)
+ return d, nil
+ case *common.Error:
+ s.replyWithErrType(stream, writeEntity, nil, d.Error(), identity, "handler_error")
+ return nil, fmt.Errorf("handler error")
+ default:
+ s.replyWithErrType(stream, writeEntity, nil, fmt.Sprintf("invalid response: %T", d), identity, "handler_error")
+ return nil, fmt.Errorf("invalid response type")
}
}
@@ -269,25 +342,26 @@ func (s *server) getListeners(topic bus.Topic) []bus.MessageListener {
return s.listeners[topic]
}
-func (s *server) reply(stream clusterv1.Service_SendServer, writeEntity *clusterv1.SendRequest, err error, message string) {
+func (s *server) replyWithErrType(
+ stream clusterv1.Service_SendServer,
+ writeEntity *clusterv1.SendRequest,
+ err error,
+ message string,
+ identity *streamIdentity,
+ errType string,
+) {
s.log.Error().Stringer("request", writeEntity).Err(err).Msg(message)
- s.metrics.totalMsgReceivedErr.Inc(1, writeEntity.Topic)
- resp := &clusterv1.SendResponse{
- MessageId: writeEntity.MessageId,
+ if s.metrics != nil {
+ s.metrics.totalErr.Inc(1, identity.operation, identity.group, identity.senderNode, identity.senderRole, identity.senderTier, errType)
}
-
- var ce *common.Error
- if errors.As(err, &ce) {
- resp.Error = ce.Error()
- resp.Status = ce.Status()
- } else {
- resp.Error = message
+ var msgID uint64
+ if writeEntity != nil {
+ msgID = writeEntity.MessageId
}
if errResp := stream.Send(&clusterv1.SendResponse{
- MessageId: writeEntity.MessageId,
+ MessageId: msgID,
Error: message,
}); errResp != nil {
s.log.Error().Err(errResp).AnErr("original", err).Stringer("request", writeEntity).Msg("failed to send error response")
- s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic)
}
}
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 0090d8438..ff5fd5760 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -2487,6 +2487,10 @@ PartResult contains the result for individual parts.
| body | [bytes](#bytes) | | |
| batch_mod | [bool](#bool) | | |
| version_info | [VersionInfo](#banyandb-cluster-v1-VersionInfo) | | version_info contains version information |
+| group | [string](#string) | | group is the business group associated with this message. |
+| sender_node | [string](#string) | | sender_node is the BanyanDB node name of the sender; set only on the first message of a stream. |
+| sender_role | [string](#string) | | sender_role is the role of the sender node; set only on the first message of a stream. |
+| sender_tier | [string](#string) | | sender_tier is the storage tier label of the sender node; set only on the first message of a stream. |
@@ -2542,6 +2546,9 @@ SyncMetadata contains metadata for the sync operation.
| topic | [string](#string) | | Sync topic (stream-part-sync or measure-part-sync). |
| timestamp | [int64](#int64) | | Timestamp when sync started. |
| total_parts | [uint32](#uint32) | | Total number of parts being synced. |
+| sender_node | [string](#string) | | sender_node is the BanyanDB node name of the sender. |
+| sender_role | [string](#string) | | sender_role is the role of the sender node. |
+| sender_tier | [string](#string) | | sender_tier is the storage tier label of the sender node. |
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index 154360277..a261e74b7 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -162,6 +162,15 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
if err != nil {
return err
}
+ // Stamp the liaison's own identity onto the cluster publishers so the
+ // receiving data nodes can label queue metrics by sender (remote_node/
+ // role/tier), enabling the liaison<->data topology to be drawn.
+ liaisonTier := node.Labels["type"]
+ for _, c := range []queue.Client{tire1Client, tire2Client} {
+ if setter, ok := c.(interface{ SetSelfNode(string, string, string) }); ok {
+ setter.SetSelfNode(node.NodeID, "liaison", liaisonTier)
+ }
+ }
logger.GetLogger().Info().Msg("starting as a liaison server")
runCtx = context.WithValue(runCtx, common.ContextNodeKey, node)
// Spawn our go routines and wait for shutdown.