This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new d336ff7  Remove mutitopic- and regexp consumer along with reader from 
client' handlers map when Close called. (#620)
d336ff7 is described below

commit d336ff717c98b30d976dd1ebf9b1acf76e05c695
Author: PowerStateFailure <29687050+powerstatefail...@users.noreply.github.com>
AuthorDate: Sat Oct 9 12:30:57 2021 +0500

    Remove mutitopic- and regexp consumer along with reader from client' 
handlers map when Close called. (#620)
    
    This change fixes memory leak when frequently using short-living regexp- or 
multitopic consumers because there were not removed from client handler on Close
    
    Co-authored-by: xiaolongran <xiaolong...@tencent.com>
---
 pulsar/consumer_multitopic.go | 4 ++++
 pulsar/consumer_regex.go      | 1 +
 pulsar/reader_impl.go         | 3 +++
 3 files changed, 8 insertions(+)

diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index dc4ad7b..faf8917 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -30,6 +30,8 @@ import (
 )
 
 type multiTopicConsumer struct {
+       client *client
+
        options ConsumerOptions
 
        consumerName string
@@ -48,6 +50,7 @@ type multiTopicConsumer struct {
 func newMultiTopicConsumer(client *client, options ConsumerOptions, topics 
[]string,
        messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) 
(Consumer, error) {
        mtc := &multiTopicConsumer{
+               client:       client,
                options:      options,
                messageCh:    messageCh,
                consumers:    make(map[string]Consumer, len(topics)),
@@ -186,6 +189,7 @@ func (c *multiTopicConsumer) Close() {
                }
                wg.Wait()
                close(c.closeCh)
+               c.client.handlers.Del(c)
                c.dlq.close()
                c.rlq.close()
        })
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index 9e0c125..2f46c48 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -217,6 +217,7 @@ func (c *regexConsumer) Close() {
                        }(con)
                }
                wg.Wait()
+               c.client.handlers.Del(c)
                c.dlq.close()
                c.rlq.close()
        })
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index a019d9c..9983286 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -33,6 +33,7 @@ const (
 
 type reader struct {
        sync.Mutex
+       client              *client
        pc                  *partitionConsumer
        messageCh           chan ConsumerMessage
        lastMessageInBroker trackingMessageID
@@ -91,6 +92,7 @@ func newReader(client *client, options ReaderOptions) 
(Reader, error) {
        }
 
        reader := &reader{
+               client:    client,
                messageCh: make(chan ConsumerMessage),
                log:       client.log.SubLogger(log.Fields{"topic": 
options.Topic}),
                metrics:   client.metrics.GetTopicMetrics(options.Topic),
@@ -174,6 +176,7 @@ func (r *reader) hasMoreMessages() bool {
 
 func (r *reader) Close() {
        r.pc.Close()
+       r.client.handlers.Del(r)
        r.metrics.ReadersClosed.Inc()
 }
 

Reply via email to