This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new ab7ac5b60 fix(queue): label batch-write metrics by group for index-sync writes (#1162)
ab7ac5b60 is described below

commit ab7ac5b603d1f7e89e53fc52da6158b8a26ba12c
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat Jun 6 20:55:53 2026 +0800

    fix(queue): label batch-write metrics by group for index-sync writes (#1162)
---
 CHANGES.md                                   |  2 +-
 banyand/measure/write_liaison.go             |  2 +-
 banyand/queue/pub/message_to_request_test.go | 63 ++++++++++++++++++++++++++++
 banyand/queue/pub/pub.go                     |  3 +-
 banyand/stream/write_liaison.go              |  4 +-
 banyand/trace/write_liaison.go               |  2 +-
 pkg/bus/bus.go                               | 15 +++++++
 7 files changed, 85 insertions(+), 6 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 2f4c650bf..5c3d9f315 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -5,7 +5,7 @@ Release Notes.
 ## 0.11.0
 
 ### Features
-- Redesign the queue (`queue_pub`/`queue_sub`) metrics around a uniform model: 
keep only `total_started`, `total_finished`, `total_latency` (now a histogram) 
and `total_err`, plus file-sync-only `sent_bytes` (pub) / `received_bytes` 
(sub). Replace the `topic` label with `operation` 
(`batch-write`/`file-sync`/`query`/`control`) and `group`, add an `error_type` 
label on `total_err`, and add remote-endpoint labels 
(`remote_node`/`remote_role`/`remote_tier`) so the liaison↔data (hot/warm/col 
[...]
+- Redesign the queue (`queue_pub`/`queue_sub`) metrics around a uniform model: 
keep only `total_started`, `total_finished`, `total_latency` (now a histogram) 
and `total_err`, plus file-sync-only `sent_bytes` (pub) / `received_bytes` 
(sub). Replace the `topic` label with `operation` 
(`batch-write`/`file-sync`/`query`/`control`) and `group`, add an `error_type` 
label on `total_err`, and add remote-endpoint labels 
(`remote_node`/`remote_role`/`remote_tier`) so the liaison↔data (hot/warm/col 
[...]
 - Vectorized measure query path is now enabled by default. The columnar 
pipeline replaces per-row protobuf serialization in `NewMIterator`, cutting 
allocations and ns/op for scan-heavy measure queries; gRPC wire format 
(`*measurev1.InternalDataPoint`) is byte-identical. Single-node coverage is 
complete: scan, GroupBy+Agg via `BatchAggregation`, scalar reduce (`Agg` 
without `GroupBy`), raw `GroupBy` (without `Agg`), implicit projection coverage 
for GroupBy/Agg fields, `TopN`/`BottomN`, `o [...]
 - Add validation to ensure Measure's ShardingKey contains all Entity tags to 
guarantee entity locality.
 - Organize access logs under a dedicated "accesslog" subdirectory to improve 
log organization and separation from other application data.
diff --git a/banyand/measure/write_liaison.go b/banyand/measure/write_liaison.go
index 6dac293a0..4bcfcb625 100644
--- a/banyand/measure/write_liaison.go
+++ b/banyand/measure/write_liaison.go
@@ -144,7 +144,7 @@ func (w *writeQueueCallback) Rev(ctx context.Context, 
message bus.Message) (resp
 
                                // Send to all nodes for this shard
                                for _, node := range nodes {
-                                       message := 
bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), node, combinedData)
+                                       message := 
bus.NewMessageWithNodeAndGroup(bus.MessageID(time.Now().UnixNano()), node, 
g.name, combinedData)
                                        future, publishErr := 
w.tire2Client.Publish(ctx, topic, message)
                                        if publishErr != nil {
                                                
w.l.Error().Err(publishErr).Str("node", node).Uint32("shardID", 
uint32(es.shardID)).Msg("failed to publish series index to node")
diff --git a/banyand/queue/pub/message_to_request_test.go 
b/banyand/queue/pub/message_to_request_test.go
new file mode 100644
index 000000000..2ab1586f9
--- /dev/null
+++ b/banyand/queue/pub/message_to_request_test.go
@@ -0,0 +1,63 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pub
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/api/data"
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+)
+
+// TestMessageToRequestGroup verifies that the group label is populated for both
+// proto-message payloads (resolved from the request metadata) and pre-marshaled
+// []byte payloads (carried explicitly on the bus message), so the queue metrics
+// stay labeled by group on the secondary-index sync path.
+func TestMessageToRequestGroup(t *testing.T) {
+       t.Run("proto message resolves group from metadata", func(t *testing.T) {
+               msg := bus.NewMessageWithNode(1, "node-1", 
&streamv1.InternalWriteRequest{
+                       Request: &streamv1.WriteRequest{
+                               Metadata: &commonv1.Metadata{Group: 
"stream-group"},
+                       },
+               })
+               r, err := messageToRequest(data.TopicStreamWrite, msg)
+               require.NoError(t, err)
+               require.Equal(t, "stream-group", r.GetGroup())
+               require.NotNil(t, r.GetBody())
+       })
+
+       t.Run("byte payload carries explicit group", func(t *testing.T) {
+               body := []byte{0x1, 0x2, 0x3}
+               msg := bus.NewMessageWithNodeAndGroup(2, "node-1", "idx-group", 
body)
+               r, err := messageToRequest(data.TopicStreamSeriesIndexWrite, 
msg)
+               require.NoError(t, err)
+               require.Equal(t, "idx-group", r.GetGroup())
+               require.Equal(t, body, r.GetBody())
+       })
+
+       t.Run("byte payload without group leaves it empty", func(t *testing.T) {
+               msg := bus.NewMessageWithNode(3, "node-1", []byte{0x9})
+               r, err := messageToRequest(data.TopicStreamSeriesIndexWrite, 
msg)
+               require.NoError(t, err)
+               require.Empty(t, r.GetGroup())
+       })
+}
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index 28acdb747..83be1e3b8 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -330,7 +330,7 @@ func (p *pub) Broadcast(timeout time.Duration, topic 
bus.Topic, messages bus.Mes
                wg.Add(1)
                go func(n string) {
                        defer wg.Done()
-                       f, err := p.publish(timeout, topic, 
bus.NewMessageWithNode(messages.ID(), n, messages.Data()))
+                       f, err := p.publish(timeout, topic, 
bus.NewMessageWithNodeAndGroup(messages.ID(), n, messages.Group(), 
messages.Data()))
                        futureCh <- publishResult{n: n, f: f, e: err}
                }(n)
        }
@@ -534,6 +534,7 @@ func messageToRequest(topic bus.Topic, m bus.Message) 
(*clusterv1.SendRequest, e
                r.Group = apidata.GroupFromMessageData(topic, msgData)
        case []byte:
                r.Body = msgData
+               r.Group = m.Group()
        default:
                return nil, fmt.Errorf("invalid message type %T", m.Data())
        }
diff --git a/banyand/stream/write_liaison.go b/banyand/stream/write_liaison.go
index 76abc3ef5..5293d4bd9 100644
--- a/banyand/stream/write_liaison.go
+++ b/banyand/stream/write_liaison.go
@@ -223,7 +223,7 @@ func (w *writeQueueCallback) Rev(ctx context.Context, 
message bus.Message) (resp
 
                                // Send to all nodes for this shard
                                for _, node := range nodes {
-                                       message := 
bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), node, combinedData)
+                                       message := 
bus.NewMessageWithNodeAndGroup(bus.MessageID(time.Now().UnixNano()), node, 
g.name, combinedData)
                                        future, publishErr := 
w.tire2Client.Publish(ctx, data.TopicStreamSeriesIndexWrite, message)
                                        if publishErr != nil {
                                                
w.l.Error().Err(publishErr).Str("node", node).Uint32("shardID", 
uint32(es.shardID)).Msg("failed to publish series index to node")
@@ -251,7 +251,7 @@ func (w *writeQueueCallback) Rev(ctx context.Context, 
message bus.Message) (resp
 
                                        // Send to all nodes for this shard
                                        for _, node := range nodes {
-                                               message := 
bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), node, combinedData)
+                                               message := 
bus.NewMessageWithNodeAndGroup(bus.MessageID(time.Now().UnixNano()), node, 
g.name, combinedData)
                                                future, publishErr := 
w.tire2Client.Publish(ctx, data.TopicStreamLocalIndexWrite, message)
                                                if publishErr != nil {
                                                        
w.l.Error().Err(publishErr).Str("node", node).Uint32("shardID", 
uint32(es.shardID)).Msg("failed to publish local index to node")
diff --git a/banyand/trace/write_liaison.go b/banyand/trace/write_liaison.go
index 7b804e294..752ac086f 100644
--- a/banyand/trace/write_liaison.go
+++ b/banyand/trace/write_liaison.go
@@ -257,7 +257,7 @@ func (w *writeQueueCallback) Rev(ctx context.Context, 
message bus.Message) (resp
 
                                // Send to all nodes for this shard
                                for _, node := range nodes {
-                                       message := 
bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), node, combinedData)
+                                       message := 
bus.NewMessageWithNodeAndGroup(bus.MessageID(time.Now().UnixNano()), node, 
g.name, combinedData)
                                        future, publishErr := 
w.tire2Client.Publish(ctx, data.TopicTraceSidxSeriesWrite, message)
                                        if publishErr != nil {
                                                
w.l.Error().Err(publishErr).Str("node", node).Uint32("shardID", 
uint32(es.shardID)).Msg("failed to publish series index to node")
diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go
index 745000d0f..569567bb2 100644
--- a/pkg/bus/bus.go
+++ b/pkg/bus/bus.go
@@ -51,6 +51,7 @@ type Message struct {
        nodeSelectors map[string][]string
        timeRange     *modelv1.TimeRange
        node          string
+       group         string
        id            MessageID
        batchMode     bool
 }
@@ -70,6 +71,13 @@ func (m Message) Node() string {
        return m.node
 }
 
+// Group returns the business group the Message belongs to.
+// It is set for pre-marshaled ([]byte) payloads whose group cannot be
+// recovered from the body, so the queue metrics can still be labeled by group.
+func (m Message) Group() string {
+       return m.group
+}
+
 // NodeSelectors returns the node selectors of the Message.
 func (m Message) NodeSelectors() map[string][]string {
        return m.nodeSelectors
@@ -100,6 +108,13 @@ func NewMessageWithNode(id MessageID, node string, data 
interface{}) Message {
        return Message{id: id, node: node, payload: data}
 }
 
+// NewMessageWithNodeAndGroup returns a new Message carrying an explicit business group.
+// Use it for pre-marshaled ([]byte) payloads (e.g. secondary-index sync), where the group
+// is embedded in the opaque body and therefore not recoverable by the publisher metrics path.
+func NewMessageWithNodeAndGroup(id MessageID, node, group string, data 
interface{}) Message {
+       return Message{id: id, node: node, group: group, payload: data}
+}
+
 // NewMessageWithNodeSelectors returns a new Message with a MessageID and NodeSelectors and embed data.
 // Nodes matching any of the selectors will receive the message.
 func NewMessageWithNodeSelectors(id MessageID, nodeSelectors 
map[string][]string, timeRange *modelv1.TimeRange, data interface{}) Message {

Reply via email to