This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 925da1a  allow config reader subscription name (#754)
925da1a is described below

commit 925da1a039a9551ab410727d7766c7e0a13ad69d
Author: ZhangJian He <shoot...@gmail.com>
AuthorDate: Tue Mar 29 14:22:48 2022 +0800

    allow config reader subscription name (#754)
    
    ### Motivation
    Allow configuring the reader's subscription name, following the Java client's feature:
https://github.com/apache/pulsar/pull/8801
    
    ### Modifications
    add param `SubscriptionName` in `ReaderOptions`
    
    ### Verifying this change
    add a test for setting the subscription name
---
 pulsar/reader.go      |  4 ++++
 pulsar/reader_impl.go |  9 ++++++---
 pulsar/reader_test.go | 22 ++++++++++++++++++++++
 3 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/pulsar/reader.go b/pulsar/reader.go
index f1cb575..3539037 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -68,6 +68,10 @@ type ReaderOptions struct {
        // SubscriptionRolePrefix sets the subscription role prefix. The 
default prefix is "reader".
        SubscriptionRolePrefix string
 
+       // SubscriptionName sets the subscription name.
+       // If subscriptionRolePrefix is set at the same time, this 
configuration will prevail
+       SubscriptionName string
+
        // ReadCompacted, if enabled, the reader will read messages from the 
compacted topic rather than reading the
        // full message backlog of the topic. This means that, if the topic has 
been compacted, the reader will only
        // see the latest value for each key in the topic, up until the point 
in the topic message backlog that has
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 596884a..dd552b6 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -66,11 +66,14 @@ func newReader(client *client, options ReaderOptions) 
(Reader, error) {
                }
        }
 
-       subscriptionName := options.SubscriptionRolePrefix
+       subscriptionName := options.SubscriptionName
        if subscriptionName == "" {
-               subscriptionName = "reader"
+               subscriptionName = options.SubscriptionRolePrefix
+               if subscriptionName == "" {
+                       subscriptionName = "reader"
+               }
+               subscriptionName += "-" + generateRandomName()
        }
-       subscriptionName += "-" + generateRandomName()
 
        receiverQueueSize := options.ReceiverQueueSize
        if receiverQueueSize <= 0 {
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index aa12078..3c85871 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -24,6 +24,7 @@ import (
        "time"
 
        "github.com/apache/pulsar-client-go/pulsar/crypto"
+       "github.com/google/uuid"
        "github.com/stretchr/testify/assert"
 )
 
@@ -48,6 +49,27 @@ func TestReaderConfigErrors(t *testing.T) {
        assert.NotNil(t, err)
 }
 
+func TestReaderConfigSubscribeName(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       if err != nil {
+               t.Fatal(err)
+       }
+       defer client.Close()
+
+       consumer, err := client.CreateReader(ReaderOptions{
+               StartMessageID:   EarliestMessageID(),
+               Topic:            uuid.New().String(),
+               SubscriptionName: uuid.New().String(),
+       })
+       if err != nil {
+               t.Fatal(err)
+       }
+       defer consumer.Close()
+       assert.NotNil(t, consumer)
+}
+
 func TestReader(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,

Reply via email to