This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new d336ff7 Remove multitopic and regexp consumers along with reader from client's handlers map when Close is called. (#620) d336ff7 is described below commit d336ff717c98b30d976dd1ebf9b1acf76e05c695 Author: PowerStateFailure <29687050+powerstatefail...@users.noreply.github.com> AuthorDate: Sat Oct 9 12:30:57 2021 +0500 Remove multitopic and regexp consumers along with reader from client's handlers map when Close is called. (#620) This change fixes a memory leak when frequently using short-lived regexp or multitopic consumers, because they were not removed from the client's handlers map on Close. Co-authored-by: xiaolongran <xiaolong...@tencent.com> --- pulsar/consumer_multitopic.go | 4 ++++ pulsar/consumer_regex.go | 1 + pulsar/reader_impl.go | 3 +++ 3 files changed, 8 insertions(+) diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index dc4ad7b..faf8917 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -30,6 +30,8 @@ import ( ) type multiTopicConsumer struct { + client *client + options ConsumerOptions consumerName string @@ -48,6 +50,7 @@ type multiTopicConsumer struct { func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []string, messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) { mtc := &multiTopicConsumer{ + client: client, options: options, messageCh: messageCh, consumers: make(map[string]Consumer, len(topics)), @@ -186,6 +189,7 @@ func (c *multiTopicConsumer) Close() { } wg.Wait() close(c.closeCh) + c.client.handlers.Del(c) c.dlq.close() c.rlq.close() }) diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index 9e0c125..2f46c48 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -217,6 +217,7 @@ func (c *regexConsumer) Close() { }(con) } wg.Wait() + c.client.handlers.Del(c) c.dlq.close() c.rlq.close() }) diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go 
index a019d9c..9983286 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -33,6 +33,7 @@ const ( type reader struct { sync.Mutex + client *client pc *partitionConsumer messageCh chan ConsumerMessage lastMessageInBroker trackingMessageID @@ -91,6 +92,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { } reader := &reader{ + client: client, messageCh: make(chan ConsumerMessage), log: client.log.SubLogger(log.Fields{"topic": options.Topic}), metrics: client.metrics.GetTopicMetrics(options.Topic), @@ -174,6 +176,7 @@ func (r *reader) hasMoreMessages() bool { func (r *reader) Close() { r.pc.Close() + r.client.handlers.Del(r) r.metrics.ReadersClosed.Inc() }