This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new edfb785  feat: add BackoffPolicy to `reader` and improve test case (#889)
edfb785 is described below

commit edfb785961ca1f27aabea790f5642e1eaca7bd44
Author: labuladong <labulad...@foxmail.com>
AuthorDate: Fri Nov 11 11:04:50 2022 +0800

    feat: add BackoffPolicy to `reader` and improve test case (#889)
    
    * feat: add BackoffPolicy to reader and improve test case.
    
    * improve comments
    
    * fix code style
---
 pulsar/consumer_test.go | 41 +++++++++++++++++++++++++++
 pulsar/producer_test.go | 40 ++++++++++++++++++++++++++
 pulsar/reader.go        |  6 ++++
 pulsar/reader_impl.go   |  1 +
 pulsar/reader_test.go   | 75 +++++++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 163 insertions(+)

diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index f574378..e9b3013 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -3262,3 +3262,44 @@ func TestAvailablePermitsLeak(t *testing.T) {
        assert.NotEqual(t, true, errors.Is(err, context.DeadlineExceeded),
                "This means the resource is exhausted. consumer.Receive() will block forever.")
 }
+
+func TestConsumerWithBackoffPolicy(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       topicName := newTopicName()
+
+       backoff := newTestBackoffPolicy(1*time.Second, 4*time.Second)
+       _consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: "sub-1",
+               Type:             Shared,
+               BackoffPolicy:    backoff,
+       })
+       assert.Nil(t, err)
+       defer _consumer.Close()
+
+       partitionConsumerImp := _consumer.(*consumer).consumers[0]
+       // 1 s
+       startTime := time.Now()
+       partitionConsumerImp.reconnectToBroker()
+       assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+
+       // 2 s
+       startTime = time.Now()
+       partitionConsumerImp.reconnectToBroker()
+       assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+
+       // 4 s
+       startTime = time.Now()
+       partitionConsumerImp.reconnectToBroker()
+       assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+
+       // 4 s
+       startTime = time.Now()
+       partitionConsumerImp.reconnectToBroker()
+       assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index f193ffd..2338074 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1072,6 +1072,46 @@ func TestSendTimeout(t *testing.T) {
        makeHTTPCall(t, http.MethodDelete, quotaURL, "")
 }
 
+func TestProducerWithBackoffPolicy(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       topicName := newTopicName()
+
+       backoff := newTestBackoffPolicy(1*time.Second, 4*time.Second)
+       _producer, err := client.CreateProducer(ProducerOptions{
+               Topic:         topicName,
+               SendTimeout:   2 * time.Second,
+               BackoffPolicy: backoff,
+       })
+       assert.Nil(t, err)
+       defer _producer.Close()
+
+       partitionProducerImp := _producer.(*producer).producers[0].(*partitionProducer)
+       // 1 s
+       startTime := time.Now()
+       partitionProducerImp.reconnectToBroker()
+       assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+
+       // 2 s
+       startTime = time.Now()
+       partitionProducerImp.reconnectToBroker()
+       assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+
+       // 4 s
+       startTime = time.Now()
+       partitionProducerImp.reconnectToBroker()
+       assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+
+       // 4 s
+       startTime = time.Now()
+       partitionProducerImp.reconnectToBroker()
+       assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+}
+
 func TestSendContextExpired(t *testing.T) {
        quotaURL := adminURL + "/admin/v2/namespaces/public/default/backlogQuota"
        quotaFmt := `{"limit": "%d", "policy": "producer_request_hold"}`
diff --git a/pulsar/reader.go b/pulsar/reader.go
index 3539037..e4679ab 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -20,6 +20,8 @@ package pulsar
 import (
        "context"
        "time"
+
+       "github.com/apache/pulsar-client-go/pulsar/internal"
 )
 
 // ReaderMessage packages Reader and Message as a struct to use.
@@ -86,6 +88,10 @@ type ReaderOptions struct {
 
        // Schema represents the schema implementation.
        Schema Schema
+
+       // BackoffPolicy parameterize the following options in the reconnection logic to
+       // allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage)
+       BackoffPolicy internal.BackoffPolicy
 }
 
 // Reader can be used to scan through all the messages currently available in a topic.
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index dd552b6..3931f36 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -106,6 +106,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
                replicateSubscriptionState: false,
                decryption:                 options.Decryption,
                schema:                     options.Schema,
+               backoffPolicy:              options.BackoffPolicy,
        }
 
        reader := &reader{
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index 3c85871..07e7ed3 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -775,3 +775,78 @@ func TestReaderWithSchema(t *testing.T) {
        assert.Nil(t, err)
        assert.Equal(t, *res, value)
 }
+
+func newTestBackoffPolicy(minBackoff, maxBackoff time.Duration) *testBackoffPolicy {
+       return &testBackoffPolicy{
+               curBackoff: 0,
+               minBackoff: minBackoff,
+               maxBackoff: maxBackoff,
+       }
+}
+
+type testBackoffPolicy struct {
+       curBackoff, minBackoff, maxBackoff time.Duration
+       retryTime                          int
+}
+
+func (b *testBackoffPolicy) Next() time.Duration {
+       // Double the delay each time
+       b.curBackoff += b.curBackoff
+       if b.curBackoff.Nanoseconds() < b.minBackoff.Nanoseconds() {
+               b.curBackoff = b.minBackoff
+       } else if b.curBackoff.Nanoseconds() > b.maxBackoff.Nanoseconds() {
+               b.curBackoff = b.maxBackoff
+       }
+       b.retryTime++
+
+       return b.curBackoff
+}
+
+func (b *testBackoffPolicy) IsExpectedIntervalFrom(startTime time.Time) bool {
+       // Approximately equal to expected interval
+       if time.Since(startTime) < b.curBackoff-time.Second {
+               return false
+       }
+       if time.Since(startTime) > b.curBackoff+time.Second {
+               return false
+       }
+       return true
+}
+
+func TestReaderWithBackoffPolicy(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       backoff := newTestBackoffPolicy(1*time.Second, 4*time.Second)
+       _reader, err := client.CreateReader(ReaderOptions{
+               Topic:          "my-topic",
+               StartMessageID: LatestMessageID(),
+               BackoffPolicy:  backoff,
+       })
+       assert.NotNil(t, _reader)
+       assert.Nil(t, err)
+
+       partitionConsumerImp := _reader.(*reader).pc
+       // 1 s
+       startTime := time.Now()
+       partitionConsumerImp.reconnectToBroker()
+       assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+
+       // 2 s
+       startTime = time.Now()
+       partitionConsumerImp.reconnectToBroker()
+       assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+
+       // 4 s
+       startTime = time.Now()
+       partitionConsumerImp.reconnectToBroker()
+       assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+
+       // 4 s
+       startTime = time.Now()
+       partitionConsumerImp.reconnectToBroker()
+       assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+}

Reply via email to