This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
     new e3f625a  Fix producer unable register when cnx closed (#761)
e3f625a is described below

commit e3f625ae8da938f5d147bdddd3a2cadced69c07b
Author: xiaolong ran <xiaolong...@tencent.com>
AuthorDate: Wed Apr 20 21:33:07 2022 +0800

    Fix producer unable register when cnx closed (#761)

    * Fix producer unable register when cnx closed

    Signed-off-by: xiaolongran <xiaolong...@tencent.com>

    * fix code style

    Signed-off-by: xiaolongran <xiaolong...@tencent.com>
---
 pulsar/internal/connection.go | 10 ++++++----
 pulsar/producer_partition.go  |  7 +++++--
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 6055252..11c9d49 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -53,7 +53,8 @@ type TLSOptions struct {
 }
 
 var (
-	errConnectionClosed = errors.New("connection closed")
+	errConnectionClosed       = errors.New("connection closed")
+	errUnableRegisterListener = errors.New("unable register listener when con closed")
 )
 
 // ConnectionListener is a user of a connection (eg. a producer or
@@ -72,7 +73,7 @@ type Connection interface {
 	SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error))
 	SendRequestNoWait(req *pb.BaseCommand) error
 	WriteData(data Buffer)
-	RegisterListener(id uint64, listener ConnectionListener)
+	RegisterListener(id uint64, listener ConnectionListener) error
 	UnregisterListener(id uint64)
 	AddConsumeHandler(id uint64, handler ConsumerHandler)
 	DeleteConsumeHandler(id uint64)
@@ -847,17 +848,18 @@ func (c *connection) handleCloseProducer(closeProducer *pb.CommandCloseProducer)
 	}
 }
 
-func (c *connection) RegisterListener(id uint64, listener ConnectionListener) {
+func (c *connection) RegisterListener(id uint64, listener ConnectionListener) error {
 	// do not add if connection is closed
 	if c.closed() {
 		c.log.Warnf("Connection closed unable register listener id=%+v", id)
-		return
+		return errUnableRegisterListener
 	}
 
 	c.listenersLock.Lock()
 	defer c.listenersLock.Unlock()
 
 	c.listeners[id] = listener
+	return nil
 }
 
 func (c *connection) UnregisterListener(id uint64) {
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index d37d60e..bc775e9 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -262,11 +262,14 @@ func (p *partitionProducer) grabCnx() error {
 		p.sequenceIDGenerator = &nextSequenceID
 	}
 	p._setConn(res.Cnx)
-	p._getConn().RegisterListener(p.producerID, p)
+	err = p._getConn().RegisterListener(p.producerID, p)
+	if err != nil {
+		return err
+	}
 	p.log.WithFields(log.Fields{
 		"cnx":   res.Cnx.ID(),
 		"epoch": atomic.LoadUint64(&p.epoch),
-	}).Debug("Connected producer")
+	}).Info("Connected producer")
 
 	pendingItems := p.pendingQueue.ReadableSlice()
 	viewSize := len(pendingItems)