This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
     new ff7a962  Revert "Use a separate gorutine to handle the logic of reconnect" (#700)
ff7a962 is described below

commit ff7a962be6b41da2c318e6fe70da00545bc0bdd4
Author: xiaolong ran <r...@apache.org>
AuthorDate: Thu Jan 6 18:43:05 2022 +0800

    Revert "Use a separate gorutine to handle the logic of reconnect" (#700)

    * Revert "Use a separate gorutine to handle the logic of reconnect (#691)"

    This reverts commit 39e13aced4cc35a63f5d6164e63c7f1638388c37.

    * add closeCh for go rutine leak

    Signed-off-by: xiaolongran <xiaolong...@tencent.com>
---
 pulsar/producer_partition.go | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index b517309..3d62758 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -77,8 +77,9 @@ type partitionProducer struct {
 	batchFlushTicker *time.Ticker

 	// Channel where app is posting messages to be published
-	connectClosedCh chan connectionClosed
 	eventsChan      chan interface{}
+	closeCh         chan struct{}
+	connectClosedCh chan connectionClosed

 	publishSemaphore internal.Semaphore
 	pendingQueue     internal.BlockingQueue
@@ -115,8 +116,9 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 		log:              logger,
 		options:          options,
 		producerID:       client.rpcClient.NewProducerID(),
+		eventsChan:       make(chan interface{}, maxPendingMessages),
 		connectClosedCh:  make(chan connectionClosed, 10),
-		eventsChan:       make(chan interface{}, maxPendingMessages+20),
+		closeCh:          make(chan struct{}),
 		batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
 		publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
 		pendingQueue:     internal.NewBlockingQueue(maxPendingMessages),
@@ -369,13 +371,13 @@ func (p *partitionProducer) reconnectToBroker() {
 }

 func (p *partitionProducer) runEventsLoop() {
-	go func() {
-		for {
-			for range p.connectClosedCh {
-				p.log.Info("runEventsLoop will reconnect in producer")
-				p.reconnectToBroker()
-			}
+			select {
+			case <-p.closeCh:
+				return
+			case <-p.connectClosedCh:
+				p.log.Info("runEventsLoop will reconnect in producer")
+				p.reconnectToBroker()
 		}
 	}()

@@ -872,6 +874,8 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
 	p.setProducerState(producerClosed)
 	p.cnx.UnregisterListener(p.producerID)
 	p.batchFlushTicker.Stop()
+
+	close(p.closeCh)
 }

 func (p *partitionProducer) LastSequenceID() int64 {