Copilot commented on code in PR #1161:
URL: 
https://github.com/apache/skywalking-banyandb/pull/1161#discussion_r3366676566


##########
banyand/queue/sub/sub.go:
##########
@@ -78,37 +73,49 @@ func checkVersionCompatibility(versionInfo 
*clusterv1.VersionInfo) (*clusterv1.V
 
        switch {
        case !apiCompatible && !fileFormatCompatible:
-               versionCompatibility.Supported = false
-               versionCompatibility.Reason = fmt.Sprintf("API version %s not 
supported (server: %s) and file format version %s not compatible (server: %s, 
supported: %v)",
+               vc.Supported = false
+               vc.Reason = fmt.Sprintf("API version %s not supported (server: 
%s) and file format version %s not compatible (server: %s, supported: %v)",
                        versionInfo.ApiVersion, serverAPIVersion, 
versionInfo.FileFormatVersion, serverFileFormatVersion, 
compatibleFileFormatVersions)
-               return versionCompatibility, 
modelv1.Status_STATUS_VERSION_UNSUPPORTED
+               return vc, modelv1.Status_STATUS_VERSION_UNSUPPORTED
        case !apiCompatible:
-               versionCompatibility.Supported = false
-               versionCompatibility.Reason = fmt.Sprintf("API version %s not 
supported (server: %s)", versionInfo.ApiVersion, serverAPIVersion)
-               return versionCompatibility, 
modelv1.Status_STATUS_VERSION_UNSUPPORTED
+               vc.Supported = false
+               vc.Reason = fmt.Sprintf("API version %s not supported (server: 
%s)", versionInfo.ApiVersion, serverAPIVersion)
+               return vc, modelv1.Status_STATUS_VERSION_UNSUPPORTED
        case !fileFormatCompatible:
-               versionCompatibility.Supported = false
-               versionCompatibility.Reason = fmt.Sprintf("File format version 
%s not compatible (server: %s, supported: %v)",
+               vc.Supported = false
+               vc.Reason = fmt.Sprintf("File format version %s not compatible 
(server: %s, supported: %v)",
                        versionInfo.FileFormatVersion, serverFileFormatVersion, 
compatibleFileFormatVersions)
-               return versionCompatibility, 
modelv1.Status_STATUS_VERSION_UNSUPPORTED
+               return vc, modelv1.Status_STATUS_VERSION_UNSUPPORTED
        }
 
-       versionCompatibility.Supported = true
-       versionCompatibility.Reason = "Client version compatible with server"
-       return versionCompatibility, modelv1.Status_STATUS_SUCCEED
+       vc.Supported = true
+       vc.Reason = "Client version compatible with server"
+       return vc, modelv1.Status_STATUS_SUCCEED
+}
+
+// streamIdentity captures the per-stream remote sender identity, pinned on 
the first message.
+type streamIdentity struct {
+       senderNode string
+       senderRole string
+       senderTier string
+       group      string
+       operation  string
+       pinned     bool
 }
 
 func (s *server) Send(stream clusterv1.Service_SendServer) error {
        ctx := stream.Context()
        var topic *bus.Topic
-       var m bus.Message
        var dataCollection []any
        start := time.Now()
+       identity := &streamIdentity{}
+
        defer func() {
-               if topic != nil {
-                       s.metrics.totalFinished.Inc(1, topic.String())
-                       s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
topic.String())
+               if topic == nil || !identity.pinned {
+                       return
                }
+               s.metrics.totalFinished.Inc(1, identity.operation, 
identity.group, identity.senderNode, identity.senderRole, identity.senderTier)
+               s.metrics.totalLatency.Observe(time.Since(start).Seconds(), 
identity.operation, identity.group, identity.senderNode, identity.senderRole, 
identity.senderTier)

Review Comment:
   The deferred function in Send() calls s.metrics.totalFinished.Inc and 
s.metrics.totalLatency.Observe without checking whether s.metrics is 
initialized. Tests (and potentially other call sites) can construct a server 
without running PreRun(), leaving s.metrics nil, which would panic here.



##########
banyand/queue/sub/helpers.go:
##########
@@ -29,28 +29,28 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
-func (s *server) handleEOF(stream clusterv1.Service_SendServer, topic 
*bus.Topic, dataCollection []any, writeEntity *clusterv1.SendRequest) {
+func (s *server) handleEOF(stream clusterv1.Service_SendServer, topic 
*bus.Topic, dataCollection []any, writeEntity *clusterv1.SendRequest, identity 
*streamIdentity) {
        if len(dataCollection) < 1 {
                return
        }
        listeners := s.getListeners(*topic)
        if len(listeners) == 0 {
-               s.reply(stream, writeEntity, nil, "no listener found")
+               s.replyWithErrType(stream, writeEntity, nil, "no listener 
found", identity, "no_listener")
                return
        }
        if len(listeners) > 1 {
                logger.Panicf("multiple listeners found for topic %s", *topic)
        }
        listener := listeners[0]
        if le := listener.CheckHealth(); le != nil {
-               s.reply(stream, writeEntity, le, "")
+               s.replyWithErrType(stream, writeEntity, le, "", identity, 
"handler_error")
                return
        }
        message := listener.Rev(stream.Context(), 
bus.NewMessage(bus.MessageID(0), dataCollection))
        var resp *clusterv1.SendResponse
-       data := message.Data()
-       if data != nil {
-               switch d := data.(type) {
+       msgData := message.Data()
+       if msgData != nil {
+               switch d := msgData.(type) {
                case *common.Error:
                        resp = &clusterv1.SendResponse{
                                MessageId: writeEntity.MessageId,

Review Comment:
   handleEOF can send a nil *SendResponse when listener.Rev returns a message 
with nil data (resp remains nil). It also dereferences writeEntity.MessageId 
even though writeEntity can be nil when Recv() returns io.EOF. Either case can 
cause a panic during the batch flush on stream close.



##########
banyand/queue/pub/batch.go:
##########
@@ -50,13 +50,12 @@ const (
        // Remote-side failures (observed after the frame was written to the 
stream).
        sendErrReasonRecvError      = "recv_error"      // s.Recv returned an 
error (connection/protocol layer).
        sendErrReasonServerRejected = "server_rejected" // Server responded 
with a non-empty Error (includes failover statuses).
-
-       sendResultSuccess = "success"
 )
 
 type writeStream struct {
-       client    clusterv1.Service_SendClient
-       ctxDoneCh <-chan struct{}
+       client      clusterv1.Service_SendClient
+       ctxDoneCh   <-chan struct{}
+       isFirstSent bool // true until the first successful send on this stream
 }

Review Comment:
   The isFirstSent field comment is backwards: the zero value is false, and the 
code treats false as "first frame not yet sent". As written, the comment says 
the opposite, which is misleading when maintaining the sender_* stamping logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to