This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 16e8b81  [Bugfix] producer runEventsLoop for reconnect early exit (#721)
16e8b81 is described below

commit 16e8b8114615146d645f2947afeb5f3cedbc85a8
Author: billowqiu <billow...@163.com>
AuthorDate: Wed Feb 9 15:04:26 2022 +0800

    [Bugfix] producer runEventsLoop for reconnect early exit (#721)
    
    * Fix closed connection leak
    
    * Fix closed connection leak
    
    * bugfix: runEventsLoop for reconnect early exit
    
    * [optimize] add log when reconnect
    
    * [Bugfix]fix panic
    
    * [Bugfix]remove log conn ID
    
    * [optimize]Distinguish failed create producer log
---
 pulsar/consumer_partition.go       |  3 +++
 pulsar/internal/connection_pool.go |  6 +++---
 pulsar/producer_partition.go       | 21 +++++++++++++--------
 3 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 9bd4a94..04a39c5 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -894,6 +894,7 @@ func (pc *partitionConsumer) runEventsLoop() {
                for {
                        select {
                        case <-pc.closeCh:
+                               pc.log.Info("close consumer, exit reconnect")
                                return
                        case <-pc.connectClosedCh:
                                pc.log.Debug("runEventsLoop will reconnect")
@@ -992,6 +993,7 @@ func (pc *partitionConsumer) reconnectToBroker() {
        for maxRetry != 0 {
                if pc.getConsumerState() != consumerReady {
                        // Consumer is already closing
+                       pc.log.Info("consumer state not ready, exit reconnect")
                        return
                }
 
@@ -1005,6 +1007,7 @@ func (pc *partitionConsumer) reconnectToBroker() {
                        pc.log.Info("Reconnected consumer to broker")
                        return
                }
+               pc.log.WithError(err).Error("Failed to create consumer at 
reconnect")
                errMsg := err.Error()
                if strings.Contains(errMsg, errTopicNotFount) {
                        // when topic is deleted, we should give up 
reconnection.
diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go
index 4787ba1..db67c25 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -75,15 +75,15 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
        p.Lock()
        conn, ok := p.connections[key]
        if ok {
-               p.log.Infof("Found connection in pool key=%s logical_addr=%+v 
physical_addr=%+v",
+               p.log.Debugf("Found connection in pool key=%s logical_addr=%+v 
physical_addr=%+v",
                        key, conn.logicalAddr, conn.physicalAddr)
 
                // remove stale/failed connection
                if conn.closed() {
-                       delete(p.connections, key)
-                       conn.Close()
                        p.log.Infof("Removed connection from pool key=%s 
logical_addr=%+v physical_addr=%+v",
                                key, conn.logicalAddr, conn.physicalAddr)
+                       delete(p.connections, key)
+                       conn.Close()
                        conn = nil // set to nil so we create a new one
                }
        }
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 3f1e54b..913c33c 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -142,7 +142,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
        }
        err := p.grabCnx()
        if err != nil {
-               logger.WithError(err).Error("Failed to create producer")
+               logger.WithError(err).Error("Failed to create producer at 
newPartitionProducer")
                return nil, err
        }
 
@@ -209,7 +209,7 @@ func (p *partitionProducer) grabCnx() error {
        }
        res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, 
id, pb.BaseCommand_PRODUCER, cmdProducer)
        if err != nil {
-               p.log.WithError(err).Error("Failed to create producer")
+               p.log.WithError(err).Error("Failed to create producer at send 
PRODUCER request")
                return err
        }
 
@@ -324,6 +324,7 @@ func (p *partitionProducer) reconnectToBroker() {
        for maxRetry != 0 {
                if p.getProducerState() != producerReady {
                        // Producer is already closing
+                       p.log.Info("producer state not ready, exit reconnect")
                        return
                }
 
@@ -337,6 +338,7 @@ func (p *partitionProducer) reconnectToBroker() {
                        p.log.WithField("cnx", 
p._getConn().ID()).Info("Reconnected producer to broker")
                        return
                }
+               p.log.WithError(err).Error("Failed to create producer at 
reconnect")
                errMsg := err.Error()
                if strings.Contains(errMsg, errTopicNotFount) {
                        // when topic is deleted, we should give up 
reconnection.
@@ -352,12 +354,15 @@ func (p *partitionProducer) reconnectToBroker() {
 
 func (p *partitionProducer) runEventsLoop() {
        go func() {
-               select {
-               case <-p.closeCh:
-                       return
-               case <-p.connectClosedCh:
-                       p.log.Info("runEventsLoop will reconnect in producer")
-                       p.reconnectToBroker()
+               for {
+                       select {
+                       case <-p.closeCh:
+                               p.log.Info("close producer, exit reconnect")
+                               return
+                       case <-p.connectClosedCh:
+                               p.log.Info("runEventsLoop will reconnect in 
producer")
+                               p.reconnectToBroker()
+                       }
                }
        }()
 

Reply via email to