This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new d0d5d0a  Add properties field for batch (#683)
d0d5d0a is described below

commit d0d5d0ae403717505f85b86c100fac6ff4d7dcf6
Author: xiaolong ran <r...@apache.org>
AuthorDate: Fri Dec 10 15:12:53 2021 +0800

    Add properties field for batch (#683)
    
    Signed-off-by: xiaolongran <xiaolong...@tencent.com>
    
    ### Motivation
    
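    Currently, when batching is disabled in the producer, `handleSend()` in `ServerCnx.java` on the broker still sees `msgMetadata.hasNumMessagesInBatch()` return **true** and `msgMetadata.getNumMessagesInBatch()` return **1**. In other words, the message still arrives as a batch containing a single entry.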
    
    In this case, the properties set on the producer side are empty when they are read from the message metadata on the broker side.
    
    Setting properties with the Go SDK:
    
    ```
    // disable batch
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic:           "topic-1",
        DisableBatching: true,
    })
    if err != nil {
        log.Fatal(err)
    }
    
    // set properties for every message
    _, err = producer.Send(ctx, &pulsar.ProducerMessage{
        Payload: []byte(fmt.Sprintf("hello-%d", i)),
        Properties: map[string]string{
            "key-1": "value-1",
        },
    })
    ```
    
    On the broker, the message properties read from the entry metadata are empty:
    
    ```
    ByteBuf metadataAndPayload = entry.getDataBuffer();
    
    MessageMetadata msgMetadata = Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1);
    ```
    
    As a result, `msgMetadata.getPropertiesCount() <= 0`.
    
    
    ### Modifications
    
    Set the `Properties` field on the batch metadata when a message is added to `batchContainer`, and clear it again in `reset()`.
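    
    The effect of the change can be illustrated with a minimal, self-contained sketch. The types `keyValue`, `messageMetadata`, and `sketchBatch` below are hypothetical, simplified stand-ins and not the client's real structs; the sketch also assumes that the batch-level metadata is seeded from the first message added to a batch.
    
    ```go
    package main
    
    import "fmt"
    
    // keyValue and messageMetadata are hypothetical stand-ins for the
    // protobuf-generated types used by the real client.
    type keyValue struct {
        Key, Value string
    }
    
    type messageMetadata struct {
        Properties []keyValue
    }
    
    // sketchBatch is a hypothetical, stripped-down batch container.
    type sketchBatch struct {
        numMessages uint
        batchMeta   messageMetadata
    }
    
    // Add records one message. Assumption: the first message of a batch
    // seeds the batch-level metadata, which now includes Properties.
    func (b *sketchBatch) Add(msg messageMetadata) {
        if b.numMessages == 0 {
            b.batchMeta.Properties = msg.Properties
        }
        b.numMessages++
    }
    
    // reset clears per-batch state so that properties from one batch
    // do not leak into the next one.
    func (b *sketchBatch) reset() {
        b.numMessages = 0
        b.batchMeta.Properties = nil
    }
    
    func main() {
        b := &sketchBatch{}
        b.Add(messageMetadata{Properties: []keyValue{{Key: "key-1", Value: "value-1"}}})
        fmt.Println("properties in batch metadata:", len(b.batchMeta.Properties))
    
        b.reset()
        fmt.Println("properties after reset:", len(b.batchMeta.Properties))
    }
    ```
    
    With batching disabled, each batch holds a single message, so under these assumptions the batch-level properties are exactly the properties of that message.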
---
 pulsar/internal/batch_builder.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index d08af53..7e47304 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -191,6 +191,7 @@ func (bc *batchContainer) Add(
                bc.msgMetadata.ProducerName = &bc.producerName
                bc.msgMetadata.ReplicateTo = replicateTo
                bc.msgMetadata.PartitionKey = metadata.PartitionKey
+               bc.msgMetadata.Properties = metadata.Properties
 
                if deliverAt.UnixNano() > 0 {
                        bc.msgMetadata.DeliverAtTime = proto.Int64(int64(TimestampMillis(deliverAt)))
@@ -211,6 +212,7 @@ func (bc *batchContainer) reset() {
        bc.callbacks = []interface{}{}
        bc.msgMetadata.ReplicateTo = nil
        bc.msgMetadata.DeliverAtTime = nil
+       bc.msgMetadata.Properties = nil
 }
 
 // Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
