This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new 3378790 Fixed locking between connection_reader and connection (#84) 3378790 is described below commit 3378790a219271acb8ee84b3428ebf8e4d58946c Author: Matteo Merli <mme...@apache.org> AuthorDate: Sat Nov 9 10:11:23 2019 -0800 Fixed locking between connection_reader and connection (#84) --- pulsar/impl_partition_consumer.go | 6 ++---- pulsar/internal/connection.go | 29 +++++++++++++++++------------ pulsar/internal/connection_reader.go | 6 +++--- 3 files changed, 22 insertions(+), 19 deletions(-) diff --git a/pulsar/impl_partition_consumer.go b/pulsar/impl_partition_consumer.go index 58b3ffe..83e09ad 100644 --- a/pulsar/impl_partition_consumer.go +++ b/pulsar/impl_partition_consumer.go @@ -600,11 +600,9 @@ func (pc *partitionConsumer) internalFlow(permits uint32) error { return nil } -func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload []byte) error { +func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error { pbMsgID := response.GetMessageId() - - reader := internal.NewMessageReader(headersAndPayload) - + reader := internal.NewMessageReader(headersAndPayload.ReadableSlice()) msgMeta, err := reader.ReadMessageMetadata() if err != nil { // TODO send discardCorruptedMessage diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 2e933b8..a68e34a 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -66,7 +66,7 @@ type Connection interface { } type ConsumerHandler interface { - MessageReceived(response *pb.CommandMessage, headersAndPayload []byte) error + MessageReceived(response *pb.CommandMessage, headersAndPayload Buffer) error // ConnectionClosed close the TCP connection. ConnectionClosed() @@ -107,6 +107,11 @@ type request struct { callback func(command *pb.BaseCommand, err error) } +type incomingCmd struct { + cmd *pb.BaseCommand + headersAndPayload Buffer +} + type connection struct { sync.Mutex cond *sync.Cond @@ -129,9 +134,9 @@ type connection struct { requestIDGenerator uint64 incomingRequestsCh chan *request + incomingCmdCh chan *incomingCmd writeRequestsCh chan []byte - mapMutex sync.RWMutex pendingReqs map[uint64]*request listeners map[uint64]ConnectionListener @@ -156,6 +161,7 @@ func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSO auth: auth, incomingRequestsCh: make(chan *request), + incomingCmdCh: make(chan *incomingCmd), writeRequestsCh: make(chan []byte), listeners: make(map[uint64]ConnectionListener), consumerHandlers: make(map[uint64]ConsumerHandler), @@ -280,11 +286,12 @@ func (c *connection) run() { if req == nil { return } - c.mapMutex.Lock() c.pendingReqs[req.id] = req - c.mapMutex.Unlock() c.writeCommand(req.cmd) + case cmd := <- c.incomingCmdCh: + c.internalReceivedCommand(cmd.cmd, cmd.headersAndPayload) + case data := <-c.writeRequestsCh: if data == nil { return @@ -331,7 +338,11 @@ func (c *connection) writeCommand(cmd proto.Message) { c.internalWriteData(data) } -func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []byte) { +func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload Buffer) { + c.incomingCmdCh <- &incomingCmd{cmd, headersAndPayload} +} + +func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayload Buffer) { c.log.Debugf("Received command: %s -- payload: %v", cmd, headersAndPayload) c.setLastDataReceived(time.Now()) var err error @@ -406,14 +417,11 @@ func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand, callback } func (c *connection) internalSendRequest(req *request) { - c.mapMutex.Lock() c.pendingReqs[req.id] = req - c.mapMutex.Unlock() c.writeCommand(req.cmd) } func (c *connection) handleResponse(requestID uint64, response *pb.BaseCommand) { - c.mapMutex.RLock() request, ok := c.pendingReqs[requestID] if !ok { c.log.Warnf("Received unexpected response for request %d of type %s", requestID, response.Type) @@ -421,13 +429,11 @@ func (c *connection) handleResponse(requestID uint64, response *pb.BaseCommand) } delete(c.pendingReqs, requestID) - c.mapMutex.RUnlock() request.callback(response, nil) } func (c *connection) handleResponseError(serverError *pb.CommandError) { requestID := serverError.GetRequestId() - c.mapMutex.RLock() request, ok := c.pendingReqs[requestID] if !ok { c.log.Warnf("Received unexpected error response for request %d of type %s", @@ -436,7 +442,6 @@ func (c *connection) handleResponseError(serverError *pb.CommandError) { } delete(c.pendingReqs, requestID) - c.mapMutex.RUnlock() request.callback(nil, errors.New(fmt.Sprintf("server error: %s: %s", serverError.GetError(), serverError.GetMessage()))) @@ -451,7 +456,7 @@ func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) { } } -func (c *connection) handleMessage(response *pb.CommandMessage, payload []byte) error { +func (c *connection) handleMessage(response *pb.CommandMessage, payload Buffer) error { c.log.Debug("Got Message: ", response) consumerID := response.GetConsumerId() if consumer, ok := c.consumerHandler(consumerID); ok { diff --git a/pulsar/internal/connection_reader.go b/pulsar/internal/connection_reader.go index 746db5c..c74a940 100644 --- a/pulsar/internal/connection_reader.go +++ b/pulsar/internal/connection_reader.go @@ -55,7 +55,7 @@ func (r *connectionReader) readFromConnection() { } } -func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndPayload []byte, err error) { +func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndPayload Buffer, err error) { // First, we need to read the frame size if r.buffer.ReadableBytes() < 4 { if r.buffer.ReadableBytes() == 0 { @@ -92,8 +92,8 @@ func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndP // Also read the eventual payload headersAndPayloadSize := frameSize - (cmdSize + 4) if cmdSize+4 < frameSize { - headersAndPayload = make([]byte, headersAndPayloadSize) - copy(headersAndPayload, r.buffer.Read(headersAndPayloadSize)) + headersAndPayload = NewBuffer(int(headersAndPayloadSize)) + headersAndPayload.Write(r.buffer.Read(headersAndPayloadSize)) } return cmd, headersAndPayload, nil }