This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new ff7a962  Revert "Use a separate gorutine to handle the logic of reconnect" (#700)
ff7a962 is described below

commit ff7a962be6b41da2c318e6fe70da00545bc0bdd4
Author: xiaolong ran <r...@apache.org>
AuthorDate: Thu Jan 6 18:43:05 2022 +0800

    Revert "Use a separate gorutine to handle the logic of reconnect" (#700)
    
    * Revert "Use a separate gorutine to handle the logic of reconnect (#691)"
    
    This reverts commit 39e13aced4cc35a63f5d6164e63c7f1638388c37.
    
    * add closeCh for go rutine leak
    
    Signed-off-by: xiaolongran <xiaolong...@tencent.com>
---
 pulsar/producer_partition.go | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index b517309..3d62758 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -77,8 +77,9 @@ type partitionProducer struct {
        batchFlushTicker         *time.Ticker
 
        // Channel where app is posting messages to be published
-       connectClosedCh chan connectionClosed
        eventsChan      chan interface{}
+       closeCh         chan struct{}
+       connectClosedCh chan connectionClosed
 
        publishSemaphore internal.Semaphore
        pendingQueue     internal.BlockingQueue
@@ -115,8 +116,9 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
                log:              logger,
                options:          options,
                producerID:       client.rpcClient.NewProducerID(),
+               eventsChan:       make(chan interface{}, maxPendingMessages),
                connectClosedCh:  make(chan connectionClosed, 10),
-               eventsChan:       make(chan interface{}, maxPendingMessages+20),
+               closeCh:          make(chan struct{}),
                batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
                publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
                pendingQueue:     internal.NewBlockingQueue(maxPendingMessages),
@@ -369,13 +371,13 @@ func (p *partitionProducer) reconnectToBroker() {
 }
 
 func (p *partitionProducer) runEventsLoop() {
-
        go func() {
-               for {
-                       for range p.connectClosedCh {
-                               p.log.Info("runEventsLoop will reconnect in producer")
-                               p.reconnectToBroker()
-                       }
+               select {
+               case <-p.closeCh:
+                       return
+               case <-p.connectClosedCh:
+                       p.log.Info("runEventsLoop will reconnect in producer")
+                       p.reconnectToBroker()
                }
        }()
 
@@ -872,6 +874,8 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
        p.setProducerState(producerClosed)
        p.cnx.UnregisterListener(p.producerID)
        p.batchFlushTicker.Stop()
+
+       close(p.closeCh)
 }
 
 func (p *partitionProducer) LastSequenceID() int64 {

Reply via email to