This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 791d342   Fix logic of command for sendError (#622)
791d342 is described below

commit 791d342a98d0f0b4189913a5ea61547964d095c8
Author: xiaolong ran <r...@apache.org>
AuthorDate: Mon Oct 11 11:20:33 2021 +0800

     Fix logic of command for sendError (#622)
    
    ### Motivation
    
    
    
![image](https://user-images.githubusercontent.com/20965307/135020293-06cb72cc-5ed9-4bc5-a7ba-3909da57a8a6.png)
    
    
    As shown in the figure above, the `ServerError` returned by the broker 
appears as `UnknownError` by the time the client receives it. The cause is 
that we handled the wrong command here: a send error should be handled as a 
`CommandSendError`, not as a `CommandError`. Correspondingly, we should operate 
on the `listener` map used to cache the producer, rather than on the 
`pendingRequest` map.
---
 pulsar/internal/connection.go | 45 +++++++++++++++++++++++--------------------
 1 file changed, 24 insertions(+), 21 deletions(-)

diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 0313e4e..163dcac 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -531,7 +531,7 @@ func (c *connection) internalReceivedCommand(cmd 
*pb.BaseCommand, headersAndPayl
                c.handleResponseError(cmd.GetError())
 
        case pb.BaseCommand_SEND_ERROR:
-               c.handleSendError(cmd.GetError())
+               c.handleSendError(cmd.GetSendError())
 
        case pb.BaseCommand_CLOSE_PRODUCER:
                c.handleCloseProducer(cmd.GetCloseProducer())
@@ -752,31 +752,29 @@ func (c *connection) handleAuthChallenge(authChallenge 
*pb.CommandAuthChallenge)
        c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, 
cmdAuthResponse))
 }
 
-func (c *connection) handleSendError(cmdError *pb.CommandError) {
-       c.log.Warnf("Received send error from server: [%v] : [%s]", 
cmdError.GetError(), cmdError.GetMessage())
+func (c *connection) handleSendError(sendError *pb.CommandSendError) {
+       c.log.Warnf("Received send error from server: [%v] : [%s]", 
sendError.GetError(), sendError.GetMessage())
 
-       requestID := cmdError.GetRequestId()
+       producerID := sendError.GetProducerId()
 
-       switch cmdError.GetError() {
+       switch sendError.GetError() {
        case pb.ServerError_NotAllowedError:
-               request, ok := c.deletePendingRequest(requestID)
+               _, ok := c.deletePendingProducers(producerID)
                if !ok {
                        c.log.Warnf("Received unexpected error response for 
request %d of type %s",
-                               requestID, cmdError.GetError())
+                               producerID, sendError.GetError())
                        return
                }
 
-               errMsg := fmt.Sprintf("server error: %s: %s", 
cmdError.GetError(), cmdError.GetMessage())
-               request.callback(nil, errors.New(errMsg))
+               c.log.Warnf("server error: %s: %s", sendError.GetError(), 
sendError.GetMessage())
        case pb.ServerError_TopicTerminatedError:
-               request, ok := c.deletePendingRequest(requestID)
+               _, ok := c.deletePendingProducers(producerID)
                if !ok {
-                       c.log.Warnf("Received unexpected error response for 
request %d of type %s",
-                               requestID, cmdError.GetError())
+                       c.log.Warnf("Received unexpected error response for 
producer %d of type %s",
+                               producerID, sendError.GetError())
                        return
                }
-               errMsg := fmt.Sprintf("server error: %s: %s", 
cmdError.GetError(), cmdError.GetMessage())
-               request.callback(nil, errors.New(errMsg))
+               c.log.Warnf("server error: %s: %s", sendError.GetError(), 
sendError.GetMessage())
        default:
                // By default, for transient error, let the reconnection logic
                // to take place and re-establish the produce again
@@ -784,6 +782,17 @@ func (c *connection) handleSendError(cmdError 
*pb.CommandError) {
        }
 }
 
+func (c *connection) deletePendingProducers(producerID uint64) 
(ConnectionListener, bool) {
+       c.listenersLock.Lock()
+       producer, ok := c.listeners[producerID]
+       if ok {
+               delete(c.listeners, producerID)
+       }
+       c.listenersLock.Unlock()
+
+       return producer, ok
+}
+
 func (c *connection) handleCloseConsumer(closeConsumer 
*pb.CommandCloseConsumer) {
        consumerID := closeConsumer.GetConsumerId()
        c.log.Infof("Broker notification of Closed consumer: %d", consumerID)
@@ -800,13 +809,7 @@ func (c *connection) handleCloseProducer(closeProducer 
*pb.CommandCloseProducer)
        c.log.Infof("Broker notification of Closed producer: %d", 
closeProducer.GetProducerId())
        producerID := closeProducer.GetProducerId()
 
-       c.listenersLock.Lock()
-       producer, ok := c.listeners[producerID]
-       if ok {
-               delete(c.listeners, producerID)
-       }
-       c.listenersLock.Unlock()
-
+       producer, ok := c.deletePendingProducers(producerID)
        // did we find a producer?
        if ok {
                producer.ConnectionClosed()

Reply via email to