This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new a119bab  [DefaultRouter] fix unnecessary system clock reads due to 
races accessing router state (#694)
a119bab is described below

commit a119bab0f8598601c0eb7f0fcd97da7ab06700c7
Author: dferstay <dfers...@users.noreply.github.com>
AuthorDate: Mon Jan 17 00:05:25 2022 -0800

    [DefaultRouter] fix unnecessary system clock reads due to races accessing 
router state (#694)
    
    Previously, we used atomic operations to read and update parts of the
    default router state.  Unfortunately, the reads and updates could race
    under concurrent calls leading to unnecessary clock reads and an
    associated slowdown in performance.
    
    Now, we use atomic addition to increment the message count and batch size.
    This removes the race condition by ensuring that each goroutine will have
    a unique messageCount, and hence only one will perform the clock read.
    
    Furthermore, we use atomic compare-and-swap to ensure that partitions are
    not skipped if multiple goroutines attempt to increment the partition
    cursor.
    
    Signed-off-by: Daniel Ferstay <dfers...@splunk.com>
    
    Co-authored-by: Daniel Ferstay <dfers...@splunk.com>
---
 pulsar/default_router.go      | 50 +++++++++++++++++++++----------------------
 pulsar/default_router_test.go | 13 +++++++----
 2 files changed, 34 insertions(+), 29 deletions(-)

diff --git a/pulsar/default_router.go b/pulsar/default_router.go
index b5e24a6..6945ff1 100644
--- a/pulsar/default_router.go
+++ b/pulsar/default_router.go
@@ -18,7 +18,6 @@
 package pulsar
 
 import (
-       "math"
        "math/rand"
        "sync/atomic"
        "time"
@@ -27,7 +26,7 @@ import (
 type defaultRouter struct {
        currentPartitionCursor uint32
 
-       lastChangeTimestamp int64
+       lastBatchTimestamp  int64
        msgCounter          uint32
        cumulativeBatchSize uint32
 }
@@ -45,7 +44,7 @@ func NewDefaultRouter(
        disableBatching bool) func(*ProducerMessage, uint32) int {
        state := &defaultRouter{
                currentPartitionCursor: rand.Uint32(),
-               lastChangeTimestamp:    math.MinInt64,
+               lastBatchTimestamp:     time.Now().UnixNano(),
        }
 
        readClockAfterNumMessages := uint32(maxBatchingMessages / 10)
@@ -75,37 +74,38 @@ func NewDefaultRouter(
                // If there's no key, we do round-robin across partition, 
sticking with a given
                // partition for a certain amount of messages or volume 
buffered or the max delay to batch is reached so that
                // we ensure having a decent amount of batching of the messages.
-               // Note that it is possible that we skip more than one 
partition if multiple goroutines increment
-               // currentPartitionCursor at the same time. If that happens it 
shouldn't be a problem because we only want to
-               // spread the data on different partitions but not necessarily 
in a specific sequence.
                var now int64
                size := uint32(len(message.Payload))
-               previousMessageCount := atomic.LoadUint32(&state.msgCounter)
-               previousBatchingMaxSize := 
atomic.LoadUint32(&state.cumulativeBatchSize)
-               previousLastChange := 
atomic.LoadInt64(&state.lastChangeTimestamp)
+               partitionCursor := 
atomic.LoadUint32(&state.currentPartitionCursor)
+               messageCount := atomic.AddUint32(&state.msgCounter, 1)
+               batchSize := atomic.AddUint32(&state.cumulativeBatchSize, size)
 
-               messageCountReached := previousMessageCount >= 
uint32(maxBatchingMessages-1)
-               sizeReached := (size >= 
uint32(maxBatchingSize)-previousBatchingMaxSize)
+               // Note: use greater-than for the threshold check so that we 
don't route this message to a new partition
+               //       before a batch is complete.
+               messageCountReached := messageCount > 
uint32(maxBatchingMessages)
+               sizeReached := batchSize > uint32(maxBatchingSize)
                durationReached := false
-               if readClockAfterNumMessages == 0 || 
previousMessageCount%readClockAfterNumMessages == 0 {
+               if readClockAfterNumMessages == 0 || 
messageCount%readClockAfterNumMessages == 0 {
                        now = time.Now().UnixNano()
-                       durationReached = now-previousLastChange >= 
maxBatchingDelay.Nanoseconds()
+                       lastBatchTime := 
atomic.LoadInt64(&state.lastBatchTimestamp)
+                       durationReached = now-lastBatchTime > 
maxBatchingDelay.Nanoseconds()
                }
                if messageCountReached || sizeReached || durationReached {
-                       atomic.AddUint32(&state.currentPartitionCursor, 1)
-                       atomic.StoreUint32(&state.msgCounter, 0)
-                       atomic.StoreUint32(&state.cumulativeBatchSize, 0)
-                       if now != 0 {
-                               atomic.StoreInt64(&state.lastChangeTimestamp, 
now)
+                       // Note: CAS to ensure that concurrent go-routines can 
only move the cursor forward by one so that
+                       //       partitions are not skipped.
+                       newCursor := partitionCursor + 1
+                       if 
atomic.CompareAndSwapUint32(&state.currentPartitionCursor, partitionCursor, 
newCursor) {
+                               atomic.StoreUint32(&state.msgCounter, 0)
+                               atomic.StoreUint32(&state.cumulativeBatchSize, 
0)
+                               if now == 0 {
+                                       now = time.Now().UnixNano()
+                               }
+                               atomic.StoreInt64(&state.lastBatchTimestamp, 
now)
                        }
-                       return int(state.currentPartitionCursor % numPartitions)
-               }
 
-               atomic.AddUint32(&state.msgCounter, 1)
-               atomic.AddUint32(&state.cumulativeBatchSize, size)
-               if now != 0 {
-                       atomic.StoreInt64(&state.lastChangeTimestamp, now)
+                       return int(newCursor % numPartitions)
                }
-               return int(state.currentPartitionCursor % numPartitions)
+
+               return int(partitionCursor % numPartitions)
        }
 }
diff --git a/pulsar/default_router_test.go b/pulsar/default_router_test.go
index 31b27af..3c42e66 100644
--- a/pulsar/default_router_test.go
+++ b/pulsar/default_router_test.go
@@ -71,16 +71,21 @@ func 
TestDefaultRouterRoutingBecauseMaxNumberOfMessagesReached(t *testing.T) {
        const numPartitions = uint32(3)
        p1 := router(&ProducerMessage{
                Payload: []byte("message 1"),
-       }, 3)
+       }, numPartitions)
        assert.LessOrEqual(t, p1, int(numPartitions))
 
        p2 := router(&ProducerMessage{
                Payload: []byte("message 2"),
        }, numPartitions)
-       if p1 == int(numPartitions-1) {
-               assert.Equal(t, 0, p2)
+       assert.Equal(t, p1, p2)
+
+       p3 := router(&ProducerMessage{
+               Payload: []byte("message 3"),
+       }, numPartitions)
+       if p2 == int(numPartitions-1) {
+               assert.Equal(t, 0, p3)
        } else {
-               assert.Equal(t, p1+1, p2)
+               assert.Equal(t, p2+1, p3)
        }
 }
 

Reply via email to