This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch revert-691-xiaolong/split-eventsCh in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit accf726bdf3972ac3b8b64c7fe1c0480959cc2de
Author: xiaolong ran <wol...@163.com>
AuthorDate: Thu Jan 6 10:37:43 2022 +0800

    Revert "Use a separate gorutine to handle the logic of reconnect (#691)"

    This reverts commit 39e13aced4cc35a63f5d6164e63c7f1638388c37.
---
 pulsar/producer_partition.go | 16 ++++------------
 1 file changed, 4 insertions(+), 12 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index b517309..d67c0c0 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -77,8 +77,8 @@ type partitionProducer struct {
 	batchFlushTicker *time.Ticker
 
 	// Channel where app is posting messages to be published
-	connectClosedCh chan connectionClosed
 	eventsChan      chan interface{}
+	connectClosedCh chan connectionClosed
 
 	publishSemaphore internal.Semaphore
 	pendingQueue     internal.BlockingQueue
@@ -115,8 +115,8 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 		log:              logger,
 		options:          options,
 		producerID:       client.rpcClient.NewProducerID(),
+		eventsChan:       make(chan interface{}, maxPendingMessages),
 		connectClosedCh:  make(chan connectionClosed, 10),
-		eventsChan:       make(chan interface{}, maxPendingMessages+20),
 		batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
 		publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
 		pendingQueue:     internal.NewBlockingQueue(maxPendingMessages),
@@ -369,16 +369,6 @@ func (p *partitionProducer) reconnectToBroker() {
 }
 
 func (p *partitionProducer) runEventsLoop() {
-
-	go func() {
-		for {
-			for range p.connectClosedCh {
-				p.log.Info("runEventsLoop will reconnect in producer")
-				p.reconnectToBroker()
-			}
-		}
-	}()
-
 	for {
 		select {
 		case i := <-p.eventsChan:
@@ -391,6 +381,8 @@ func (p *partitionProducer) runEventsLoop() {
 				p.internalClose(v)
 				return
 			}
+		case <-p.connectClosedCh:
+			p.reconnectToBroker()
 		case <-p.batchFlushTicker.C:
 			if p.batchBuilder.IsMultiBatches() {
 				p.internalFlushCurrentBatches()