This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new 16e8b81 [Bugfix] producer runEventsLoop for reconnect early exit (#721) 16e8b81 is described below commit 16e8b8114615146d645f2947afeb5f3cedbc85a8 Author: billowqiu <billow...@163.com> AuthorDate: Wed Feb 9 15:04:26 2022 +0800 [Bugfix] producer runEventsLoop for reconnect early exit (#721) * Fix closed connection leak * Fix closed connection leak * bugfix: runEventsLoop for reconnect early exit * [optimize] add log when reconnect * [Bugfix]fix panic * [Bugfix]remove log conn ID * [optimize]Distinguish failed create producer log --- pulsar/consumer_partition.go | 3 +++ pulsar/internal/connection_pool.go | 6 +++--- pulsar/producer_partition.go | 21 +++++++++++++-------- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 9bd4a94..04a39c5 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -894,6 +894,7 @@ func (pc *partitionConsumer) runEventsLoop() { for { select { case <-pc.closeCh: + pc.log.Info("close consumer, exit reconnect") return case <-pc.connectClosedCh: pc.log.Debug("runEventsLoop will reconnect") @@ -992,6 +993,7 @@ func (pc *partitionConsumer) reconnectToBroker() { for maxRetry != 0 { if pc.getConsumerState() != consumerReady { // Consumer is already closing + pc.log.Info("consumer state not ready, exit reconnect") return } @@ -1005,6 +1007,7 @@ func (pc *partitionConsumer) reconnectToBroker() { pc.log.Info("Reconnected consumer to broker") return } + pc.log.WithError(err).Error("Failed to create consumer at reconnect") errMsg := err.Error() if strings.Contains(errMsg, errTopicNotFount) { // when topic is deleted, we should give up reconnection. diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go index 4787ba1..db67c25 100644 --- a/pulsar/internal/connection_pool.go +++ b/pulsar/internal/connection_pool.go @@ -75,15 +75,15 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U p.Lock() conn, ok := p.connections[key] if ok { - p.log.Infof("Found connection in pool key=%s logical_addr=%+v physical_addr=%+v", + p.log.Debugf("Found connection in pool key=%s logical_addr=%+v physical_addr=%+v", key, conn.logicalAddr, conn.physicalAddr) // remove stale/failed connection if conn.closed() { - delete(p.connections, key) - conn.Close() p.log.Infof("Removed connection from pool key=%s logical_addr=%+v physical_addr=%+v", key, conn.logicalAddr, conn.physicalAddr) + delete(p.connections, key) + conn.Close() conn = nil // set to nil so we create a new one } } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 3f1e54b..913c33c 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -142,7 +142,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions } err := p.grabCnx() if err != nil { - logger.WithError(err).Error("Failed to create producer") + logger.WithError(err).Error("Failed to create producer at newPartitionProducer") return nil, err } @@ -209,7 +209,7 @@ func (p *partitionProducer) grabCnx() error { } res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer) if err != nil { - p.log.WithError(err).Error("Failed to create producer") + p.log.WithError(err).Error("Failed to create producer at send PRODUCER request") return err } @@ -324,6 +324,7 @@ func (p *partitionProducer) reconnectToBroker() { for maxRetry != 0 { if p.getProducerState() != producerReady { // Producer is already closing + p.log.Info("producer state not ready, exit reconnect") return } @@ -337,6 +338,7 @@ func (p *partitionProducer) reconnectToBroker() { p.log.WithField("cnx", p._getConn().ID()).Info("Reconnected producer to broker") return } + p.log.WithError(err).Error("Failed to create producer at reconnect") errMsg := err.Error() if strings.Contains(errMsg, errTopicNotFount) { // when topic is deleted, we should give up reconnection. @@ -352,12 +354,15 @@ func (p *partitionProducer) reconnectToBroker() { func (p *partitionProducer) runEventsLoop() { go func() { - select { - case <-p.closeCh: - return - case <-p.connectClosedCh: - p.log.Info("runEventsLoop will reconnect in producer") - p.reconnectToBroker() + for { + select { + case <-p.closeCh: + p.log.Info("close producer, exit reconnect") + return + case <-p.connectClosedCh: + p.log.Info("runEventsLoop will reconnect in producer") + p.reconnectToBroker() + } } }()