This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new b58f115 [PIP 90] go client retrieve broker metadata (#745) b58f115 is described below commit b58f1157e4aadfc662c3d5fc19f2de55cc9ff411 Author: ZhangJian He <shoot...@gmail.com> AuthorDate: Tue Mar 22 12:15:14 2022 +0800 [PIP 90] go client retrieve broker metadata (#745) [PIP 90] go client retrieve broker metadata --- pulsar/consumer_partition.go | 24 ++++++++++++++++++++-- pulsar/impl_message.go | 10 +++++++++ pulsar/internal/buffer.go | 6 ++++++ pulsar/internal/commands.go | 20 ++++++++++++++++-- pulsar/internal/commands_test.go | 18 ++++++++++++++++ pulsar/internal/connection.go | 5 +++-- .../pulsartracing/message_carrier_util_test.go | 8 ++++++++ pulsar/message.go | 8 ++++++++ pulsar/negative_acks_tracker_test.go | 16 +++++++++++++++ 9 files changed, 109 insertions(+), 6 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 04a39c5..b06474d 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -498,12 +498,17 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header pbMsgID := response.GetMessageId() reader := internal.NewMessageReader(headersAndPayload) + brokerMetadata, err := reader.ReadBrokerMetadata() + if err != nil { + // todo optimize use more appropriate error codes + pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_BatchDeSerializeError) + return err + } msgMeta, err := reader.ReadMessageMetadata() if err != nil { pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_ChecksumMismatch) return err } - decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta) // error decrypting the payload if err != nil { @@ -597,7 +602,18 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header pc.AckID(msgID) continue } - + var messageIndex *uint64 + var brokerPublishTime *time.Time + if brokerMetadata != nil { + if brokerMetadata.Index != nil { + aux := brokerMetadata.GetIndex() - uint64(numMsgs) + uint64(i) + 1 + messageIndex = &aux + } + if brokerMetadata.BrokerTimestamp != nil { + aux := timeFromUnixTimestampMillis(*brokerMetadata.BrokerTimestamp) + brokerPublishTime = &aux + } + } // set the consumer so we know how to ack the message id msgID.consumer = pc var msg *message @@ -616,6 +632,8 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header replicatedFrom: msgMeta.GetReplicatedFrom(), redeliveryCount: response.GetRedeliveryCount(), orderingKey: string(smm.OrderingKey), + index: messageIndex, + brokerPublishTime: brokerPublishTime, } } else { msg = &message{ @@ -631,6 +649,8 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header replicationClusters: msgMeta.GetReplicateTo(), replicatedFrom: msgMeta.GetReplicatedFrom(), redeliveryCount: response.GetRedeliveryCount(), + index: messageIndex, + brokerPublishTime: brokerPublishTime, } } diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go index a9809aa..3216676 100644 --- a/pulsar/impl_message.go +++ b/pulsar/impl_message.go @@ -241,6 +241,8 @@ type message struct { redeliveryCount uint32 schema Schema encryptionContext *EncryptionContext + index *uint64 + brokerPublishTime *time.Time } func (msg *message) Topic() string { @@ -299,6 +301,14 @@ func (msg *message) GetEncryptionContext() *EncryptionContext { return msg.encryptionContext } +func (msg *message) Index() *uint64 { + return msg.index +} + +func (msg *message) BrokerPublishTime() *time.Time { + return msg.brokerPublishTime +} + func newAckTracker(size int) *ackTracker { var batchIDs *big.Int if size <= 64 { diff --git a/pulsar/internal/buffer.go b/pulsar/internal/buffer.go index f3b8fe6..b3e23fb 100644 --- a/pulsar/internal/buffer.go +++ b/pulsar/internal/buffer.go @@ -38,6 +38,8 @@ type Buffer interface { Read(size uint32) []byte + Skip(size uint32) + Get(readerIndex uint32, size uint32) []byte ReadableSlice() []byte @@ -122,6 +124,10 @@ func (b *buffer) Read(size uint32) []byte { return res } +func (b *buffer) Skip(size uint32) { + b.readerIdx += size +} + func (b *buffer) Get(readerIdx uint32, size uint32) []byte { return b.data[readerIdx : readerIdx+size] } diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go index b91c0b6..7fd1885 100644 --- a/pulsar/internal/commands.go +++ b/pulsar/internal/commands.go @@ -18,6 +18,7 @@ package internal import ( + "encoding/binary" "errors" "fmt" @@ -34,8 +35,9 @@ const ( // MessageFramePadding is for metadata and other frame headers MessageFramePadding = 10 * 1024 // MaxFrameSize limit the maximum size that pulsar allows for messages to be sent. - MaxFrameSize = MaxMessageSize + MessageFramePadding - magicCrc32c uint16 = 0x0e01 + MaxFrameSize = MaxMessageSize + MessageFramePadding + magicCrc32c uint16 = 0x0e01 + magicBrokerEntryMetadata uint16 = 0x0e02 ) // ErrCorruptedMessage is the error returned by ReadMessageData when it has detected corrupted data. @@ -119,6 +121,20 @@ func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) { return &meta, nil } +func (r *MessageReader) ReadBrokerMetadata() (*pb.BrokerEntryMetadata, error) { + magicNumber := binary.BigEndian.Uint16(r.buffer.Get(r.buffer.ReaderIndex(), 2)) + if magicNumber != magicBrokerEntryMetadata { + return nil, nil + } + r.buffer.Skip(2) + size := r.buffer.ReadUint32() + var brokerEntryMetadata pb.BrokerEntryMetadata + if err := proto.Unmarshal(r.buffer.Read(size), &brokerEntryMetadata); err != nil { + return nil, err + } + return &brokerEntryMetadata, nil +} + func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error) { if r.buffer.ReadableBytes() == 0 && r.buffer.Capacity() > 0 { return nil, nil, ErrEOM diff --git a/pulsar/internal/commands_test.go b/pulsar/internal/commands_test.go index b43335a..c236e10 100644 --- a/pulsar/internal/commands_test.go +++ b/pulsar/internal/commands_test.go @@ -70,6 +70,19 @@ func TestReadMessageMetadata(t *testing.T) { assert.Equal(t, 10, int(meta.GetNumMessagesInBatch())) } +func TestReadBrokerEntryMetadata(t *testing.T) { + // read old style message (not batched) + reader := NewMessageReaderFromArray(brokerEntryMeta) + meta, err := reader.ReadBrokerMetadata() + if err != nil { + t.Fatal(err) + } + var expectedBrokerTimestamp uint64 = 1646983036054 + assert.Equal(t, expectedBrokerTimestamp, *meta.BrokerTimestamp) + var expectedIndex uint64 = 5 + assert.Equal(t, expectedIndex, *meta.Index) +} + func TestReadMessageOldFormat(t *testing.T) { reader := NewMessageReaderFromArray(rawCompatSingleMessage) _, err := reader.ReadMessageMetadata() @@ -210,3 +223,8 @@ var rawBatchMessage10 = []byte{ 0x28, 0x05, 0x40, 0x09, 0x68, 0x65, 0x6c, 0x6c, 0x6f, } + +var brokerEntryMeta = []byte{ + 0x0e, 0x02, 0x00, 0x00, 0x00, 0x09, 0x08, 0x96, + 0xf9, 0xda, 0xbe, 0xf7, 0x2f, 0x10, 0x05, +} diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 1e2009c..a025abf 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -43,7 +43,7 @@ const ( PulsarVersion = "0.1" ClientVersionString = "Pulsar Go " + PulsarVersion - PulsarProtocolVersion = int32(pb.ProtocolVersion_v13) + PulsarProtocolVersion = int32(pb.ProtocolVersion_v18) ) type TLSOptions struct { @@ -292,7 +292,8 @@ func (c *connection) doHandshake() bool { AuthMethodName: proto.String(c.auth.Name()), AuthData: authData, FeatureFlags: &pb.FeatureFlags{ - SupportsAuthRefresh: proto.Bool(true), + SupportsAuthRefresh: proto.Bool(true), + SupportsBrokerEntryMetadata: proto.Bool(true), }, } diff --git a/pulsar/internal/pulsartracing/message_carrier_util_test.go b/pulsar/internal/pulsartracing/message_carrier_util_test.go index 677a7ff..7f25578 100644 --- a/pulsar/internal/pulsartracing/message_carrier_util_test.go +++ b/pulsar/internal/pulsartracing/message_carrier_util_test.go @@ -123,3 +123,11 @@ func (msg *mockConsumerMessage) ProducerName() string { func (msg *mockConsumerMessage) GetEncryptionContext() *pulsar.EncryptionContext { return &pulsar.EncryptionContext{} } + +func (msg *mockConsumerMessage) Index() *uint64 { + return nil +} + +func (msg *mockConsumerMessage) BrokerPublishTime() *time.Time { + return nil +} diff --git a/pulsar/message.go b/pulsar/message.go index 3779caf..b88f158 100644 --- a/pulsar/message.go +++ b/pulsar/message.go @@ -121,6 +121,14 @@ type Message interface { // GetEncryptionContext returns the ecryption context of the message. // It will be used by the application to parse the undecrypted message. GetEncryptionContext() *EncryptionContext + + // Index returns index from broker entry metadata, + // or empty if the feature is not enabled in the broker. + Index() *uint64 + + // BrokerPublishTime returns broker publish time from broker entry metadata, + // or empty if the feature is not enabled in the broker. + BrokerPublishTime() *time.Time } // MessageID identifier for a particular message diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go index 51965ea..e47fb09 100644 --- a/pulsar/negative_acks_tracker_test.go +++ b/pulsar/negative_acks_tracker_test.go @@ -237,6 +237,14 @@ func (msg *mockMessage1) GetEncryptionContext() *EncryptionContext { return &EncryptionContext{} } +func (msg *mockMessage1) Index() *uint64 { + return nil +} + +func (msg *mockMessage1) BrokerPublishTime() *time.Time { + return nil +} + type mockMessage2 struct { properties map[string]string } @@ -300,3 +308,11 @@ func (msg *mockMessage2) ProducerName() string { func (msg *mockMessage2) GetEncryptionContext() *EncryptionContext { return &EncryptionContext{} } + +func (msg *mockMessage2) Index() *uint64 { + return nil +} + +func (msg *mockMessage2) BrokerPublishTime() *time.Time { + return nil +}