This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new 925da1a allow config reader subscription name (#754) 925da1a is described below commit 925da1a039a9551ab410727d7766c7e0a13ad69d Author: ZhangJian He <shoot...@gmail.com> AuthorDate: Tue Mar 29 14:22:48 2022 +0800 allow config reader subscription name (#754) ### Motivation allow config reader's subscription name, follow java's feature https://github.com/apache/pulsar/pull/8801 ### Modifications add param `SubscriptionName` in `ReaderOptions` ### Verifying this change add the test for setting the subscription name --- pulsar/reader.go | 4 ++++ pulsar/reader_impl.go | 9 ++++++--- pulsar/reader_test.go | 22 ++++++++++++++++++++++ 3 files changed, 32 insertions(+), 3 deletions(-) diff --git a/pulsar/reader.go b/pulsar/reader.go index f1cb575..3539037 100644 --- a/pulsar/reader.go +++ b/pulsar/reader.go @@ -68,6 +68,10 @@ type ReaderOptions struct { // SubscriptionRolePrefix sets the subscription role prefix. The default prefix is "reader". SubscriptionRolePrefix string + // SubscriptionName sets the subscription name. + // If subscriptionRolePrefix is set at the same time, this configuration will prevail + SubscriptionName string + // ReadCompacted, if enabled, the reader will read messages from the compacted topic rather than reading the // full message backlog of the topic.
This means that, if the topic has been compacted, the reader will only // see the latest value for each key in the topic, up until the point in the topic message backlog that has diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index 596884a..dd552b6 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -66,11 +66,14 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { } } - subscriptionName := options.SubscriptionRolePrefix + subscriptionName := options.SubscriptionName if subscriptionName == "" { - subscriptionName = "reader" + subscriptionName = options.SubscriptionRolePrefix + if subscriptionName == "" { + subscriptionName = "reader" + } + subscriptionName += "-" + generateRandomName() } - subscriptionName += "-" + generateRandomName() receiverQueueSize := options.ReceiverQueueSize if receiverQueueSize <= 0 { diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index aa12078..3c85871 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/apache/pulsar-client-go/pulsar/crypto" + "github.com/google/uuid" "github.com/stretchr/testify/assert" ) @@ -48,6 +49,27 @@ func TestReaderConfigErrors(t *testing.T) { assert.NotNil(t, err) } +func TestReaderConfigSubscribeName(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + consumer, err := client.CreateReader(ReaderOptions{ + StartMessageID: EarliestMessageID(), + Topic: uuid.New().String(), + SubscriptionName: uuid.New().String(), + }) + if err != nil { + t.Fatal(err) + } + defer consumer.Close() + assert.NotNil(t, consumer) +} + func TestReader(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL,