This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 85d7661  Fix ack timeout cause reconnect (#756)
85d7661 is described below

commit 85d76615dea35e24e4699d6dc54be216aeb89499
Author: xiaolong ran <xiaolong...@tencent.com>
AuthorDate: Wed Apr 6 11:36:02 2022 +0800

    Fix ack timeout cause reconnect (#756)
    
    * Fix ack timeout cause reconnect
    
    Signed-off-by: xiaolongran <xiaolong...@tencent.com>
    
    * fix some logic
    
    Signed-off-by: xiaolongran <xiaolong...@tencent.com>
---
 pulsar/producer_partition.go | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index d031e9a..525b89c 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -780,22 +780,22 @@ func (p *partitionProducer) ReceivedSendReceipt(response 
*pb.CommandSendReceipt)
 
        if !ok {
                // if we receive a receipt although the pending queue is empty, 
the state of the broker and the producer differs.
-               // At that point, it is better to close the connection to the 
broker to reconnect to a broker hopping it solves
-               // the state discrepancy.
-               p.log.Warnf("Received ack for %v although the pending queue is 
empty, closing connection", response.GetMessageId())
-               p._getConn().Close()
+               p.log.Warnf("Got ack %v for timed out msg", 
response.GetMessageId())
                return
        }
 
        if pi.sequenceID < response.GetSequenceId() {
-               // if we receive a receipt that is not the one expected, the 
state of the broker and the producer differs.
-               // At that point, it is better to close the connection to the 
broker to reconnect to a broker hopping it solves
-               // the state discrepancy.
+               // Ignoring the ack since it's referring to a message that has 
already timed out.
+               p.log.Warnf("Received ack for %v on sequenceId %v - expected: 
%v, closing connection", response.GetMessageId(),
+                       response.GetSequenceId(), pi.sequenceID)
+               return
+       } else if pi.sequenceID > response.GetSequenceId() {
+               // Force connection closing so that messages can be 
re-transmitted in a new connection
                p.log.Warnf("Received ack for %v on sequenceId %v - expected: 
%v, closing connection", response.GetMessageId(),
                        response.GetSequenceId(), pi.sequenceID)
                p._getConn().Close()
                return
-       } else if pi.sequenceID == response.GetSequenceId() {
+       } else {
                // The ack was indeed for the expected item in the queue, we 
can remove it and trigger the callback
                p.pendingQueue.Poll()
 

Reply via email to