This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new b58f115  [PIP 90] go client retrieve broker metadata (#745)
b58f115 is described below

commit b58f1157e4aadfc662c3d5fc19f2de55cc9ff411
Author: ZhangJian He <shoot...@gmail.com>
AuthorDate: Tue Mar 22 12:15:14 2022 +0800

    [PIP 90] go client retrieve broker metadata (#745)
    
    [PIP 90] go client retrieve broker metadata
---
 pulsar/consumer_partition.go                       | 24 ++++++++++++++++++++--
 pulsar/impl_message.go                             | 10 +++++++++
 pulsar/internal/buffer.go                          |  6 ++++++
 pulsar/internal/commands.go                        | 20 ++++++++++++++++--
 pulsar/internal/commands_test.go                   | 18 ++++++++++++++++
 pulsar/internal/connection.go                      |  5 +++--
 .../pulsartracing/message_carrier_util_test.go     |  8 ++++++++
 pulsar/message.go                                  |  8 ++++++++
 pulsar/negative_acks_tracker_test.go               | 16 +++++++++++++++
 9 files changed, 109 insertions(+), 6 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 04a39c5..b06474d 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -498,12 +498,17 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
        pbMsgID := response.GetMessageId()
 
        reader := internal.NewMessageReader(headersAndPayload)
+       brokerMetadata, err := reader.ReadBrokerMetadata()
+       if err != nil {
+               // TODO: use a more appropriate ack error code for broker metadata read failures
+               pc.discardCorruptedMessage(pbMsgID, 
pb.CommandAck_BatchDeSerializeError)
+               return err
+       }
        msgMeta, err := reader.ReadMessageMetadata()
        if err != nil {
                pc.discardCorruptedMessage(pbMsgID, 
pb.CommandAck_ChecksumMismatch)
                return err
        }
-
        decryptedPayload, err := 
pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
        // error decrypting the payload
        if err != nil {
@@ -597,7 +602,18 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
                        pc.AckID(msgID)
                        continue
                }
-
+               var messageIndex *uint64
+               var brokerPublishTime *time.Time
+               if brokerMetadata != nil {
+                       if brokerMetadata.Index != nil {
+                               aux := brokerMetadata.GetIndex() - 
uint64(numMsgs) + uint64(i) + 1
+                               messageIndex = &aux
+                       }
+                       if brokerMetadata.BrokerTimestamp != nil {
+                               aux := 
timeFromUnixTimestampMillis(*brokerMetadata.BrokerTimestamp)
+                               brokerPublishTime = &aux
+                       }
+               }
                // set the consumer so we know how to ack the message id
                msgID.consumer = pc
                var msg *message
@@ -616,6 +632,8 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
                                replicatedFrom:      
msgMeta.GetReplicatedFrom(),
                                redeliveryCount:     
response.GetRedeliveryCount(),
                                orderingKey:         string(smm.OrderingKey),
+                               index:               messageIndex,
+                               brokerPublishTime:   brokerPublishTime,
                        }
                } else {
                        msg = &message{
@@ -631,6 +649,8 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
                                replicationClusters: msgMeta.GetReplicateTo(),
                                replicatedFrom:      
msgMeta.GetReplicatedFrom(),
                                redeliveryCount:     
response.GetRedeliveryCount(),
+                               index:               messageIndex,
+                               brokerPublishTime:   brokerPublishTime,
                        }
                }
 
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index a9809aa..3216676 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -241,6 +241,8 @@ type message struct {
        redeliveryCount     uint32
        schema              Schema
        encryptionContext   *EncryptionContext
+       index               *uint64
+       brokerPublishTime   *time.Time
 }
 
 func (msg *message) Topic() string {
@@ -299,6 +301,14 @@ func (msg *message) GetEncryptionContext() 
*EncryptionContext {
        return msg.encryptionContext
 }
 
+func (msg *message) Index() *uint64 {
+       return msg.index
+}
+
+func (msg *message) BrokerPublishTime() *time.Time {
+       return msg.brokerPublishTime
+}
+
 func newAckTracker(size int) *ackTracker {
        var batchIDs *big.Int
        if size <= 64 {
diff --git a/pulsar/internal/buffer.go b/pulsar/internal/buffer.go
index f3b8fe6..b3e23fb 100644
--- a/pulsar/internal/buffer.go
+++ b/pulsar/internal/buffer.go
@@ -38,6 +38,8 @@ type Buffer interface {
 
        Read(size uint32) []byte
 
+       Skip(size uint32)
+
        Get(readerIndex uint32, size uint32) []byte
 
        ReadableSlice() []byte
@@ -122,6 +124,10 @@ func (b *buffer) Read(size uint32) []byte {
        return res
 }
 
+func (b *buffer) Skip(size uint32) {
+       b.readerIdx += size
+}
+
 func (b *buffer) Get(readerIdx uint32, size uint32) []byte {
        return b.data[readerIdx : readerIdx+size]
 }
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index b91c0b6..7fd1885 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -18,6 +18,7 @@
 package internal
 
 import (
+       "encoding/binary"
        "errors"
        "fmt"
 
@@ -34,8 +35,9 @@ const (
        // MessageFramePadding is for metadata and other frame headers
        MessageFramePadding = 10 * 1024
        // MaxFrameSize limit the maximum size that pulsar allows for messages 
to be sent.
-       MaxFrameSize        = MaxMessageSize + MessageFramePadding
-       magicCrc32c  uint16 = 0x0e01
+       MaxFrameSize                    = MaxMessageSize + MessageFramePadding
+       magicCrc32c              uint16 = 0x0e01
+       magicBrokerEntryMetadata uint16 = 0x0e02
 )
 
 // ErrCorruptedMessage is the error returned by ReadMessageData when it has 
detected corrupted data.
@@ -119,6 +121,20 @@ func (r *MessageReader) ReadMessageMetadata() 
(*pb.MessageMetadata, error) {
        return &meta, nil
 }
 
+func (r *MessageReader) ReadBrokerMetadata() (*pb.BrokerEntryMetadata, error) {
+       magicNumber := 
binary.BigEndian.Uint16(r.buffer.Get(r.buffer.ReaderIndex(), 2))
+       if magicNumber != magicBrokerEntryMetadata {
+               return nil, nil
+       }
+       r.buffer.Skip(2)
+       size := r.buffer.ReadUint32()
+       var brokerEntryMetadata pb.BrokerEntryMetadata
+       if err := proto.Unmarshal(r.buffer.Read(size), &brokerEntryMetadata); 
err != nil {
+               return nil, err
+       }
+       return &brokerEntryMetadata, nil
+}
+
 func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, 
error) {
        if r.buffer.ReadableBytes() == 0 && r.buffer.Capacity() > 0 {
                return nil, nil, ErrEOM
diff --git a/pulsar/internal/commands_test.go b/pulsar/internal/commands_test.go
index b43335a..c236e10 100644
--- a/pulsar/internal/commands_test.go
+++ b/pulsar/internal/commands_test.go
@@ -70,6 +70,19 @@ func TestReadMessageMetadata(t *testing.T) {
        assert.Equal(t, 10, int(meta.GetNumMessagesInBatch()))
 }
 
+func TestReadBrokerEntryMetadata(t *testing.T) {
+       // read a payload that begins with the broker entry metadata magic number
+       reader := NewMessageReaderFromArray(brokerEntryMeta)
+       meta, err := reader.ReadBrokerMetadata()
+       if err != nil {
+               t.Fatal(err)
+       }
+       var expectedBrokerTimestamp uint64 = 1646983036054
+       assert.Equal(t, expectedBrokerTimestamp, *meta.BrokerTimestamp)
+       var expectedIndex uint64 = 5
+       assert.Equal(t, expectedIndex, *meta.Index)
+}
+
 func TestReadMessageOldFormat(t *testing.T) {
        reader := NewMessageReaderFromArray(rawCompatSingleMessage)
        _, err := reader.ReadMessageMetadata()
@@ -210,3 +223,8 @@ var rawBatchMessage10 = []byte{
        0x28, 0x05, 0x40, 0x09, 0x68, 0x65, 0x6c, 0x6c,
        0x6f,
 }
+
+var brokerEntryMeta = []byte{
+       0x0e, 0x02, 0x00, 0x00, 0x00, 0x09, 0x08, 0x96,
+       0xf9, 0xda, 0xbe, 0xf7, 0x2f, 0x10, 0x05,
+}
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 1e2009c..a025abf 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -43,7 +43,7 @@ const (
        PulsarVersion       = "0.1"
        ClientVersionString = "Pulsar Go " + PulsarVersion
 
-       PulsarProtocolVersion = int32(pb.ProtocolVersion_v13)
+       PulsarProtocolVersion = int32(pb.ProtocolVersion_v18)
 )
 
 type TLSOptions struct {
@@ -292,7 +292,8 @@ func (c *connection) doHandshake() bool {
                AuthMethodName:  proto.String(c.auth.Name()),
                AuthData:        authData,
                FeatureFlags: &pb.FeatureFlags{
-                       SupportsAuthRefresh: proto.Bool(true),
+                       SupportsAuthRefresh:         proto.Bool(true),
+                       SupportsBrokerEntryMetadata: proto.Bool(true),
                },
        }
 
diff --git a/pulsar/internal/pulsartracing/message_carrier_util_test.go 
b/pulsar/internal/pulsartracing/message_carrier_util_test.go
index 677a7ff..7f25578 100644
--- a/pulsar/internal/pulsartracing/message_carrier_util_test.go
+++ b/pulsar/internal/pulsartracing/message_carrier_util_test.go
@@ -123,3 +123,11 @@ func (msg *mockConsumerMessage) ProducerName() string {
 func (msg *mockConsumerMessage) GetEncryptionContext() 
*pulsar.EncryptionContext {
        return &pulsar.EncryptionContext{}
 }
+
+func (msg *mockConsumerMessage) Index() *uint64 {
+       return nil
+}
+
+func (msg *mockConsumerMessage) BrokerPublishTime() *time.Time {
+       return nil
+}
diff --git a/pulsar/message.go b/pulsar/message.go
index 3779caf..b88f158 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -121,6 +121,14 @@ type Message interface {
        // GetEncryptionContext returns the ecryption context of the message.
        // It will be used by the application to parse the undecrypted message.
        GetEncryptionContext() *EncryptionContext
+
+       // Index returns the message index from the broker entry metadata,
+       // or nil if the feature is not enabled in the broker.
+       Index() *uint64
+
+       // BrokerPublishTime returns the broker publish time from the broker entry metadata,
+       // or nil if the feature is not enabled in the broker.
+       BrokerPublishTime() *time.Time
 }
 
 // MessageID identifier for a particular message
diff --git a/pulsar/negative_acks_tracker_test.go 
b/pulsar/negative_acks_tracker_test.go
index 51965ea..e47fb09 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -237,6 +237,14 @@ func (msg *mockMessage1) GetEncryptionContext() 
*EncryptionContext {
        return &EncryptionContext{}
 }
 
+func (msg *mockMessage1) Index() *uint64 {
+       return nil
+}
+
+func (msg *mockMessage1) BrokerPublishTime() *time.Time {
+       return nil
+}
+
 type mockMessage2 struct {
        properties map[string]string
 }
@@ -300,3 +308,11 @@ func (msg *mockMessage2) ProducerName() string {
 func (msg *mockMessage2) GetEncryptionContext() *EncryptionContext {
        return &EncryptionContext{}
 }
+
+func (msg *mockMessage2) Index() *uint64 {
+       return nil
+}
+
+func (msg *mockMessage2) BrokerPublishTime() *time.Time {
+       return nil
+}

Reply via email to