This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

The following commit(s) were added to refs/heads/master by this push:
     new d0d5d0a  Add properties filed for batch (#683)
d0d5d0a is described below

commit d0d5d0ae403717505f85b86c100fac6ff4d7dcf6
Author: xiaolong ran <r...@apache.org>
AuthorDate: Fri Dec 10 15:12:53 2021 +0800

    Add properties filed for batch (#683)

    Signed-off-by: xiaolongran <xiaolong...@tencent.com>

    ### Motivation

    Currently, when batching is disabled in the producer, `handleSend()` in
    `serverCnx.java` still sees `msgMetadata.hasNumMessagesInBatch()` as **true**
    and `msgMetadata.getNumMessagesInBatch()` as **1**. If the broker then reads
    the properties that were set on the producer side, they are empty.

    The Go SDK sets the properties as follows:

    ```
    // disable batch
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic:           "topic-1",
        DisableBatching: true,
    })

    // set properties for every message
    producer.Send(ctx, &pulsar.ProducerMessage{
        Payload: []byte(fmt.Sprintf("hello-%d", i)),
        Properties: map[string]string{
            "key-1": "value-1",
        },
    })
    ```

    On the broker, the message properties read from the entry metadata are null:

    ```
    ByteBuf metadataAndPayload = entry.getDataBuffer();
    MessageMetadata msgMetadata = Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1);
    ```

    and `msgMetadata.getPropertiesCount() <= 0`.

    ### Modifications

    Set the properties field when a single message is added to the
    batchContainer, and clear it again in `reset()`.
---
 pulsar/internal/batch_builder.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index d08af53..7e47304 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -191,6 +191,7 @@ func (bc *batchContainer) Add(
 		bc.msgMetadata.ProducerName = &bc.producerName
 		bc.msgMetadata.ReplicateTo = replicateTo
 		bc.msgMetadata.PartitionKey = metadata.PartitionKey
+		bc.msgMetadata.Properties = metadata.Properties
 
 		if deliverAt.UnixNano() > 0 {
 			bc.msgMetadata.DeliverAtTime = proto.Int64(int64(TimestampMillis(deliverAt)))
@@ -211,6 +212,7 @@ func (bc *batchContainer) reset() {
 	bc.callbacks = []interface{}{}
 	bc.msgMetadata.ReplicateTo = nil
 	bc.msgMetadata.DeliverAtTime = nil
+	bc.msgMetadata.Properties = nil
 }
 
 // Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
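
The effect of the two added lines can be illustrated with a minimal, self-contained sketch. It is not the client's code: `messageMetadata` and `batch` are hypothetical stand-ins for the protobuf `MessageMetadata` and for `batchContainer`, and the sketch assumes that batch-level metadata is filled in from the first message added to an empty batch.

```go
package main

import "fmt"

// messageMetadata is a hypothetical, reduced stand-in for the
// batch-level protobuf MessageMetadata.
type messageMetadata struct {
	PartitionKey string
	Properties   map[string]string
}

// batch is a hypothetical, simplified stand-in for batchContainer.
type batch struct {
	numMessages int
	meta        messageMetadata
}

// Add buffers one message. Assumption: only the first message of an
// empty batch populates the batch-level metadata.
func (b *batch) Add(key string, props map[string]string) {
	if b.numMessages == 0 {
		b.meta.PartitionKey = key
		// Mirrors the line added by the patch: without it the
		// batch-level properties stay empty.
		b.meta.Properties = props
	}
	b.numMessages++
}

// reset prepares the batch for reuse. Clearing Properties mirrors the
// second added line, so a later batch cannot inherit stale properties.
func (b *batch) reset() {
	b.numMessages = 0
	b.meta.PartitionKey = ""
	b.meta.Properties = nil
}

func main() {
	b := &batch{}
	b.Add("k", map[string]string{"key-1": "value-1"})
	fmt.Println(len(b.meta.Properties)) // prints 1

	b.reset()
	fmt.Println(len(b.meta.Properties)) // prints 0
}
```

With batching disabled, every batch holds exactly one message, so in this model the batch-level properties are that message's properties, which is what the broker-side check in the motivation expects to find.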