This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new edfb785 feat: add BackoffPolicy to `reader` and improve test case (#889) edfb785 is described below commit edfb785961ca1f27aabea790f5642e1eaca7bd44 Author: labuladong <labulad...@foxmail.com> AuthorDate: Fri Nov 11 11:04:50 2022 +0800 feat: add BackoffPolicy to `reader` and improve test case (#889) * feat: add BackoffPolicy to reader and improve test case. * improve comments * fix code style --- pulsar/consumer_test.go | 41 +++++++++++++++++++++++++++ pulsar/producer_test.go | 40 ++++++++++++++++++++++++++ pulsar/reader.go | 6 ++++ pulsar/reader_impl.go | 1 + pulsar/reader_test.go | 75 +++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 163 insertions(+) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index f574378..e9b3013 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -3262,3 +3262,44 @@ func TestAvailablePermitsLeak(t *testing.T) { assert.NotEqual(t, true, errors.Is(err, context.DeadlineExceeded), "This means the resource is exhausted. consumer.Receive() will block forever.") } + +func TestConsumerWithBackoffPolicy(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.NoError(t, err) + defer client.Close() + + topicName := newTopicName() + + backoff := newTestBackoffPolicy(1*time.Second, 4*time.Second) + _consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topicName, + SubscriptionName: "sub-1", + Type: Shared, + BackoffPolicy: backoff, + }) + assert.Nil(t, err) + defer _consumer.Close() + + partitionConsumerImp := _consumer.(*consumer).consumers[0] + // 1 s + startTime := time.Now() + partitionConsumerImp.reconnectToBroker() + assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + + // 2 s + startTime = time.Now() + partitionConsumerImp.reconnectToBroker() + assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + + // 4 s + startTime = time.Now() + partitionConsumerImp.reconnectToBroker() + assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + + // 4 s + startTime = time.Now() + partitionConsumerImp.reconnectToBroker() + assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) +} diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index f193ffd..2338074 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -1072,6 +1072,46 @@ func TestSendTimeout(t *testing.T) { makeHTTPCall(t, http.MethodDelete, quotaURL, "") } +func TestProducerWithBackoffPolicy(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.NoError(t, err) + defer client.Close() + + topicName := newTopicName() + + backoff := newTestBackoffPolicy(1*time.Second, 4*time.Second) + _producer, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + SendTimeout: 2 * time.Second, + BackoffPolicy: backoff, + }) + assert.Nil(t, err) + defer _producer.Close() + + partitionProducerImp := _producer.(*producer).producers[0].(*partitionProducer) + // 1 s + startTime := time.Now() + partitionProducerImp.reconnectToBroker() + assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + + // 2 s + startTime = time.Now() + partitionProducerImp.reconnectToBroker() + assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + + // 4 s + startTime = time.Now() + partitionProducerImp.reconnectToBroker() + assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + + // 4 s + startTime = time.Now() + partitionProducerImp.reconnectToBroker() + assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) +} + func TestSendContextExpired(t *testing.T) { quotaURL := adminURL + "/admin/v2/namespaces/public/default/backlogQuota" quotaFmt := `{"limit": "%d", "policy": "producer_request_hold"}` diff --git a/pulsar/reader.go b/pulsar/reader.go index 3539037..e4679ab 100644 --- a/pulsar/reader.go +++ b/pulsar/reader.go @@ -20,6 +20,8 @@ package pulsar import ( "context" "time" + + "github.com/apache/pulsar-client-go/pulsar/internal" ) // ReaderMessage packages Reader and Message as a struct to use. @@ -86,6 +88,10 @@ type ReaderOptions struct { // Schema represents the schema implementation. Schema Schema + + // BackoffPolicy parameterize the following options in the reconnection logic to + // allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage) + BackoffPolicy internal.BackoffPolicy } // Reader can be used to scan through all the messages currently available in a topic. diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index dd552b6..3931f36 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -106,6 +106,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { replicateSubscriptionState: false, decryption: options.Decryption, schema: options.Schema, + backoffPolicy: options.BackoffPolicy, } reader := &reader{ diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index 3c85871..07e7ed3 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -775,3 +775,78 @@ func TestReaderWithSchema(t *testing.T) { assert.Nil(t, err) assert.Equal(t, *res, value) } + +func newTestBackoffPolicy(minBackoff, maxBackoff time.Duration) *testBackoffPolicy { + return &testBackoffPolicy{ + curBackoff: 0, + minBackoff: minBackoff, + maxBackoff: maxBackoff, + } +} + +type testBackoffPolicy struct { + curBackoff, minBackoff, maxBackoff time.Duration + retryTime int +} + +func (b *testBackoffPolicy) Next() time.Duration { + // Double the delay each time + b.curBackoff += b.curBackoff + if b.curBackoff.Nanoseconds() < b.minBackoff.Nanoseconds() { + b.curBackoff = b.minBackoff + } else if b.curBackoff.Nanoseconds() > b.maxBackoff.Nanoseconds() { + b.curBackoff = b.maxBackoff + } + b.retryTime++ + + return b.curBackoff +} + +func (b *testBackoffPolicy) IsExpectedIntervalFrom(startTime time.Time) bool { + // Approximately equal to expected interval + if time.Since(startTime) < b.curBackoff-time.Second { + return false + } + if time.Since(startTime) > b.curBackoff+time.Second { + return false + } + return true +} + +func TestReaderWithBackoffPolicy(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.Nil(t, err) + defer client.Close() + + backoff := newTestBackoffPolicy(1*time.Second, 4*time.Second) + _reader, err := client.CreateReader(ReaderOptions{ + Topic: "my-topic", + StartMessageID: LatestMessageID(), + BackoffPolicy: backoff, + }) + assert.NotNil(t, _reader) + assert.Nil(t, err) + + partitionConsumerImp := _reader.(*reader).pc + // 1 s + startTime := time.Now() + partitionConsumerImp.reconnectToBroker() + assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + + // 2 s + startTime = time.Now() + partitionConsumerImp.reconnectToBroker() + assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + + // 4 s + startTime = time.Now() + partitionConsumerImp.reconnectToBroker() + assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + + // 4 s + startTime = time.Now() + partitionConsumerImp.reconnectToBroker() + assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) +}