This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new d80a722  fix issue 650,different handle sequence value (#651)
d80a722 is described below

commit d80a722ac1ab197c7e8649efdeb1e16356cbb3bb
Author: baomingyu <baomingy...@163.com>
AuthorDate: Thu Nov 4 10:57:21 2021 +0800

    fix issue 650,different handle sequence value (#651)
    
    * fix issue 650,different handle sequence value
    
    * add sequenceId equal check
---
 pulsar/producer_partition.go | 76 ++++++++++++++++++++++----------------------
 1 file changed, 38 insertions(+), 38 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index b2b9273..d67c0c0 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -784,7 +784,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
                return
        }
 
-       if pi.sequenceID != response.GetSequenceId() {
+       if pi.sequenceID < response.GetSequenceId() {
                // if we receive a receipt that is not the one expected, the state of the broker and the producer differs.
                // At that point, it is better to close the connection to the broker to reconnect to a broker hopping it solves
                // the state discrepancy.
@@ -792,49 +792,49 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
                        response.GetSequenceId(), pi.sequenceID)
                p.cnx.Close()
                return
-       }
-
-       // The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
-       p.pendingQueue.Poll()
-
-       now := time.Now().UnixNano()
-
-       // lock the pending item while sending the requests
-       pi.Lock()
-       defer pi.Unlock()
-       p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9)
-       for idx, i := range pi.sendRequests {
-               sr := i.(*sendRequest)
-               if sr.msg != nil {
-                       atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
-                       p.publishSemaphore.Release()
+       } else if pi.sequenceID == response.GetSequenceId() {
+               // The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
+               p.pendingQueue.Poll()
+
+               now := time.Now().UnixNano()
+
+               // lock the pending item while sending the requests
+               pi.Lock()
+               defer pi.Unlock()
+               p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9)
+               for idx, i := range pi.sendRequests {
+                       sr := i.(*sendRequest)
+                       if sr.msg != nil {
+                               atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
+                               p.publishSemaphore.Release()
+
+                               p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
+                               p.metrics.MessagesPublished.Inc()
+                               p.metrics.MessagesPending.Dec()
+                               payloadSize := float64(len(sr.msg.Payload))
+                               p.metrics.BytesPublished.Add(payloadSize)
+                               p.metrics.BytesPending.Sub(payloadSize)
+                       }
 
-                       p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
-                       p.metrics.MessagesPublished.Inc()
-                       p.metrics.MessagesPending.Dec()
-                       payloadSize := float64(len(sr.msg.Payload))
-                       p.metrics.BytesPublished.Add(payloadSize)
-                       p.metrics.BytesPending.Sub(payloadSize)
-               }
+                       if sr.callback != nil || len(p.options.Interceptors) > 0 {
+                               msgID := newMessageID(
+                                       int64(response.MessageId.GetLedgerId()),
+                                       int64(response.MessageId.GetEntryId()),
+                                       int32(idx),
+                                       p.partitionIdx,
+                               )
 
-               if sr.callback != nil || len(p.options.Interceptors) > 0 {
-                       msgID := newMessageID(
-                               int64(response.MessageId.GetLedgerId()),
-                               int64(response.MessageId.GetEntryId()),
-                               int32(idx),
-                               p.partitionIdx,
-                       )
+                               if sr.callback != nil {
+                                       sr.callback(msgID, sr.msg, nil)
+                               }
 
-                       if sr.callback != nil {
-                               sr.callback(msgID, sr.msg, nil)
+                               p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID)
                        }
-
-                       p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID)
                }
-       }
 
-       // Mark this pending item as done
-       pi.Complete()
+               // Mark this pending item as done
+               pi.Complete()
+       }
 }
 
 func (p *partitionProducer) internalClose(req *closeProducer) {

Reply via email to