Copilot commented on code in PR #1161:
URL:
https://github.com/apache/skywalking-banyandb/pull/1161#discussion_r3366676566
##########
banyand/queue/sub/sub.go:
##########
@@ -78,37 +73,49 @@ func checkVersionCompatibility(versionInfo
*clusterv1.VersionInfo) (*clusterv1.V
switch {
case !apiCompatible && !fileFormatCompatible:
- versionCompatibility.Supported = false
- versionCompatibility.Reason = fmt.Sprintf("API version %s not
supported (server: %s) and file format version %s not compatible (server: %s,
supported: %v)",
+ vc.Supported = false
+ vc.Reason = fmt.Sprintf("API version %s not supported (server:
%s) and file format version %s not compatible (server: %s, supported: %v)",
versionInfo.ApiVersion, serverAPIVersion,
versionInfo.FileFormatVersion, serverFileFormatVersion,
compatibleFileFormatVersions)
- return versionCompatibility,
modelv1.Status_STATUS_VERSION_UNSUPPORTED
+ return vc, modelv1.Status_STATUS_VERSION_UNSUPPORTED
case !apiCompatible:
- versionCompatibility.Supported = false
- versionCompatibility.Reason = fmt.Sprintf("API version %s not
supported (server: %s)", versionInfo.ApiVersion, serverAPIVersion)
- return versionCompatibility,
modelv1.Status_STATUS_VERSION_UNSUPPORTED
+ vc.Supported = false
+ vc.Reason = fmt.Sprintf("API version %s not supported (server:
%s)", versionInfo.ApiVersion, serverAPIVersion)
+ return vc, modelv1.Status_STATUS_VERSION_UNSUPPORTED
case !fileFormatCompatible:
- versionCompatibility.Supported = false
- versionCompatibility.Reason = fmt.Sprintf("File format version
%s not compatible (server: %s, supported: %v)",
+ vc.Supported = false
+ vc.Reason = fmt.Sprintf("File format version %s not compatible
(server: %s, supported: %v)",
versionInfo.FileFormatVersion, serverFileFormatVersion,
compatibleFileFormatVersions)
- return versionCompatibility,
modelv1.Status_STATUS_VERSION_UNSUPPORTED
+ return vc, modelv1.Status_STATUS_VERSION_UNSUPPORTED
}
- versionCompatibility.Supported = true
- versionCompatibility.Reason = "Client version compatible with server"
- return versionCompatibility, modelv1.Status_STATUS_SUCCEED
+ vc.Supported = true
+ vc.Reason = "Client version compatible with server"
+ return vc, modelv1.Status_STATUS_SUCCEED
+}
+
+// streamIdentity captures the per-stream remote sender identity, pinned on
the first message.
+type streamIdentity struct {
+ senderNode string
+ senderRole string
+ senderTier string
+ group string
+ operation string
+ pinned bool
}
func (s *server) Send(stream clusterv1.Service_SendServer) error {
ctx := stream.Context()
var topic *bus.Topic
- var m bus.Message
var dataCollection []any
start := time.Now()
+ identity := &streamIdentity{}
+
defer func() {
- if topic != nil {
- s.metrics.totalFinished.Inc(1, topic.String())
- s.metrics.totalLatency.Inc(time.Since(start).Seconds(),
topic.String())
+ if topic == nil || !identity.pinned {
+ return
}
+ s.metrics.totalFinished.Inc(1, identity.operation,
identity.group, identity.senderNode, identity.senderRole, identity.senderTier)
+ s.metrics.totalLatency.Observe(time.Since(start).Seconds(),
identity.operation, identity.group, identity.senderNode, identity.senderRole,
identity.senderTier)
Review Comment:
The deferred function in Send() calls s.metrics.totalFinished.Inc and
s.metrics.totalLatency.Observe without checking whether s.metrics is
initialized. Tests (and potentially other call sites) can construct a server
without running PreRun(), leaving s.metrics nil, which would panic here.
##########
banyand/queue/sub/helpers.go:
##########
@@ -29,28 +29,28 @@ import (
"github.com/apache/skywalking-banyandb/pkg/logger"
)
-func (s *server) handleEOF(stream clusterv1.Service_SendServer, topic
*bus.Topic, dataCollection []any, writeEntity *clusterv1.SendRequest) {
+func (s *server) handleEOF(stream clusterv1.Service_SendServer, topic
*bus.Topic, dataCollection []any, writeEntity *clusterv1.SendRequest, identity
*streamIdentity) {
if len(dataCollection) < 1 {
return
}
listeners := s.getListeners(*topic)
if len(listeners) == 0 {
- s.reply(stream, writeEntity, nil, "no listener found")
+ s.replyWithErrType(stream, writeEntity, nil, "no listener
found", identity, "no_listener")
return
}
if len(listeners) > 1 {
logger.Panicf("multiple listeners found for topic %s", *topic)
}
listener := listeners[0]
if le := listener.CheckHealth(); le != nil {
- s.reply(stream, writeEntity, le, "")
+ s.replyWithErrType(stream, writeEntity, le, "", identity,
"handler_error")
return
}
message := listener.Rev(stream.Context(),
bus.NewMessage(bus.MessageID(0), dataCollection))
var resp *clusterv1.SendResponse
- data := message.Data()
- if data != nil {
- switch d := data.(type) {
+ msgData := message.Data()
+ if msgData != nil {
+ switch d := msgData.(type) {
case *common.Error:
resp = &clusterv1.SendResponse{
MessageId: writeEntity.MessageId,
Review Comment:
handleEOF has two nil-related hazards. First, it can send a nil *SendResponse:
when listener.Rev returns a message with nil data, resp remains nil. Second, it
dereferences writeEntity.MessageId even though writeEntity can be nil when
Recv() returns io.EOF. Both can lead to panics during batch flush on stream
close.
##########
banyand/queue/pub/batch.go:
##########
@@ -50,13 +50,12 @@ const (
// Remote-side failures (observed after the frame was written to the
stream).
sendErrReasonRecvError = "recv_error" // s.Recv returned an
error (connection/protocol layer).
sendErrReasonServerRejected = "server_rejected" // Server responded
with a non-empty Error (includes failover statuses).
-
- sendResultSuccess = "success"
)
type writeStream struct {
- client clusterv1.Service_SendClient
- ctxDoneCh <-chan struct{}
+ client clusterv1.Service_SendClient
+ ctxDoneCh <-chan struct{}
+ isFirstSent bool // true until the first successful send on this stream
}
Review Comment:
The isFirstSent field comment is backwards: it says the field is "true until
the first successful send on this stream", but the zero value is false and the
code treats false as "first frame not yet sent". This is misleading when
maintaining the sender_* stamping logic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]