Copilot commented on code in PR #1169:
URL: 
https://github.com/apache/skywalking-banyandb/pull/1169#discussion_r3393101159


##########
banyand/queue/pub/batch.go:
##########
@@ -243,9 +264,13 @@ func (bp *batchPublisher) listenBatchResponse(ctx 
context.Context, s clusterv1.S
        if errRecv != nil {
                if bp.hasMetrics() {
                        bp.pub.metrics.totalErr.Inc(1, operation, "", curNode, 
info.role, info.tier, sendErrReasonRecvError)
+                       bp.pub.metrics.totalBatchFinished.Inc(1, operation, "", 
curNode, info.role, info.tier)

Review Comment:
   In the recv-error path, `total_batch_finished`/`total_batch_latency` are 
   recorded even though no final `SendResponse` was received. This conflicts with 
   the documented semantics in this PR (batch finished/latency are observed when 
   the stream’s terminal response is received). Consider either not recording 
   finished/latency on `Recv` errors (leaving started without a matching 
   finished, and relying on `total_err{error_type="recv_error"}` to account for 
   the failure), or updating the docs/metric name to match the current behavior.



##########
banyand/queue/sub/batch_metrics_test.go:
##########
@@ -0,0 +1,296 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sub
+
+import (
+       "context"
+       "errors"
+       "sync"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/require"
+       "google.golang.org/grpc/metadata"
+
+       "github.com/apache/skywalking-banyandb/api/data"
+       clusterv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// mockSendServer implements clusterv1.Service_SendServer for sub batch tests.
+type mockSendServer struct {
+       ctx     context.Context
+       sendErr error
+       sent    []*clusterv1.SendResponse
+       mu      sync.Mutex
+}
+
+func newMockSendServer() *mockSendServer {
+       return &mockSendServer{
+               ctx: metadata.NewIncomingContext(context.Background(), 
metadata.MD{}),
+       }
+}
+
+func (m *mockSendServer) Send(resp *clusterv1.SendResponse) error {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+       m.sent = append(m.sent, resp)
+       return m.sendErr
+}
+
+func (m *mockSendServer) Recv() (*clusterv1.SendRequest, error) { return nil, 
nil }
+func (m *mockSendServer) Context() context.Context              { return m.ctx 
}
+func (m *mockSendServer) SetHeader(metadata.MD) error           { return nil }
+func (m *mockSendServer) SendHeader(metadata.MD) error          { return nil }
+func (m *mockSendServer) SetTrailer(metadata.MD)                {}
+func (m *mockSendServer) SendMsg(_ any) error                   { return nil }
+func (m *mockSendServer) RecvMsg(_ any) error                   { return nil }
+
+// echoListener is a healthy MessageListener that echoes messages back.
+type echoListener struct {
+       bus.UnImplementedHealthyListener
+}
+
+func (*echoListener) Rev(_ context.Context, msg bus.Message) bus.Message { 
return msg }
+
+func newBatchTestServer(m *metrics) *server { //nolint:exhaustruct
+       logInitErr := logger.Init(logger.Logging{Env: "dev", Level: "info"})
+       if logInitErr != nil {
+               panic(logInitErr)
+       }
+       return &server{
+               log:           logger.GetLogger("batch-metrics-test"),
+               metrics:       m,
+               listeners:     make(map[bus.Topic][]bus.MessageListener),
+               listenersLock: sync.RWMutex{},
+               topicMap:      make(map[string]bus.Topic),
+       }
+}
+
+func newBatchMetrics() (m *metrics, msgStarted, msgFinished, batchStarted, 
batchFinished *fakeCounter, batchLatency *fakeHistogram) {
+       msgStarted = &fakeCounter{}
+       msgFinished = &fakeCounter{}
+       batchStarted = &fakeCounter{}
+       batchFinished = &fakeCounter{}
+       batchLatency = &fakeHistogram{}
+       m = &metrics{ //nolint:exhaustruct
+               totalStarted:         &fakeCounter{},
+               totalFinished:        &fakeCounter{},
+               totalLatency:         &fakeHistogram{},
+               totalErr:             &fakeCounter{},
+               receivedBytes:        &fakeCounter{},
+               totalMessageStarted:  msgStarted,
+               totalMessageFinished: msgFinished,
+               totalBatchStarted:    batchStarted,
+               totalBatchFinished:   batchFinished,
+               totalBatchLatency:    batchLatency,
+       }
+       return m, msgStarted, msgFinished, batchStarted, batchFinished, 
batchLatency
+}
+
+func makeBatchReq(group, senderNode string) *clusterv1.SendRequest { 
//nolint:exhaustruct
+       return &clusterv1.SendRequest{
+               Topic:      data.TopicMeasureWrite.String(),
+               Group:      group,
+               SenderNode: senderNode,
+               SenderRole: "liaison",
+               SenderTier: "hot",
+               BatchMod:   true,
+               Body:       []byte("data"),
+       }
+}
+
+// TestBatchModNMessageMetrics asserts that for an N-message BatchMod batch:
+// total_message_started==N, total_message_finished==N, 
total_batch_started==1, total_batch_finished==1,
+// and total_batch_latency is observed exactly once.
+func TestBatchModNMessageMetrics(t *testing.T) {
+       const N = 3
+       logInitErr := logger.Init(logger.Logging{Env: "dev", Level: "info"})
+       require.NoError(t, logInitErr)
+
+       m, msgStarted, msgFinished, batchStarted, batchFinished, batchLatency 
:= newBatchMetrics()
+       s := newBatchTestServer(m)
+
+       topic := data.TopicMeasureWrite
+       require.NoError(t, s.Subscribe(topic, &echoListener{}))
+
+       stream := newMockSendServer()
+       req := makeBatchReq("g1", "liaison-0:17912")
+
+       identity := &streamIdentity{}
+       var dataCollection []any
+       start := time.Now()
+
+       // Simulate N batch messages (handleBatch called N times with the same 
identity).
+       for range N {
+               s.handleBatch(&dataCollection, req, &start, identity)
+       }
+
+       // Pin identity as the real Send path does before handleBatch is called.
+       s.pinIdentity(identity, req, topic)
+

Review Comment:
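   This test calls `handleBatch` before `pinIdentity`, but in the real `Send` 
   path identity is pinned before metrics are ticked (the test's own comment on 
   the `pinIdentity` call says the same). Move the 
   `s.pinIdentity(identity, req, topic)` call above the `handleBatch` loop. 
   Keeping the same ordering in the test helps prevent future regressions 
   (e.g., ticking counters with empty label values).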



##########
banyand/queue/sub/batch_metrics_test.go:
##########
@@ -0,0 +1,296 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sub
+
+import (
+       "context"
+       "errors"
+       "sync"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/require"
+       "google.golang.org/grpc/metadata"
+
+       "github.com/apache/skywalking-banyandb/api/data"
+       clusterv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// mockSendServer implements clusterv1.Service_SendServer for sub batch tests.
+type mockSendServer struct {
+       ctx     context.Context
+       sendErr error
+       sent    []*clusterv1.SendResponse
+       mu      sync.Mutex
+}
+
+func newMockSendServer() *mockSendServer {
+       return &mockSendServer{
+               ctx: metadata.NewIncomingContext(context.Background(), 
metadata.MD{}),
+       }
+}
+
+func (m *mockSendServer) Send(resp *clusterv1.SendResponse) error {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+       m.sent = append(m.sent, resp)
+       return m.sendErr
+}
+
+func (m *mockSendServer) Recv() (*clusterv1.SendRequest, error) { return nil, 
nil }
+func (m *mockSendServer) Context() context.Context              { return m.ctx 
}
+func (m *mockSendServer) SetHeader(metadata.MD) error           { return nil }
+func (m *mockSendServer) SendHeader(metadata.MD) error          { return nil }
+func (m *mockSendServer) SetTrailer(metadata.MD)                {}
+func (m *mockSendServer) SendMsg(_ any) error                   { return nil }
+func (m *mockSendServer) RecvMsg(_ any) error                   { return nil }
+
+// echoListener is a healthy MessageListener that echoes messages back.
+type echoListener struct {
+       bus.UnImplementedHealthyListener
+}
+
+func (*echoListener) Rev(_ context.Context, msg bus.Message) bus.Message { 
return msg }
+
+func newBatchTestServer(m *metrics) *server { //nolint:exhaustruct
+       logInitErr := logger.Init(logger.Logging{Env: "dev", Level: "info"})
+       if logInitErr != nil {
+               panic(logInitErr)
+       }
+       return &server{
+               log:           logger.GetLogger("batch-metrics-test"),
+               metrics:       m,
+               listeners:     make(map[bus.Topic][]bus.MessageListener),
+               listenersLock: sync.RWMutex{},
+               topicMap:      make(map[string]bus.Topic),
+       }
+}
+
+func newBatchMetrics() (m *metrics, msgStarted, msgFinished, batchStarted, 
batchFinished *fakeCounter, batchLatency *fakeHistogram) {
+       msgStarted = &fakeCounter{}
+       msgFinished = &fakeCounter{}
+       batchStarted = &fakeCounter{}
+       batchFinished = &fakeCounter{}
+       batchLatency = &fakeHistogram{}
+       m = &metrics{ //nolint:exhaustruct
+               totalStarted:         &fakeCounter{},
+               totalFinished:        &fakeCounter{},
+               totalLatency:         &fakeHistogram{},
+               totalErr:             &fakeCounter{},
+               receivedBytes:        &fakeCounter{},
+               totalMessageStarted:  msgStarted,
+               totalMessageFinished: msgFinished,
+               totalBatchStarted:    batchStarted,
+               totalBatchFinished:   batchFinished,
+               totalBatchLatency:    batchLatency,
+       }
+       return m, msgStarted, msgFinished, batchStarted, batchFinished, 
batchLatency
+}
+
+func makeBatchReq(group, senderNode string) *clusterv1.SendRequest { 
//nolint:exhaustruct
+       return &clusterv1.SendRequest{
+               Topic:      data.TopicMeasureWrite.String(),
+               Group:      group,
+               SenderNode: senderNode,
+               SenderRole: "liaison",
+               SenderTier: "hot",
+               BatchMod:   true,
+               Body:       []byte("data"),
+       }
+}
+
+// TestBatchModNMessageMetrics asserts that for an N-message BatchMod batch:
+// total_message_started==N, total_message_finished==N, 
total_batch_started==1, total_batch_finished==1,
+// and total_batch_latency is observed exactly once.
+func TestBatchModNMessageMetrics(t *testing.T) {
+       const N = 3
+       logInitErr := logger.Init(logger.Logging{Env: "dev", Level: "info"})
+       require.NoError(t, logInitErr)
+
+       m, msgStarted, msgFinished, batchStarted, batchFinished, batchLatency 
:= newBatchMetrics()
+       s := newBatchTestServer(m)
+
+       topic := data.TopicMeasureWrite
+       require.NoError(t, s.Subscribe(topic, &echoListener{}))
+
+       stream := newMockSendServer()
+       req := makeBatchReq("g1", "liaison-0:17912")
+
+       identity := &streamIdentity{}
+       var dataCollection []any
+       start := time.Now()
+
+       // Simulate N batch messages (handleBatch called N times with the same 
identity).
+       for range N {
+               s.handleBatch(&dataCollection, req, &start, identity)
+       }
+
+       // Pin identity as the real Send path does before handleBatch is called.
+       s.pinIdentity(identity, req, topic)
+
+       // Simulate EOF: handleEOF dispatches the collected batch.
+       s.handleEOF(stream, &topic, dataCollection, req, identity, start)
+
+       require.Equal(t, float64(N), msgStarted.total, "total_message_started 
must equal N")
+       require.Equal(t, float64(N), msgFinished.total, "total_message_finished 
must equal N")
+       require.Equal(t, float64(1), batchStarted.total, "total_batch_started 
must equal 1")
+       require.Equal(t, float64(1), batchFinished.total, "total_batch_finished 
must equal 1")
+       require.Equal(t, 1, batchLatency.count, "total_batch_latency must be 
observed exactly once")
+}
+
+// TestBatchModSendFailureMetrics verifies that a Send failure on the response 
does NOT affect
+// the four metric counts — they are pinned after listener.Rev, independent of 
Send outcome.
+func TestBatchModSendFailureMetrics(t *testing.T) {
+       const N = 4
+       logInitErr := logger.Init(logger.Logging{Env: "dev", Level: "info"})
+       require.NoError(t, logInitErr)
+
+       m, msgStarted, msgFinished, batchStarted, batchFinished, batchLatency 
:= newBatchMetrics()
+       s := newBatchTestServer(m)
+
+       topic := data.TopicMeasureWrite
+       require.NoError(t, s.Subscribe(topic, &echoListener{}))
+
+       // Wire a Send that always fails (simulates client disconnect before 
reading EOF response).
+       stream := newMockSendServer()
+       stream.sendErr = errors.New("client disconnect")
+
+       req := makeBatchReq("g2", "liaison-1:17912")
+       identity := &streamIdentity{}
+       var dataCollection []any
+       start := time.Now()
+
+       for range N {
+               s.handleBatch(&dataCollection, req, &start, identity)
+       }
+       s.pinIdentity(identity, req, topic)
+       s.handleEOF(stream, &topic, dataCollection, req, identity, start)
+

Review Comment:
   Same as the previous comment on `TestBatchModNMessageMetrics`: call 
   `s.pinIdentity(identity, req, topic)` before the `handleBatch` loop to mirror 
   the production call order and avoid counting with empty labels if the metrics 
   implementation enforces label cardinality.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to