This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new ab7ac5b60 fix(queue): label batch-write metrics by group for index-sync writes (#1162)
ab7ac5b60 is described below
commit ab7ac5b603d1f7e89e53fc52da6158b8a26ba12c
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat Jun 6 20:55:53 2026 +0800
fix(queue): label batch-write metrics by group for index-sync writes (#1162)
---
CHANGES.md | 2 +-
banyand/measure/write_liaison.go | 2 +-
banyand/queue/pub/message_to_request_test.go | 63 ++++++++++++++++++++++++++++
banyand/queue/pub/pub.go | 3 +-
banyand/stream/write_liaison.go | 4 +-
banyand/trace/write_liaison.go | 2 +-
pkg/bus/bus.go | 15 +++++++
7 files changed, 85 insertions(+), 6 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 2f4c650bf..5c3d9f315 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -5,7 +5,7 @@ Release Notes.
## 0.11.0
### Features
-- Redesign the queue (`queue_pub`/`queue_sub`) metrics around a uniform model:
keep only `total_started`, `total_finished`, `total_latency` (now a histogram)
and `total_err`, plus file-sync-only `sent_bytes` (pub) / `received_bytes`
(sub). Replace the `topic` label with `operation`
(`batch-write`/`file-sync`/`query`/`control`) and `group`, add an `error_type`
label on `total_err`, and add remote-endpoint labels
(`remote_node`/`remote_role`/`remote_tier`) so the liaison↔data (hot/warm/col
[...]
+- Redesign the queue (`queue_pub`/`queue_sub`) metrics around a uniform model:
keep only `total_started`, `total_finished`, `total_latency` (now a histogram)
and `total_err`, plus file-sync-only `sent_bytes` (pub) / `received_bytes`
(sub). Replace the `topic` label with `operation`
(`batch-write`/`file-sync`/`query`/`control`) and `group`, add an `error_type`
label on `total_err`, and add remote-endpoint labels
(`remote_node`/`remote_role`/`remote_tier`) so the liaison↔data (hot/warm/col
[...]
- Vectorized measure query path is now enabled by default. The columnar
pipeline replaces per-row protobuf serialization in `NewMIterator`, cutting
allocations and ns/op for scan-heavy measure queries; gRPC wire format
(`*measurev1.InternalDataPoint`) is byte-identical. Single-node coverage is
complete: scan, GroupBy+Agg via `BatchAggregation`, scalar reduce (`Agg`
without `GroupBy`), raw `GroupBy` (without `Agg`), implicit projection coverage
for GroupBy/Agg fields, `TopN`/`BottomN`, `o [...]
- Add validation to ensure Measure's ShardingKey contains all Entity tags to
guarantee entity locality.
- Organize access logs under a dedicated "accesslog" subdirectory to improve
log organization and separation from other application data.
diff --git a/banyand/measure/write_liaison.go b/banyand/measure/write_liaison.go
index 6dac293a0..4bcfcb625 100644
--- a/banyand/measure/write_liaison.go
+++ b/banyand/measure/write_liaison.go
@@ -144,7 +144,7 @@ func (w *writeQueueCallback) Rev(ctx context.Context,
message bus.Message) (resp
// Send to all nodes for this shard
for _, node := range nodes {
- message :=
bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), node, combinedData)
+ message :=
bus.NewMessageWithNodeAndGroup(bus.MessageID(time.Now().UnixNano()), node,
g.name, combinedData)
future, publishErr :=
w.tire2Client.Publish(ctx, topic, message)
if publishErr != nil {
w.l.Error().Err(publishErr).Str("node", node).Uint32("shardID",
uint32(es.shardID)).Msg("failed to publish series index to node")
diff --git a/banyand/queue/pub/message_to_request_test.go
b/banyand/queue/pub/message_to_request_test.go
new file mode 100644
index 000000000..2ab1586f9
--- /dev/null
+++ b/banyand/queue/pub/message_to_request_test.go
@@ -0,0 +1,63 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pub
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/apache/skywalking-banyandb/api/data"
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ streamv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+)
+
+// TestMessageToRequestGroup verifies that the group label is populated for both
+// proto-message payloads (resolved from the request metadata) and pre-marshaled
+// []byte payloads (carried explicitly on the bus message), so the queue metrics
+// stay labeled by group on the secondary-index sync path.
+func TestMessageToRequestGroup(t *testing.T) {
+ t.Run("proto message resolves group from metadata", func(t *testing.T) {
+ msg := bus.NewMessageWithNode(1, "node-1",
&streamv1.InternalWriteRequest{
+ Request: &streamv1.WriteRequest{
+ Metadata: &commonv1.Metadata{Group:
"stream-group"},
+ },
+ })
+ r, err := messageToRequest(data.TopicStreamWrite, msg)
+ require.NoError(t, err)
+ require.Equal(t, "stream-group", r.GetGroup())
+ require.NotNil(t, r.GetBody())
+ })
+
+ t.Run("byte payload carries explicit group", func(t *testing.T) {
+ body := []byte{0x1, 0x2, 0x3}
+ msg := bus.NewMessageWithNodeAndGroup(2, "node-1", "idx-group",
body)
+ r, err := messageToRequest(data.TopicStreamSeriesIndexWrite,
msg)
+ require.NoError(t, err)
+ require.Equal(t, "idx-group", r.GetGroup())
+ require.Equal(t, body, r.GetBody())
+ })
+
+ t.Run("byte payload without group leaves it empty", func(t *testing.T) {
+ msg := bus.NewMessageWithNode(3, "node-1", []byte{0x9})
+ r, err := messageToRequest(data.TopicStreamSeriesIndexWrite,
msg)
+ require.NoError(t, err)
+ require.Empty(t, r.GetGroup())
+ })
+}
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index 28acdb747..83be1e3b8 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -330,7 +330,7 @@ func (p *pub) Broadcast(timeout time.Duration, topic
bus.Topic, messages bus.Mes
wg.Add(1)
go func(n string) {
defer wg.Done()
- f, err := p.publish(timeout, topic,
bus.NewMessageWithNode(messages.ID(), n, messages.Data()))
+ f, err := p.publish(timeout, topic,
bus.NewMessageWithNodeAndGroup(messages.ID(), n, messages.Group(),
messages.Data()))
futureCh <- publishResult{n: n, f: f, e: err}
}(n)
}
@@ -534,6 +534,7 @@ func messageToRequest(topic bus.Topic, m bus.Message)
(*clusterv1.SendRequest, e
r.Group = apidata.GroupFromMessageData(topic, msgData)
case []byte:
r.Body = msgData
+ r.Group = m.Group()
default:
return nil, fmt.Errorf("invalid message type %T", m.Data())
}
diff --git a/banyand/stream/write_liaison.go b/banyand/stream/write_liaison.go
index 76abc3ef5..5293d4bd9 100644
--- a/banyand/stream/write_liaison.go
+++ b/banyand/stream/write_liaison.go
@@ -223,7 +223,7 @@ func (w *writeQueueCallback) Rev(ctx context.Context,
message bus.Message) (resp
// Send to all nodes for this shard
for _, node := range nodes {
- message :=
bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), node, combinedData)
+ message :=
bus.NewMessageWithNodeAndGroup(bus.MessageID(time.Now().UnixNano()), node,
g.name, combinedData)
future, publishErr :=
w.tire2Client.Publish(ctx, data.TopicStreamSeriesIndexWrite, message)
if publishErr != nil {
w.l.Error().Err(publishErr).Str("node", node).Uint32("shardID",
uint32(es.shardID)).Msg("failed to publish series index to node")
@@ -251,7 +251,7 @@ func (w *writeQueueCallback) Rev(ctx context.Context,
message bus.Message) (resp
// Send to all nodes for this shard
for _, node := range nodes {
- message :=
bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), node, combinedData)
+ message :=
bus.NewMessageWithNodeAndGroup(bus.MessageID(time.Now().UnixNano()), node,
g.name, combinedData)
future, publishErr :=
w.tire2Client.Publish(ctx, data.TopicStreamLocalIndexWrite, message)
if publishErr != nil {
w.l.Error().Err(publishErr).Str("node", node).Uint32("shardID",
uint32(es.shardID)).Msg("failed to publish local index to node")
diff --git a/banyand/trace/write_liaison.go b/banyand/trace/write_liaison.go
index 7b804e294..752ac086f 100644
--- a/banyand/trace/write_liaison.go
+++ b/banyand/trace/write_liaison.go
@@ -257,7 +257,7 @@ func (w *writeQueueCallback) Rev(ctx context.Context,
message bus.Message) (resp
// Send to all nodes for this shard
for _, node := range nodes {
- message :=
bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), node, combinedData)
+ message :=
bus.NewMessageWithNodeAndGroup(bus.MessageID(time.Now().UnixNano()), node,
g.name, combinedData)
future, publishErr :=
w.tire2Client.Publish(ctx, data.TopicTraceSidxSeriesWrite, message)
if publishErr != nil {
w.l.Error().Err(publishErr).Str("node", node).Uint32("shardID",
uint32(es.shardID)).Msg("failed to publish series index to node")
diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go
index 745000d0f..569567bb2 100644
--- a/pkg/bus/bus.go
+++ b/pkg/bus/bus.go
@@ -51,6 +51,7 @@ type Message struct {
nodeSelectors map[string][]string
timeRange *modelv1.TimeRange
node string
+ group string
id MessageID
batchMode bool
}
@@ -70,6 +71,13 @@ func (m Message) Node() string {
return m.node
}
+// Group returns the group the Message belongs to, or "" if none was set.
+// It is populated via NewMessageWithNodeAndGroup for pre-marshaled ([]byte) payloads whose
+// group cannot be recovered from the body, so the queue metrics can still be labeled by group.
+func (m Message) Group() string {
+ return m.group
+}
+
// NodeSelectors returns the node selectors of the Message.
func (m Message) NodeSelectors() map[string][]string {
return m.nodeSelectors
@@ -100,6 +108,13 @@ func NewMessageWithNode(id MessageID, node string, data
interface{}) Message {
return Message{id: id, node: node, payload: data}
}
+// NewMessageWithNodeAndGroup returns a new Message carrying an explicit group.
+// Use it for pre-marshaled ([]byte) payloads (e.g. secondary-index sync), where the group
+// is embedded in the opaque body and therefore not recoverable by the publisher metrics path.
+func NewMessageWithNodeAndGroup(id MessageID, node, group string, data
interface{}) Message {
+ return Message{id: id, node: node, group: group, payload: data}
+}
+
// NewMessageWithNodeSelectors returns a new Message with a MessageID and
NodeSelectors and embed data.
// Nodes matching any of the selectors will receive the message.
func NewMessageWithNodeSelectors(id MessageID, nodeSelectors
map[string][]string, timeRange *modelv1.TimeRange, data interface{}) Message {