This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new a013ff0  [Issue 833] Fix the availablePermits leak that could cause 
consumer stuck. (#835)
a013ff0 is described below

commit a013ff0b7353fab87a7eb7599377bb06b46eb7b7
Author: Jiaqi Shen <18863662...@163.com>
AuthorDate: Thu Oct 13 16:06:01 2022 +0800

    [Issue 833] Fix the availablePermits leak that could cause consumer stuck. 
(#835)
    
    * fix: fix for issue833
    
    * fix: fix for issue833 by modify dispatcher()
---
 pulsar/consumer_partition.go | 55 +++++++++++++++++++++--------
 pulsar/consumer_test.go      | 82 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 122 insertions(+), 15 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 7ddff5e..5b61e7d 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -82,6 +82,15 @@ const (
        noMessageEntry = -1
 )
 
+type permitsReq int32
+
+const (
+       // reset the availablePermits of pc
+       permitsReset permitsReq = iota
+       // increase the availablePermits
+       permitsInc
+)
+
 type partitionConsumerOpts struct {
        topic                      string
        consumerName               string
@@ -128,7 +137,8 @@ type partitionConsumer struct {
        messageCh chan ConsumerMessage
 
        // the number of message slots available
-       availablePermits int32
+       availablePermits   int32
+       availablePermitsCh chan permitsReq
 
        // the size of the queue channel for buffering messages
        queueSize       int32
@@ -224,6 +234,7 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
                dlq:                  dlq,
                metrics:              metrics,
                schemaInfoCache:      newSchemaInfoCache(client, options.topic),
+               availablePermitsCh:   make(chan permitsReq, 10),
        }
        pc.setConsumerState(consumerInit)
        pc.log = client.log.SubLogger(log.Fields{
@@ -932,7 +943,7 @@ func (pc *partitionConsumer) dispatcher() {
                        messages = nil
 
                        // reset available permits
-                       pc.availablePermits = 0
+                       pc.availablePermitsCh <- permitsReset
                        initialPermits := uint32(pc.queueSize)
 
                        pc.log.Debugf("dispatcher requesting initial 
permits=%d", initialPermits)
@@ -955,19 +966,14 @@ func (pc *partitionConsumer) dispatcher() {
                        messages[0] = nil
                        messages = messages[1:]
 
-                       // TODO implement a better flow controller
-                       // send more permits if needed
-                       pc.availablePermits++
-                       flowThreshold := 
int32(math.Max(float64(pc.queueSize/2), 1))
-                       if pc.availablePermits >= flowThreshold {
-                               availablePermits := pc.availablePermits
-                               requestedPermits := availablePermits
-                               pc.availablePermits = 0
+                       pc.availablePermitsCh <- permitsInc
 
-                               pc.log.Debugf("requesting more permits=%d 
available=%d", requestedPermits, availablePermits)
-                               if err := 
pc.internalFlow(uint32(requestedPermits)); err != nil {
-                                       pc.log.WithError(err).Error("unable to 
send permits")
-                               }
+               case pr := <-pc.availablePermitsCh:
+                       switch pr {
+                       case permitsInc:
+                               pc.increasePermitsAndRequestMoreIfNeed()
+                       case permitsReset:
+                               pc.availablePermits = 0
                        }
 
                case clearQueueCb := <-pc.clearQueueCh:
@@ -998,7 +1004,7 @@ func (pc *partitionConsumer) dispatcher() {
                        messages = nil
 
                        // reset available permits
-                       pc.availablePermits = 0
+                       pc.availablePermitsCh <- permitsReset
                        initialPermits := uint32(pc.queueSize)
 
                        pc.log.Debugf("dispatcher requesting initial 
permits=%d", initialPermits)
@@ -1438,6 +1444,25 @@ func (pc *partitionConsumer) 
discardCorruptedMessage(msgID *pb.MessageIdData,
        if err != nil {
                pc.log.Error("Connection was closed when request ack cmd")
        }
+       pc.availablePermitsCh <- permitsInc
+}
+
+func (pc *partitionConsumer) increasePermitsAndRequestMoreIfNeed() {
+       // TODO implement a better flow controller
+       // send more permits if needed
+       flowThreshold := int32(math.Max(float64(pc.queueSize/2), 1))
+       pc.availablePermits++
+       ap := pc.availablePermits
+       if ap >= flowThreshold {
+               availablePermits := ap
+               requestedPermits := ap
+               pc.availablePermitsCh <- permitsReset
+
+               pc.log.Debugf("requesting more permits=%d available=%d", 
requestedPermits, availablePermits)
+               if err := pc.internalFlow(uint32(requestedPermits)); err != nil 
{
+                       pc.log.WithError(err).Error("unable to send permits")
+               }
+       }
 }
 
 // _setConn sets the internal connection field of this partition consumer 
atomically.
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index a180586..f574378 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -19,6 +19,7 @@ package pulsar
 
 import (
        "context"
+       "errors"
        "fmt"
        "io/ioutil"
        "log"
@@ -3180,3 +3181,84 @@ func TestConsumerSeekByTimeOnPartitionedTopic(t 
*testing.T) {
                consumer.Ack(msg)
        }
 }
+
+func TestAvailablePermitsLeak(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.Nil(t, err)
+       client.Close()
+
+       topic := fmt.Sprintf("my-topic-test-ap-leak-%v", 
time.Now().Nanosecond())
+
+       // 1. Producer with valid key name
+       p1, err := client.CreateProducer(ProducerOptions{
+               Topic: topic,
+               Encryption: &ProducerEncryptionInfo{
+                       KeyReader: 
crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+                               "crypto/testdata/pri_key_rsa.pem"),
+                       Keys: []string{"client-rsa.pem"},
+               },
+               Schema:          NewStringSchema(nil),
+               DisableBatching: true,
+       })
+       assert.Nil(t, err)
+       assert.NotNil(t, p1)
+
+       subscriptionName := "enc-failure-subcription"
+       totalMessages := 1000
+
+       // 2. KeyReader is not set by the consumer
+       // Receive should fail since KeyReader is not set up,
+       // because the consumer's default behaviour is to fail to receive a
+       // message if there is an error in decryption.
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: subscriptionName,
+       })
+       assert.Nil(t, err)
+
+       messageFormat := "my-message-%v"
+       for i := 0; i < totalMessages; i++ {
+               _, err := p1.Send(context.Background(), &ProducerMessage{
+                       Value: fmt.Sprintf(messageFormat, i),
+               })
+               assert.Nil(t, err)
+       }
+
+       // 2. Set up another producer that sends a message without crypto.
+       // The consumer can receive it correctly.
+       p2, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               Schema:          NewStringSchema(nil),
+               DisableBatching: true,
+       })
+       assert.Nil(t, err)
+       assert.NotNil(t, p2)
+
+       _, err = p2.Send(context.Background(), &ProducerMessage{
+               Value: fmt.Sprintf(messageFormat, totalMessages),
+       })
+       assert.Nil(t, err)
+
+       // 3. Discard action on decryption failure. Create an availablePermits
+       // leak scenario.
+       consumer.Close()
+
+       consumer, err = client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: subscriptionName,
+               Decryption: &MessageDecryptionInfo{
+                       ConsumerCryptoFailureAction: 
crypto.ConsumerCryptoFailureActionDiscard,
+               },
+               Schema: NewStringSchema(nil),
+       })
+       assert.Nil(t, err)
+       assert.NotNil(t, consumer)
+
+       // 4. If availablePermits does not leak, the consumer can get the last
+       // message, which is not encrypted.
+       // The ctx3 deadline will not be exceeded.
+       ctx3, cancel3 := context.WithTimeout(context.Background(), 
15*time.Second)
+       _, err = consumer.Receive(ctx3)
+       cancel3()
+       assert.NotEqual(t, true, errors.Is(err, context.DeadlineExceeded),
+               "This means the resource is exhausted. consumer.Receive() will 
block forever.")
+}

Reply via email to