Copilot commented on code in PR #1169:
URL:
https://github.com/apache/skywalking-banyandb/pull/1169#discussion_r3393101159
##########
banyand/queue/pub/batch.go:
##########
@@ -243,9 +264,13 @@ func (bp *batchPublisher) listenBatchResponse(ctx
context.Context, s clusterv1.S
if errRecv != nil {
if bp.hasMetrics() {
bp.pub.metrics.totalErr.Inc(1, operation, "", curNode,
info.role, info.tier, sendErrReasonRecvError)
+ bp.pub.metrics.totalBatchFinished.Inc(1, operation, "",
curNode, info.role, info.tier)
Review Comment:
In the recv-error path, `total_batch_finished`/`total_batch_latency` are
recorded even though no final `SendResponse` was received. This conflicts with
the documented semantics in this PR (batch finished/latency observed when the
stream’s terminal response is received). Consider leaving
`total_batch_started` without a matching `total_batch_finished` on `Recv`
errors (and relying on `total_err{error_type="recv_error"}`), or update the
docs/metric name to match the current behavior.
##########
banyand/queue/sub/batch_metrics_test.go:
##########
@@ -0,0 +1,296 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sub
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ "google.golang.org/grpc/metadata"
+
+ "github.com/apache/skywalking-banyandb/api/data"
+ clusterv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// mockSendServer implements clusterv1.Service_SendServer for sub batch tests.
+type mockSendServer struct {
+ ctx context.Context
+ sendErr error
+ sent []*clusterv1.SendResponse
+ mu sync.Mutex
+}
+
+func newMockSendServer() *mockSendServer {
+ return &mockSendServer{
+ ctx: metadata.NewIncomingContext(context.Background(),
metadata.MD{}),
+ }
+}
+
+func (m *mockSendServer) Send(resp *clusterv1.SendResponse) error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.sent = append(m.sent, resp)
+ return m.sendErr
+}
+
+func (m *mockSendServer) Recv() (*clusterv1.SendRequest, error) { return nil,
nil }
+func (m *mockSendServer) Context() context.Context { return m.ctx
}
+func (m *mockSendServer) SetHeader(metadata.MD) error { return nil }
+func (m *mockSendServer) SendHeader(metadata.MD) error { return nil }
+func (m *mockSendServer) SetTrailer(metadata.MD) {}
+func (m *mockSendServer) SendMsg(_ any) error { return nil }
+func (m *mockSendServer) RecvMsg(_ any) error { return nil }
+
+// echoListener is a healthy MessageListener that echoes messages back.
+type echoListener struct {
+ bus.UnImplementedHealthyListener
+}
+
+func (*echoListener) Rev(_ context.Context, msg bus.Message) bus.Message {
return msg }
+
+func newBatchTestServer(m *metrics) *server { //nolint:exhaustruct
+ logInitErr := logger.Init(logger.Logging{Env: "dev", Level: "info"})
+ if logInitErr != nil {
+ panic(logInitErr)
+ }
+ return &server{
+ log: logger.GetLogger("batch-metrics-test"),
+ metrics: m,
+ listeners: make(map[bus.Topic][]bus.MessageListener),
+ listenersLock: sync.RWMutex{},
+ topicMap: make(map[string]bus.Topic),
+ }
+}
+
+func newBatchMetrics() (m *metrics, msgStarted, msgFinished, batchStarted,
batchFinished *fakeCounter, batchLatency *fakeHistogram) {
+ msgStarted = &fakeCounter{}
+ msgFinished = &fakeCounter{}
+ batchStarted = &fakeCounter{}
+ batchFinished = &fakeCounter{}
+ batchLatency = &fakeHistogram{}
+ m = &metrics{ //nolint:exhaustruct
+ totalStarted: &fakeCounter{},
+ totalFinished: &fakeCounter{},
+ totalLatency: &fakeHistogram{},
+ totalErr: &fakeCounter{},
+ receivedBytes: &fakeCounter{},
+ totalMessageStarted: msgStarted,
+ totalMessageFinished: msgFinished,
+ totalBatchStarted: batchStarted,
+ totalBatchFinished: batchFinished,
+ totalBatchLatency: batchLatency,
+ }
+ return m, msgStarted, msgFinished, batchStarted, batchFinished,
batchLatency
+}
+
+func makeBatchReq(group, senderNode string) *clusterv1.SendRequest {
//nolint:exhaustruct
+ return &clusterv1.SendRequest{
+ Topic: data.TopicMeasureWrite.String(),
+ Group: group,
+ SenderNode: senderNode,
+ SenderRole: "liaison",
+ SenderTier: "hot",
+ BatchMod: true,
+ Body: []byte("data"),
+ }
+}
+
+// TestBatchModNMessageMetrics asserts that for an N-message BatchMod batch:
+// total_message_started==N, total_message_finished==N,
total_batch_started==1, total_batch_finished==1,
+// and total_batch_latency is observed exactly once.
+func TestBatchModNMessageMetrics(t *testing.T) {
+ const N = 3
+ logInitErr := logger.Init(logger.Logging{Env: "dev", Level: "info"})
+ require.NoError(t, logInitErr)
+
+ m, msgStarted, msgFinished, batchStarted, batchFinished, batchLatency
:= newBatchMetrics()
+ s := newBatchTestServer(m)
+
+ topic := data.TopicMeasureWrite
+ require.NoError(t, s.Subscribe(topic, &echoListener{}))
+
+ stream := newMockSendServer()
+ req := makeBatchReq("g1", "liaison-0:17912")
+
+ identity := &streamIdentity{}
+ var dataCollection []any
+ start := time.Now()
+
+ // Simulate N batch messages (handleBatch called N times with the same
identity).
+ for range N {
+ s.handleBatch(&dataCollection, req, &start, identity)
+ }
+
+ // Pin identity as the real Send path does before handleBatch is called.
+ s.pinIdentity(identity, req, topic)
+
Review Comment:
This test calls `handleBatch` before `pinIdentity`, but in the real `Send`
path the identity is pinned before any metrics are ticked (the test's own
inline comment says as much). Keeping the same ordering in the test helps
prevent future regressions (e.g., ticking counters with empty label values).
##########
banyand/queue/sub/batch_metrics_test.go:
##########
@@ -0,0 +1,296 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sub
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ "google.golang.org/grpc/metadata"
+
+ "github.com/apache/skywalking-banyandb/api/data"
+ clusterv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// mockSendServer implements clusterv1.Service_SendServer for sub batch tests.
+type mockSendServer struct {
+ ctx context.Context
+ sendErr error
+ sent []*clusterv1.SendResponse
+ mu sync.Mutex
+}
+
+func newMockSendServer() *mockSendServer {
+ return &mockSendServer{
+ ctx: metadata.NewIncomingContext(context.Background(),
metadata.MD{}),
+ }
+}
+
+func (m *mockSendServer) Send(resp *clusterv1.SendResponse) error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.sent = append(m.sent, resp)
+ return m.sendErr
+}
+
+func (m *mockSendServer) Recv() (*clusterv1.SendRequest, error) { return nil,
nil }
+func (m *mockSendServer) Context() context.Context { return m.ctx
}
+func (m *mockSendServer) SetHeader(metadata.MD) error { return nil }
+func (m *mockSendServer) SendHeader(metadata.MD) error { return nil }
+func (m *mockSendServer) SetTrailer(metadata.MD) {}
+func (m *mockSendServer) SendMsg(_ any) error { return nil }
+func (m *mockSendServer) RecvMsg(_ any) error { return nil }
+
+// echoListener is a healthy MessageListener that echoes messages back.
+type echoListener struct {
+ bus.UnImplementedHealthyListener
+}
+
+func (*echoListener) Rev(_ context.Context, msg bus.Message) bus.Message {
return msg }
+
+func newBatchTestServer(m *metrics) *server { //nolint:exhaustruct
+ logInitErr := logger.Init(logger.Logging{Env: "dev", Level: "info"})
+ if logInitErr != nil {
+ panic(logInitErr)
+ }
+ return &server{
+ log: logger.GetLogger("batch-metrics-test"),
+ metrics: m,
+ listeners: make(map[bus.Topic][]bus.MessageListener),
+ listenersLock: sync.RWMutex{},
+ topicMap: make(map[string]bus.Topic),
+ }
+}
+
+func newBatchMetrics() (m *metrics, msgStarted, msgFinished, batchStarted,
batchFinished *fakeCounter, batchLatency *fakeHistogram) {
+ msgStarted = &fakeCounter{}
+ msgFinished = &fakeCounter{}
+ batchStarted = &fakeCounter{}
+ batchFinished = &fakeCounter{}
+ batchLatency = &fakeHistogram{}
+ m = &metrics{ //nolint:exhaustruct
+ totalStarted: &fakeCounter{},
+ totalFinished: &fakeCounter{},
+ totalLatency: &fakeHistogram{},
+ totalErr: &fakeCounter{},
+ receivedBytes: &fakeCounter{},
+ totalMessageStarted: msgStarted,
+ totalMessageFinished: msgFinished,
+ totalBatchStarted: batchStarted,
+ totalBatchFinished: batchFinished,
+ totalBatchLatency: batchLatency,
+ }
+ return m, msgStarted, msgFinished, batchStarted, batchFinished,
batchLatency
+}
+
+func makeBatchReq(group, senderNode string) *clusterv1.SendRequest {
//nolint:exhaustruct
+ return &clusterv1.SendRequest{
+ Topic: data.TopicMeasureWrite.String(),
+ Group: group,
+ SenderNode: senderNode,
+ SenderRole: "liaison",
+ SenderTier: "hot",
+ BatchMod: true,
+ Body: []byte("data"),
+ }
+}
+
+// TestBatchModNMessageMetrics asserts that for an N-message BatchMod batch:
+// total_message_started==N, total_message_finished==N,
total_batch_started==1, total_batch_finished==1,
+// and total_batch_latency is observed exactly once.
+func TestBatchModNMessageMetrics(t *testing.T) {
+ const N = 3
+ logInitErr := logger.Init(logger.Logging{Env: "dev", Level: "info"})
+ require.NoError(t, logInitErr)
+
+ m, msgStarted, msgFinished, batchStarted, batchFinished, batchLatency
:= newBatchMetrics()
+ s := newBatchTestServer(m)
+
+ topic := data.TopicMeasureWrite
+ require.NoError(t, s.Subscribe(topic, &echoListener{}))
+
+ stream := newMockSendServer()
+ req := makeBatchReq("g1", "liaison-0:17912")
+
+ identity := &streamIdentity{}
+ var dataCollection []any
+ start := time.Now()
+
+ // Simulate N batch messages (handleBatch called N times with the same
identity).
+ for range N {
+ s.handleBatch(&dataCollection, req, &start, identity)
+ }
+
+ // Pin identity as the real Send path does before handleBatch is called.
+ s.pinIdentity(identity, req, topic)
+
+ // Simulate EOF: handleEOF dispatches the collected batch.
+ s.handleEOF(stream, &topic, dataCollection, req, identity, start)
+
+ require.Equal(t, float64(N), msgStarted.total, "total_message_started
must equal N")
+ require.Equal(t, float64(N), msgFinished.total, "total_message_finished
must equal N")
+ require.Equal(t, float64(1), batchStarted.total, "total_batch_started
must equal 1")
+ require.Equal(t, float64(1), batchFinished.total, "total_batch_finished
must equal 1")
+ require.Equal(t, 1, batchLatency.count, "total_batch_latency must be
observed exactly once")
+}
+
+// TestBatchModSendFailureMetrics verifies that a Send failure on the response
does NOT affect
+// the four metric counts — they are pinned after listener.Rev, independent of
Send outcome.
+func TestBatchModSendFailureMetrics(t *testing.T) {
+ const N = 4
+ logInitErr := logger.Init(logger.Logging{Env: "dev", Level: "info"})
+ require.NoError(t, logInitErr)
+
+ m, msgStarted, msgFinished, batchStarted, batchFinished, batchLatency
:= newBatchMetrics()
+ s := newBatchTestServer(m)
+
+ topic := data.TopicMeasureWrite
+ require.NoError(t, s.Subscribe(topic, &echoListener{}))
+
+ // Wire a Send that always fails (simulates client disconnect before
reading EOF response).
+ stream := newMockSendServer()
+ stream.sendErr = errors.New("client disconnect")
+
+ req := makeBatchReq("g2", "liaison-1:17912")
+ identity := &streamIdentity{}
+ var dataCollection []any
+ start := time.Now()
+
+ for range N {
+ s.handleBatch(&dataCollection, req, &start, identity)
+ }
+ s.pinIdentity(identity, req, topic)
+ s.handleEOF(stream, &topic, dataCollection, req, identity, start)
+
Review Comment:
Same as in `TestBatchModNMessageMetrics`: call `pinIdentity` before
`handleBatch` so the test mirrors the production call order. This also avoids
ticking counters with empty label values, which matters if the metrics
implementation enforces label cardinality.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]