This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new 85d7661 Fix ack timeout cause reconnect (#756) 85d7661 is described below commit 85d76615dea35e24e4699d6dc54be216aeb89499 Author: xiaolong ran <xiaolong...@tencent.com> AuthorDate: Wed Apr 6 11:36:02 2022 +0800 Fix ack timeout cause reconnect (#756) * Fix ack timeout cause reconnect Signed-off-by: xiaolongran <xiaolong...@tencent.com> * fix some logic Signed-off-by: xiaolongran <xiaolong...@tencent.com> --- pulsar/producer_partition.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index d031e9a..525b89c 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -780,22 +780,22 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) if !ok { // if we receive a receipt although the pending queue is empty, the state of the broker and the producer differs. - // At that point, it is better to close the connection to the broker to reconnect to a broker hopping it solves - // the state discrepancy. - p.log.Warnf("Received ack for %v although the pending queue is empty, closing connection", response.GetMessageId()) - p._getConn().Close() + p.log.Warnf("Got ack %v for timed out msg", response.GetMessageId()) return } if pi.sequenceID < response.GetSequenceId() { - // if we receive a receipt that is not the one expected, the state of the broker and the producer differs. - // At that point, it is better to close the connection to the broker to reconnect to a broker hopping it solves - // the state discrepancy. + // Ignoring the ack since it's referring to a message that has already timed out. + p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, closing connection", response.GetMessageId(), + response.GetSequenceId(), pi.sequenceID) + return + } else if pi.sequenceID > response.GetSequenceId() { + // Force connection closing so that messages can be re-transmitted in a new connection p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, closing connection", response.GetMessageId(), response.GetSequenceId(), pi.sequenceID) p._getConn().Close() return - } else if pi.sequenceID == response.GetSequenceId() { + } else { // The ack was indeed for the expected item in the queue, we can remove it and trigger the callback p.pendingQueue.Poll()