This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bbfb8e  [Issue 513] Correct apparent logic error in batchContainer's 
hasSpace() method (#678)
2bbfb8e is described below

commit 2bbfb8e4a66c63843c935a50971474abfa25cdf7
Author: Ben Schofield <bschofi...@users.noreply.github.com>
AuthorDate: Wed Dec 22 06:22:18 2021 +0000

    [Issue 513] Correct apparent logic error in batchContainer's hasSpace() 
method (#678)
    
    * Correct apparent logic error in batchContainer's hasSpace() method.
    
    * Make the same change to keyBasedBatchContainer's hasSpace() method.
    
    * Fix comment length to pass style checks.
    
    * Allow TestMaxMessageSize() to run
    
    * Add TestMaxBatchSize() to validate that this limit is respected. Based on 
the results of the test, change the < in hasSpace() to be a <=.
    
    * Further correct logic in hasSpace() / IsFull()
    
    * Remember to make the change in both places!
    
    Co-authored-by: ben <b...@cyber.casa>
---
 pulsar/internal/batch_builder.go           |  7 ++---
 pulsar/internal/key_based_batch_builder.go |  7 ++---
 pulsar/producer_test.go                    | 42 ++++++++++++++++++++++++++++--
 3 files changed, 48 insertions(+), 8 deletions(-)

diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 7e47304..9d18f26 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -150,14 +150,15 @@ func NewBatchBuilder(
        return &bc, nil
 }
 
-// IsFull check if the size in the current batch exceeds the maximum size 
allowed by the batch
+// IsFull checks if the size in the current batch meets or exceeds the maximum 
size allowed by the batch
 func (bc *batchContainer) IsFull() bool {
-       return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() > 
uint32(bc.maxBatchSize)
+       return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() >= 
uint32(bc.maxBatchSize)
 }
 
+// hasSpace should return true if and only if the batch container can 
accommodate another message of length payload.
 func (bc *batchContainer) hasSpace(payload []byte) bool {
        msgSize := uint32(len(payload))
-       return bc.numMessages+1 < bc.maxMessages || 
(bc.buffer.ReadableBytes()+msgSize) < uint32(bc.maxBatchSize)
+       return bc.numMessages+1 <= bc.maxMessages && 
bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
 }
 
 // Add will add single message to batch.
diff --git a/pulsar/internal/key_based_batch_builder.go 
b/pulsar/internal/key_based_batch_builder.go
index 24d564b..d09138c 100644
--- a/pulsar/internal/key_based_batch_builder.go
+++ b/pulsar/internal/key_based_batch_builder.go
@@ -106,18 +106,19 @@ func NewKeyBasedBatchBuilder(
        return bb, nil
 }
 
-// IsFull check if the size in the current batch exceeds the maximum size 
allowed by the batch
+// IsFull checks if the size in the current batch meets or exceeds the maximum 
size allowed by the batch
 func (bc *keyBasedBatchContainer) IsFull() bool {
-       return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() > 
uint32(bc.maxBatchSize)
+       return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() >= 
uint32(bc.maxBatchSize)
 }
 
 func (bc *keyBasedBatchContainer) IsMultiBatches() bool {
        return true
 }
 
+// hasSpace should return true if and only if the batch container can 
accommodate another message of length payload.
 func (bc *keyBasedBatchContainer) hasSpace(payload []byte) bool {
        msgSize := uint32(len(payload))
-       return bc.numMessages+1 < bc.maxMessages || 
(bc.buffer.ReadableBytes()+msgSize) < uint32(bc.maxBatchSize)
+       return bc.numMessages+1 <= bc.maxMessages && 
bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
 }
 
 // Add will add single message to key-based batch with message key.
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index f914017..124c828 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -857,19 +857,57 @@ func TestDelayAbsolute(t *testing.T) {
        canc()
 }
 
-func TestMaxMessageSize(t *testing.T) {
+func TestMaxBatchSize(t *testing.T) {
+       // Set to be < serverMaxMessageSize
+       batchMaxMessageSize := 512 * 1024
+
        client, err := NewClient(ClientOptions{
                URL: serviceURL,
        })
        assert.NoError(t, err)
        defer client.Close()
+
        producer, err := client.CreateProducer(ProducerOptions{
-               Topic: newTopicName(),
+               Topic:           newTopicName(),
+               BatchingMaxSize: uint(batchMaxMessageSize),
        })
        assert.NoError(t, err)
        assert.NotNil(t, producer)
        defer producer.Close()
+
+       for bias := -1; bias <= 1; bias++ {
+               payload := make([]byte, batchMaxMessageSize+bias)
+               ID, err := producer.Send(context.Background(), &ProducerMessage{
+                       Payload: payload,
+               })
+               if bias <= 0 {
+                       assert.NoError(t, err)
+                       assert.NotNil(t, ID)
+               } else {
+                       assert.Equal(t, errFailAddToBatch, err)
+               }
+       }
+}
+
+func TestMaxMessageSize(t *testing.T) {
        serverMaxMessageSize := 1024 * 1024
+
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       // Need to set BatchingMaxSize > serverMaxMessageSize to avoid 
errMessageTooLarge
+       // being masked by an earlier errFailAddToBatch
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           newTopicName(),
+               BatchingMaxSize: uint(2 * serverMaxMessageSize),
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer)
+       defer producer.Close()
+
        for bias := -1; bias <= 1; bias++ {
                payload := make([]byte, serverMaxMessageSize+bias)
                ID, err := producer.Send(context.Background(), &ProducerMessage{

Reply via email to