This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch revert-691-xiaolong/split-eventsCh
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit accf726bdf3972ac3b8b64c7fe1c0480959cc2de
Author: xiaolong ran <wol...@163.com>
AuthorDate: Thu Jan 6 10:37:43 2022 +0800

    Revert "Use a separate gorutine to handle the logic of reconnect (#691)"
    
    This reverts commit 39e13aced4cc35a63f5d6164e63c7f1638388c37.
---
 pulsar/producer_partition.go | 16 ++++------------
 1 file changed, 4 insertions(+), 12 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index b517309..d67c0c0 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -77,8 +77,8 @@ type partitionProducer struct {
        batchFlushTicker         *time.Ticker
 
        // Channel where app is posting messages to be published
-       connectClosedCh chan connectionClosed
        eventsChan      chan interface{}
+       connectClosedCh chan connectionClosed
 
        publishSemaphore internal.Semaphore
        pendingQueue     internal.BlockingQueue
@@ -115,8 +115,8 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
                log:              logger,
                options:          options,
                producerID:       client.rpcClient.NewProducerID(),
+               eventsChan:       make(chan interface{}, maxPendingMessages),
                connectClosedCh:  make(chan connectionClosed, 10),
-               eventsChan:       make(chan interface{}, maxPendingMessages+20),
                batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
                publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
                pendingQueue:     internal.NewBlockingQueue(maxPendingMessages),
@@ -369,16 +369,6 @@ func (p *partitionProducer) reconnectToBroker() {
 }
 
 func (p *partitionProducer) runEventsLoop() {
-
-       go func() {
-               for {
-                       for range p.connectClosedCh {
-                               p.log.Info("runEventsLoop will reconnect in producer")
-                               p.reconnectToBroker()
-                       }
-               }
-       }()
-
        for {
                select {
                case i := <-p.eventsChan:
@@ -391,6 +381,8 @@ func (p *partitionProducer) runEventsLoop() {
                                p.internalClose(v)
                                return
                        }
+               case <-p.connectClosedCh:
+                       p.reconnectToBroker()
                case <-p.batchFlushTicker.C:
                        if p.batchBuilder.IsMultiBatches() {
                                p.internalFlushCurrentBatches()

Reply via email to