This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new 791d342 Fix logic of command for sendError (#622) 791d342 is described below commit 791d342a98d0f0b4189913a5ea61547964d095c8 Author: xiaolong ran <r...@apache.org> AuthorDate: Mon Oct 11 11:20:33 2021 +0800 Fix logic of command for sendError (#622) ### Motivation ![image](https://user-images.githubusercontent.com/20965307/135020293-06cb72cc-5ed9-4bc5-a7ba-3909da57a8a6.png) As shown in the figure above, the `ServerError` returned by the broker shows up as `UnknownError` by the time the client receives it. The cause is that we were handling the wrong command: for a `SEND_ERROR` we should read `CommandSendError` (via `GetSendError()`) instead of `CommandError`. Correspondingly, the lookup should use the `listeners` map, which caches producers by producer ID, instead of the `pendingRequest` map, which is keyed by request ID. --- pulsar/internal/connection.go | 45 +++++++++++++++++++++++-------------------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 0313e4e..163dcac 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -531,7 +531,7 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl c.handleResponseError(cmd.GetError()) case pb.BaseCommand_SEND_ERROR: - c.handleSendError(cmd.GetError()) + c.handleSendError(cmd.GetSendError()) case pb.BaseCommand_CLOSE_PRODUCER: c.handleCloseProducer(cmd.GetCloseProducer()) @@ -752,31 +752,29 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge) c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, cmdAuthResponse)) } -func (c *connection) handleSendError(cmdError *pb.CommandError) { - c.log.Warnf("Received send error from server: [%v] : [%s]", cmdError.GetError(), cmdError.GetMessage()) +func (c *connection) handleSendError(sendError *pb.CommandSendError) { + c.log.Warnf("Received send error from server: [%v] : [%s]", sendError.GetError(), sendError.GetMessage()) - 
requestID := cmdError.GetRequestId() + producerID := sendError.GetProducerId() - switch cmdError.GetError() { + switch sendError.GetError() { case pb.ServerError_NotAllowedError: - request, ok := c.deletePendingRequest(requestID) + _, ok := c.deletePendingProducers(producerID) if !ok { c.log.Warnf("Received unexpected error response for request %d of type %s", - requestID, cmdError.GetError()) + producerID, sendError.GetError()) return } - errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage()) - request.callback(nil, errors.New(errMsg)) + c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage()) case pb.ServerError_TopicTerminatedError: - request, ok := c.deletePendingRequest(requestID) + _, ok := c.deletePendingProducers(producerID) if !ok { - c.log.Warnf("Received unexpected error response for request %d of type %s", - requestID, cmdError.GetError()) + c.log.Warnf("Received unexpected error response for producer %d of type %s", + producerID, sendError.GetError()) return } - errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage()) - request.callback(nil, errors.New(errMsg)) + c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage()) default: // By default, for transient error, let the reconnection logic // to take place and re-establish the produce again @@ -784,6 +782,17 @@ func (c *connection) handleSendError(cmdError *pb.CommandError) { } } +func (c *connection) deletePendingProducers(producerID uint64) (ConnectionListener, bool) { + c.listenersLock.Lock() + producer, ok := c.listeners[producerID] + if ok { + delete(c.listeners, producerID) + } + c.listenersLock.Unlock() + + return producer, ok +} + func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer) { consumerID := closeConsumer.GetConsumerId() c.log.Infof("Broker notification of Closed consumer: %d", consumerID) @@ -800,13 +809,7 @@ func (c *connection) 
handleCloseProducer(closeProducer *pb.CommandCloseProducer) c.log.Infof("Broker notification of Closed producer: %d", closeProducer.GetProducerId()) producerID := closeProducer.GetProducerId() - c.listenersLock.Lock() - producer, ok := c.listeners[producerID] - if ok { - delete(c.listeners, producerID) - } - c.listenersLock.Unlock() - + producer, ok := c.deletePendingProducers(producerID) // did we find a producer? if ok { producer.ConnectionClosed()