This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
     new d80a722  fix issue 650,different handle sequence value (#651)
d80a722 is described below

commit d80a722ac1ab197c7e8649efdeb1e16356cbb3bb
Author: baomingyu <baomingy...@163.com>
AuthorDate: Thu Nov 4 10:57:21 2021 +0800

    fix issue 650,different handle sequence value (#651)

    * fix issue 650,different handle sequence value

    * add sequenceId equal check
---
 pulsar/producer_partition.go | 76 ++++++++++++++++++++++----------------------
 1 file changed, 38 insertions(+), 38 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index b2b9273..d67c0c0 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -784,7 +784,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 		return
 	}
 
-	if pi.sequenceID != response.GetSequenceId() {
+	if pi.sequenceID < response.GetSequenceId() {
 		// if we receive a receipt that is not the one expected, the state of the broker and the producer differs.
 		// At that point, it is better to close the connection to the broker to reconnect to a broker hopping it solves
 		// the state discrepancy.
@@ -792,49 +792,49 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 			response.GetSequenceId(), pi.sequenceID)
 		p.cnx.Close()
 		return
-	}
-
-	// The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
-	p.pendingQueue.Poll()
-
-	now := time.Now().UnixNano()
-
-	// lock the pending item while sending the requests
-	pi.Lock()
-	defer pi.Unlock()
-	p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9)
-	for idx, i := range pi.sendRequests {
-		sr := i.(*sendRequest)
-		if sr.msg != nil {
-			atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
-			p.publishSemaphore.Release()
+	} else if pi.sequenceID == response.GetSequenceId() {
+		// The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
+		p.pendingQueue.Poll()
+
+		now := time.Now().UnixNano()
+
+		// lock the pending item while sending the requests
+		pi.Lock()
+		defer pi.Unlock()
+		p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9)
+		for idx, i := range pi.sendRequests {
+			sr := i.(*sendRequest)
+			if sr.msg != nil {
+				atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
+				p.publishSemaphore.Release()
+
+				p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
+				p.metrics.MessagesPublished.Inc()
+				p.metrics.MessagesPending.Dec()
+				payloadSize := float64(len(sr.msg.Payload))
+				p.metrics.BytesPublished.Add(payloadSize)
+				p.metrics.BytesPending.Sub(payloadSize)
+			}
 
-			p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
-			p.metrics.MessagesPublished.Inc()
-			p.metrics.MessagesPending.Dec()
-			payloadSize := float64(len(sr.msg.Payload))
-			p.metrics.BytesPublished.Add(payloadSize)
-			p.metrics.BytesPending.Sub(payloadSize)
-		}
+			if sr.callback != nil || len(p.options.Interceptors) > 0 {
+				msgID := newMessageID(
+					int64(response.MessageId.GetLedgerId()),
+					int64(response.MessageId.GetEntryId()),
+					int32(idx),
+					p.partitionIdx,
+				)
 
-		if sr.callback != nil || len(p.options.Interceptors) > 0 {
-			msgID := newMessageID(
-				int64(response.MessageId.GetLedgerId()),
-				int64(response.MessageId.GetEntryId()),
-				int32(idx),
-				p.partitionIdx,
-			)
+				if sr.callback != nil {
+					sr.callback(msgID, sr.msg, nil)
+				}
 
-			if sr.callback != nil {
-				sr.callback(msgID, sr.msg, nil)
+				p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID)
 			}
-
-			p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID)
 		}
-	}
 
-	// Mark this pending item as done
-	pi.Complete()
+		// Mark this pending item as done
+		pi.Complete()
+	}
 }
 
 func (p *partitionProducer) internalClose(req *closeProducer) {