This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new c41616b Support ack response for Go SDK (#776) c41616b is described below commit c41616b2f5125603fe252a90ca004bc0bd3d76d8 Author: xiaolong ran <xiaolong...@tencent.com> AuthorDate: Tue May 24 14:32:05 2022 +0800 Support ack response for Go SDK (#776) * Support ack response for Go SDK Signed-off-by: xiaolongran <xiaolong...@tencent.com> * add test case for this change Signed-off-by: xiaolongran <xiaolong...@tencent.com> --- pulsar/consumer.go | 7 +++++++ pulsar/consumer_impl.go | 1 + pulsar/consumer_partition.go | 11 +++++++++++ pulsar/consumer_test.go | 35 +++++++++++++++++++++++++++++++++++ pulsar/internal/connection.go | 23 +++++++++++++++++++++++ 5 files changed, 77 insertions(+) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index dfe27c5..2df1637 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -182,6 +182,13 @@ type ConsumerOptions struct { // > Notice: the NackBackoffPolicy will not work with `consumer.NackID(MessageID)` // > because we are not able to get the redeliveryCount from the message ID. NackBackoffPolicy NackBackoffPolicy + + // AckWithResponse is a return value added to Ack Command, and its purpose is to confirm whether Ack Command + // is executed correctly on the Broker side. When set to true, the error information returned by the Ack + // method contains the return value of the Ack Command processed by the Broker side; when set to false, the + // error information of the Ack method only contains errors that may occur in the Go SDK's own processing. + // Default: false + AckWithResponse bool } // Consumer is an interface that abstracts behavior of Pulsar's consumer diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index e36f040..e887538 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -361,6 +361,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { keySharedPolicy: c.options.KeySharedPolicy, schema: c.options.Schema, decryption: c.options.Decryption, + ackWithResponse: c.options.AckWithResponse, } cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics) ch <- ConsumerError{ diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 30ffcb2..06ccfd5 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -104,6 +104,7 @@ type partitionConsumerOpts struct { keySharedPolicy *KeySharedPolicy schema Schema decryption *MessageDecryptionInfo + ackWithResponse bool } type partitionConsumer struct { @@ -525,12 +526,22 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) { EntryId: proto.Uint64(uint64(msgID.entryID)), } + reqID := pc.client.rpcClient.NewRequestID() cmdAck := &pb.CommandAck{ ConsumerId: proto.Uint64(pc.consumerID), MessageId: messageIDs, AckType: pb.CommandAck_Individual.Enum(), } + if pc.options.ackWithResponse { + _, err := pc.client.rpcClient.RequestOnCnx(pc._getConn(), reqID, pb.BaseCommand_ACK, cmdAck) + if err != nil { + pc.log.WithError(err).Error("Ack with response error") + req.err = err + } + return + } + err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_ACK, cmdAck) if err != nil { pc.log.Error("Connection was closed when request ack cmd") diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index cadd8e4..0366884 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1321,6 +1321,41 @@ func TestRLQ(t *testing.T) { assert.Nil(t, checkMsg) } +func TestAckWithResponse(t *testing.T) { + now := time.Now().Unix() + topic01 := fmt.Sprintf("persistent://public/default/topic-%d-01", now) + ctx := context.Background() + + client, err := NewClient(ClientOptions{URL: lookupURL}) + assert.Nil(t, err) + defer client.Close() + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic01, + SubscriptionName: "my-sub", + Type: Shared, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + AckWithResponse: true, + }) + assert.Nil(t, err) + defer consumer.Close() + + producer01, err := client.CreateProducer(ProducerOptions{Topic: topic01}) + assert.Nil(t, err) + defer producer01.Close() + for i := 0; i < 10; i++ { + _, err = producer01.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MSG_01_%d", i))}) + assert.Nil(t, err) + } + + for i := 0; i < 10; i++ { + msg, err := consumer.Receive(ctx) + assert.Nil(t, err) + err = consumer.Ack(msg) + assert.Nil(t, err) + } +} + func TestRLQMultiTopics(t *testing.T) { now := time.Now().Unix() topic01 := fmt.Sprintf("persistent://public/default/topic-%d-1", now) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index da9a901..fa8d055 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -564,6 +564,9 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl case pb.BaseCommand_MESSAGE: c.handleMessage(cmd.GetMessage(), headersAndPayload) + case pb.BaseCommand_ACK_RESPONSE: + c.handleAckResponse(cmd.GetAckResponse()) + case pb.BaseCommand_PING: c.handlePing() case pb.BaseCommand_PONG: @@ -676,6 +679,26 @@ func (c *connection) handleResponseError(serverError *pb.CommandError) { request.callback(nil, errors.New(errMsg)) } +func (c *connection) handleAckResponse(ackResponse *pb.CommandAckResponse) { + requestID := ackResponse.GetRequestId() + consumerID := ackResponse.GetConsumerId() + + request, ok := c.deletePendingRequest(requestID) + if !ok { + c.log.Warnf("AckResponse has complete when receive response! requestId : %d, consumerId : %d", + requestID, consumerID) + return + } + + if ackResponse.GetMessage() == "" { + request.callback(nil, nil) + return + } + + errMsg := fmt.Sprintf("ack response error: %s: %s", ackResponse.GetError(), ackResponse.GetMessage()) + request.callback(nil, errors.New(errMsg)) +} + func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) { producerID := response.GetProducerId()