This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new e3f625a  Fix producer unable register when cnx closed (#761)
e3f625a is described below

commit e3f625ae8da938f5d147bdddd3a2cadced69c07b
Author: xiaolong ran <xiaolong...@tencent.com>
AuthorDate: Wed Apr 20 21:33:07 2022 +0800

    Fix producer unable register when cnx closed (#761)
    
    * Fix producer unable register when cnx closed
    
    Signed-off-by: xiaolongran <xiaolong...@tencent.com>
    
    * fix code style
    
    Signed-off-by: xiaolongran <xiaolong...@tencent.com>
---
 pulsar/internal/connection.go | 10 ++++++----
 pulsar/producer_partition.go  |  7 +++++--
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 6055252..11c9d49 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -53,7 +53,8 @@ type TLSOptions struct {
 }
 
 var (
-       errConnectionClosed = errors.New("connection closed")
+       errConnectionClosed       = errors.New("connection closed")
+       errUnableRegisterListener = errors.New("unable register listener when con closed")
 )
 
 // ConnectionListener is a user of a connection (eg. a producer or
@@ -72,7 +73,7 @@ type Connection interface {
        SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error))
        SendRequestNoWait(req *pb.BaseCommand) error
        WriteData(data Buffer)
-       RegisterListener(id uint64, listener ConnectionListener)
+       RegisterListener(id uint64, listener ConnectionListener) error
        UnregisterListener(id uint64)
        AddConsumeHandler(id uint64, handler ConsumerHandler)
        DeleteConsumeHandler(id uint64)
@@ -847,17 +848,18 @@ func (c *connection) handleCloseProducer(closeProducer *pb.CommandCloseProducer)
        }
 }
 
-func (c *connection) RegisterListener(id uint64, listener ConnectionListener) {
+func (c *connection) RegisterListener(id uint64, listener ConnectionListener) error {
        // do not add if connection is closed
        if c.closed() {
                c.log.Warnf("Connection closed unable register listener id=%+v", id)
-               return
+               return errUnableRegisterListener
        }
 
        c.listenersLock.Lock()
        defer c.listenersLock.Unlock()
 
        c.listeners[id] = listener
+       return nil
 }
 
 func (c *connection) UnregisterListener(id uint64) {
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index d37d60e..bc775e9 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -262,11 +262,14 @@ func (p *partitionProducer) grabCnx() error {
                p.sequenceIDGenerator = &nextSequenceID
        }
        p._setConn(res.Cnx)
-       p._getConn().RegisterListener(p.producerID, p)
+       err = p._getConn().RegisterListener(p.producerID, p)
+       if err != nil {
+               return err
+       }
        p.log.WithFields(log.Fields{
                "cnx":   res.Cnx.ID(),
                "epoch": atomic.LoadUint64(&p.epoch),
-       }).Debug("Connected producer")
+       }).Info("Connected producer")
 
        pendingItems := p.pendingQueue.ReadableSlice()
        viewSize := len(pendingItems)

Reply via email to