This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new 2bbfb8e [Issue 513] Correct apparent logic error in batchContainer's hasSpace() method (#678) 2bbfb8e is described below commit 2bbfb8e4a66c63843c935a50971474abfa25cdf7 Author: Ben Schofield <bschofi...@users.noreply.github.com> AuthorDate: Wed Dec 22 06:22:18 2021 +0000 [Issue 513] Correct apparent logic error in batchContainer's hasSpace() method (#678) * Correct apparent logic error in batchContainer's hasSpace() method. * Make the same change to keyBasedBatchContainer's hasSpace() method. * Fix comment length to pass style checks. * Allow TestMaxMessageSize() to run * Add TestMaxBatchSize() to validate that this limit is respected. Based on the results of the test, change the < in hasSpace() to be a <=. * Further correct logic in hasSpace() / IsFull() * Remember to make the change in both places! Co-authored-by: ben <b...@cyber.casa> --- pulsar/internal/batch_builder.go | 7 ++--- pulsar/internal/key_based_batch_builder.go | 7 ++--- pulsar/producer_test.go | 42 ++++++++++++++++++++++++++++-- 3 files changed, 48 insertions(+), 8 deletions(-) diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go index 7e47304..9d18f26 100644 --- a/pulsar/internal/batch_builder.go +++ b/pulsar/internal/batch_builder.go @@ -150,14 +150,15 @@ func NewBatchBuilder( return &bc, nil } -// IsFull check if the size in the current batch exceeds the maximum size allowed by the batch +// IsFull checks if the size in the current batch meets or exceeds the maximum size allowed by the batch func (bc *batchContainer) IsFull() bool { - return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() > uint32(bc.maxBatchSize) + return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() >= uint32(bc.maxBatchSize) } +// hasSpace should return true if and only if the batch container can accommodate another message of length payload. func (bc *batchContainer) hasSpace(payload []byte) bool { msgSize := uint32(len(payload)) - return bc.numMessages+1 < bc.maxMessages || (bc.buffer.ReadableBytes()+msgSize) < uint32(bc.maxBatchSize) + return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize) } // Add will add single message to batch. diff --git a/pulsar/internal/key_based_batch_builder.go b/pulsar/internal/key_based_batch_builder.go index 24d564b..d09138c 100644 --- a/pulsar/internal/key_based_batch_builder.go +++ b/pulsar/internal/key_based_batch_builder.go @@ -106,18 +106,19 @@ func NewKeyBasedBatchBuilder( return bb, nil } -// IsFull check if the size in the current batch exceeds the maximum size allowed by the batch +// IsFull checks if the size in the current batch meets or exceeds the maximum size allowed by the batch func (bc *keyBasedBatchContainer) IsFull() bool { - return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() > uint32(bc.maxBatchSize) + return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() >= uint32(bc.maxBatchSize) } func (bc *keyBasedBatchContainer) IsMultiBatches() bool { return true } +// hasSpace should return true if and only if the batch container can accommodate another message of length payload. func (bc *keyBasedBatchContainer) hasSpace(payload []byte) bool { msgSize := uint32(len(payload)) - return bc.numMessages+1 < bc.maxMessages || (bc.buffer.ReadableBytes()+msgSize) < uint32(bc.maxBatchSize) + return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize) } // Add will add single message to key-based batch with message key. diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index f914017..124c828 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -857,19 +857,57 @@ func TestDelayAbsolute(t *testing.T) { canc() } -func TestMaxMessageSize(t *testing.T) { +func TestMaxBatchSize(t *testing.T) { + // Set to be < serverMaxMessageSize + batchMaxMessageSize := 512 * 1024 + client, err := NewClient(ClientOptions{ URL: serviceURL, }) assert.NoError(t, err) defer client.Close() + producer, err := client.CreateProducer(ProducerOptions{ - Topic: newTopicName(), + Topic: newTopicName(), + BatchingMaxSize: uint(batchMaxMessageSize), }) assert.NoError(t, err) assert.NotNil(t, producer) defer producer.Close() + + for bias := -1; bias <= 1; bias++ { + payload := make([]byte, batchMaxMessageSize+bias) + ID, err := producer.Send(context.Background(), &ProducerMessage{ + Payload: payload, + }) + if bias <= 0 { + assert.NoError(t, err) + assert.NotNil(t, ID) + } else { + assert.Equal(t, errFailAddToBatch, err) + } + } +} + +func TestMaxMessageSize(t *testing.T) { serverMaxMessageSize := 1024 * 1024 + + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.NoError(t, err) + defer client.Close() + + // Need to set BatchingMaxSize > serverMaxMessageSize to avoid errMessageTooLarge + // being masked by an earlier errFailAddToBatch + producer, err := client.CreateProducer(ProducerOptions{ + Topic: newTopicName(), + BatchingMaxSize: uint(2 * serverMaxMessageSize), + }) + assert.NoError(t, err) + assert.NotNil(t, producer) + defer producer.Close() + for bias := -1; bias <= 1; bias++ { payload := make([]byte, serverMaxMessageSize+bias) ID, err := producer.Send(context.Background(), &ProducerMessage{