This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new c41616b  Support ack response for Go SDK (#776)
c41616b is described below

commit c41616b2f5125603fe252a90ca004bc0bd3d76d8
Author: xiaolong ran <xiaolong...@tencent.com>
AuthorDate: Tue May 24 14:32:05 2022 +0800

    Support ack response for Go SDK (#776)
    
    * Support ack response for Go SDK
    
    Signed-off-by: xiaolongran <xiaolong...@tencent.com>
    
    * add test case for this change
    
    Signed-off-by: xiaolongran <xiaolong...@tencent.com>
---
 pulsar/consumer.go            |  7 +++++++
 pulsar/consumer_impl.go       |  1 +
 pulsar/consumer_partition.go  | 11 +++++++++++
 pulsar/consumer_test.go       | 35 +++++++++++++++++++++++++++++++++++
 pulsar/internal/connection.go | 23 +++++++++++++++++++++++
 5 files changed, 77 insertions(+)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index dfe27c5..2df1637 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -182,6 +182,13 @@ type ConsumerOptions struct {
        // > Notice: the NackBackoffPolicy will not work with 
`consumer.NackID(MessageID)`
        // > because we are not able to get the redeliveryCount from the 
message ID.
        NackBackoffPolicy NackBackoffPolicy
+
+       // AckWithResponse is a return value added to Ack Command, and its 
purpose is to confirm whether Ack Command
+       // is executed correctly on the Broker side. When set to true, the 
error information returned by the Ack
+       // method contains the return value of the Ack Command processed by the 
Broker side; when set to false, the
+       // error information of the Ack method only contains errors that may 
occur in the Go SDK's own processing.
+       // Default: false
+       AckWithResponse bool
 }
 
 // Consumer is an interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index e36f040..e887538 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -361,6 +361,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() 
error {
                                keySharedPolicy:            
c.options.KeySharedPolicy,
                                schema:                     c.options.Schema,
                                decryption:                 
c.options.Decryption,
+                               ackWithResponse:            
c.options.AckWithResponse,
                        }
                        cons, err := newPartitionConsumer(c, c.client, opts, 
c.messageCh, c.dlq, c.metrics)
                        ch <- ConsumerError{
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 30ffcb2..06ccfd5 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -104,6 +104,7 @@ type partitionConsumerOpts struct {
        keySharedPolicy            *KeySharedPolicy
        schema                     Schema
        decryption                 *MessageDecryptionInfo
+       ackWithResponse            bool
 }
 
 type partitionConsumer struct {
@@ -525,12 +526,22 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) 
{
                EntryId:  proto.Uint64(uint64(msgID.entryID)),
        }
 
+       reqID := pc.client.rpcClient.NewRequestID()
        cmdAck := &pb.CommandAck{
                ConsumerId: proto.Uint64(pc.consumerID),
                MessageId:  messageIDs,
                AckType:    pb.CommandAck_Individual.Enum(),
        }
 
+       if pc.options.ackWithResponse {
+               _, err := pc.client.rpcClient.RequestOnCnx(pc._getConn(), 
reqID, pb.BaseCommand_ACK, cmdAck)
+               if err != nil {
+                       pc.log.WithError(err).Error("Ack with response error")
+                       req.err = err
+               }
+               return
+       }
+
        err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), 
pb.BaseCommand_ACK, cmdAck)
        if err != nil {
                pc.log.Error("Connection was closed when request ack cmd")
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index cadd8e4..0366884 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1321,6 +1321,41 @@ func TestRLQ(t *testing.T) {
        assert.Nil(t, checkMsg)
 }
 
+func TestAckWithResponse(t *testing.T) {
+       now := time.Now().Unix()
+       topic01 := fmt.Sprintf("persistent://public/default/topic-%d-01", now)
+       ctx := context.Background()
+
+       client, err := NewClient(ClientOptions{URL: lookupURL})
+       assert.Nil(t, err)
+       defer client.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                       topic01,
+               SubscriptionName:            "my-sub",
+               Type:                        Shared,
+               SubscriptionInitialPosition: SubscriptionPositionEarliest,
+               AckWithResponse:             true,
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       producer01, err := client.CreateProducer(ProducerOptions{Topic: 
topic01})
+       assert.Nil(t, err)
+       defer producer01.Close()
+       for i := 0; i < 10; i++ {
+               _, err = producer01.Send(ctx, &ProducerMessage{Payload: 
[]byte(fmt.Sprintf("MSG_01_%d", i))})
+               assert.Nil(t, err)
+       }
+
+       for i := 0; i < 10; i++ {
+               msg, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               err = consumer.Ack(msg)
+               assert.Nil(t, err)
+       }
+}
+
 func TestRLQMultiTopics(t *testing.T) {
        now := time.Now().Unix()
        topic01 := fmt.Sprintf("persistent://public/default/topic-%d-1", now)
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index da9a901..fa8d055 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -564,6 +564,9 @@ func (c *connection) internalReceivedCommand(cmd 
*pb.BaseCommand, headersAndPayl
        case pb.BaseCommand_MESSAGE:
                c.handleMessage(cmd.GetMessage(), headersAndPayload)
 
+       case pb.BaseCommand_ACK_RESPONSE:
+               c.handleAckResponse(cmd.GetAckResponse())
+
        case pb.BaseCommand_PING:
                c.handlePing()
        case pb.BaseCommand_PONG:
@@ -676,6 +679,26 @@ func (c *connection) handleResponseError(serverError 
*pb.CommandError) {
        request.callback(nil, errors.New(errMsg))
 }
 
+func (c *connection) handleAckResponse(ackResponse *pb.CommandAckResponse) {
+       requestID := ackResponse.GetRequestId()
+       consumerID := ackResponse.GetConsumerId()
+
+       request, ok := c.deletePendingRequest(requestID)
+       if !ok {
+               c.log.Warnf("AckResponse has complete when receive response! 
requestId : %d, consumerId : %d",
+                       requestID, consumerID)
+               return
+       }
+
+       if ackResponse.GetMessage() == "" {
+               request.callback(nil, nil)
+               return
+       }
+
+       errMsg := fmt.Sprintf("ack response error: %s: %s", 
ackResponse.GetError(), ackResponse.GetMessage())
+       request.callback(nil, errors.New(errMsg))
+}
+
 func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
        producerID := response.GetProducerId()
 

Reply via email to