This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 3378790  Fixed locking between connection_reader and connection (#84)
3378790 is described below

commit 3378790a219271acb8ee84b3428ebf8e4d58946c
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Sat Nov 9 10:11:23 2019 -0800

    Fixed locking between connection_reader and connection (#84)
---
 pulsar/impl_partition_consumer.go    |  6 ++----
 pulsar/internal/connection.go        | 29 +++++++++++++++++------------
 pulsar/internal/connection_reader.go |  6 +++---
 3 files changed, 22 insertions(+), 19 deletions(-)

diff --git a/pulsar/impl_partition_consumer.go b/pulsar/impl_partition_consumer.go
index 58b3ffe..83e09ad 100644
--- a/pulsar/impl_partition_consumer.go
+++ b/pulsar/impl_partition_consumer.go
@@ -600,11 +600,9 @@ func (pc *partitionConsumer) internalFlow(permits uint32) error {
        return nil
 }
 
-func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload []byte) error {
+func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
        pbMsgID := response.GetMessageId()
-
-       reader := internal.NewMessageReader(headersAndPayload)
-
+       reader := internal.NewMessageReader(headersAndPayload.ReadableSlice())
        msgMeta, err := reader.ReadMessageMetadata()
        if err != nil {
                // TODO send discardCorruptedMessage
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 2e933b8..a68e34a 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -66,7 +66,7 @@ type Connection interface {
 }
 
 type ConsumerHandler interface {
-       MessageReceived(response *pb.CommandMessage, headersAndPayload []byte) error
+       MessageReceived(response *pb.CommandMessage, headersAndPayload Buffer) error
 
        // ConnectionClosed close the TCP connection.
        ConnectionClosed()
@@ -107,6 +107,11 @@ type request struct {
        callback func(command *pb.BaseCommand, err error)
 }
 
+type incomingCmd struct {
+       cmd               *pb.BaseCommand
+       headersAndPayload Buffer
+}
+
 type connection struct {
        sync.Mutex
        cond  *sync.Cond
@@ -129,9 +134,9 @@ type connection struct {
        requestIDGenerator uint64
 
        incomingRequestsCh chan *request
+       incomingCmdCh     chan *incomingCmd
        writeRequestsCh    chan []byte
 
-       mapMutex    sync.RWMutex
        pendingReqs map[uint64]*request
        listeners   map[uint64]ConnectionListener
 
@@ -156,6 +161,7 @@ func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSO
                auth:                 auth,
 
                incomingRequestsCh: make(chan *request),
+               incomingCmdCh:      make(chan *incomingCmd),
                writeRequestsCh:    make(chan []byte),
                listeners:          make(map[uint64]ConnectionListener),
                consumerHandlers:   make(map[uint64]ConsumerHandler),
@@ -280,11 +286,12 @@ func (c *connection) run() {
                        if req == nil {
                                return
                        }
-                       c.mapMutex.Lock()
                        c.pendingReqs[req.id] = req
-                       c.mapMutex.Unlock()
                        c.writeCommand(req.cmd)
 
+               case cmd := <- c.incomingCmdCh:
+                       c.internalReceivedCommand(cmd.cmd, cmd.headersAndPayload)
+
                case data := <-c.writeRequestsCh:
                        if data == nil {
                                return
@@ -331,7 +338,11 @@ func (c *connection) writeCommand(cmd proto.Message) {
        c.internalWriteData(data)
 }
 
-func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []byte) {
+func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload Buffer) {
+       c.incomingCmdCh <- &incomingCmd{cmd, headersAndPayload}
+}
+
+func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayload Buffer) {
        c.log.Debugf("Received command: %s -- payload: %v", cmd, headersAndPayload)
        c.setLastDataReceived(time.Now())
        var err error
@@ -406,14 +417,11 @@ func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand, callback
 }
 
 func (c *connection) internalSendRequest(req *request) {
-       c.mapMutex.Lock()
        c.pendingReqs[req.id] = req
-       c.mapMutex.Unlock()
        c.writeCommand(req.cmd)
 }
 
 func (c *connection) handleResponse(requestID uint64, response *pb.BaseCommand) {
-       c.mapMutex.RLock()
        request, ok := c.pendingReqs[requestID]
        if !ok {
                c.log.Warnf("Received unexpected response for request %d of type %s", requestID, response.Type)
@@ -421,13 +429,11 @@ func (c *connection) handleResponse(requestID uint64, response *pb.BaseCommand)
        }
 
        delete(c.pendingReqs, requestID)
-       c.mapMutex.RUnlock()
        request.callback(response, nil)
 }
 
 func (c *connection) handleResponseError(serverError *pb.CommandError) {
        requestID := serverError.GetRequestId()
-       c.mapMutex.RLock()
        request, ok := c.pendingReqs[requestID]
        if !ok {
                c.log.Warnf("Received unexpected error response for request %d of type %s",
@@ -436,7 +442,6 @@ func (c *connection) handleResponseError(serverError *pb.CommandError) {
        }
 
        delete(c.pendingReqs, requestID)
-       c.mapMutex.RUnlock()
 
        request.callback(nil,
                errors.New(fmt.Sprintf("server error: %s: %s", serverError.GetError(), serverError.GetMessage())))
@@ -451,7 +456,7 @@ func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
        }
 }
 
-func (c *connection) handleMessage(response *pb.CommandMessage, payload []byte) error {
+func (c *connection) handleMessage(response *pb.CommandMessage, payload Buffer) error {
        c.log.Debug("Got Message: ", response)
        consumerID := response.GetConsumerId()
        if consumer, ok := c.consumerHandler(consumerID); ok {
diff --git a/pulsar/internal/connection_reader.go b/pulsar/internal/connection_reader.go
index 746db5c..c74a940 100644
--- a/pulsar/internal/connection_reader.go
+++ b/pulsar/internal/connection_reader.go
@@ -55,7 +55,7 @@ func (r *connectionReader) readFromConnection() {
        }
 }
 
-func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndPayload []byte, err error) {
+func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndPayload Buffer, err error) {
        // First, we need to read the frame size
        if r.buffer.ReadableBytes() < 4 {
                if r.buffer.ReadableBytes() == 0 {
@@ -92,8 +92,8 @@ func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndP
        // Also read the eventual payload
        headersAndPayloadSize := frameSize - (cmdSize + 4)
        if cmdSize+4 < frameSize {
-               headersAndPayload = make([]byte, headersAndPayloadSize)
-               copy(headersAndPayload, r.buffer.Read(headersAndPayloadSize))
+               headersAndPayload = NewBuffer(int(headersAndPayloadSize))
+               headersAndPayload.Write(r.buffer.Read(headersAndPayloadSize))
        }
        return cmd, headersAndPayload, nil
 }

Reply via email to