This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 5108332  Add error response for Ack func (#775)
5108332 is described below

commit 5108332c9dd4cb454c26804304bffb82eeffc713
Author: xiaolong ran <xiaolong...@tencent.com>
AuthorDate: Mon May 23 11:15:27 2022 +0800

    Add error response for Ack func (#775)
    
    * Add error response for Ack func
    
    Signed-off-by: xiaolongran <xiaolong...@tencent.com>
    
    * when connection closed we need to reconnect by using new cnx
    
    Signed-off-by: xiaolongran <xiaolong...@tencent.com>
    
    * fix comments
    
    Signed-off-by: xiaolongran <xiaolong...@tencent.com>
---
 pulsar/consumer.go                                   |  4 ++--
 pulsar/consumer_impl.go                              | 20 ++++++++++----------
 pulsar/consumer_multitopic.go                        | 17 +++++++++--------
 pulsar/consumer_partition.go                         | 17 +++++++++++------
 pulsar/consumer_regex.go                             | 17 +++++++++--------
 pulsar/impl_message.go                               |  9 ++++++---
 pulsar/internal/connection_pool.go                   |  4 +++-
 .../pulsartracing/consumer_interceptor_test.go       |  8 ++++++--
 8 files changed, 56 insertions(+), 40 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index c67509b..dfe27c5 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -200,10 +200,10 @@ type Consumer interface {
        Chan() <-chan ConsumerMessage
 
        // Ack the consumption of a single message
-       Ack(Message)
+       Ack(Message) error
 
        // AckID the consumption of a single message, identified by its MessageID
-       AckID(MessageID)
+       AckID(MessageID) error
 
        // ReconsumeLater mark a message for redelivery after custom delay
        ReconsumeLater(msg Message, delay time.Duration)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 2bd3ed5..e36f040 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -19,6 +19,7 @@ package pulsar
 
 import (
        "context"
+       "errors"
        "fmt"
        "math/rand"
        "strconv"
@@ -34,7 +35,7 @@ import (
 const defaultNackRedeliveryDelay = 1 * time.Minute
 
 type acker interface {
-       AckID(id trackingMessageID)
+       AckID(id trackingMessageID) error
        NackID(id trackingMessageID)
        NackMsg(msg Message)
 }
@@ -438,29 +439,28 @@ func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
        }
 }
 
-// Messages
+// Chan return the message chan to users
 func (c *consumer) Chan() <-chan ConsumerMessage {
        return c.messageCh
 }
 
 // Ack the consumption of a single message
-func (c *consumer) Ack(msg Message) {
-       c.AckID(msg.ID())
+func (c *consumer) Ack(msg Message) error {
+       return c.AckID(msg.ID())
 }
 
-// Ack the consumption of a single message, identified by its MessageID
-func (c *consumer) AckID(msgID MessageID) {
+// AckID the consumption of a single message, identified by its MessageID
+func (c *consumer) AckID(msgID MessageID) error {
        mid, ok := c.messageID(msgID)
        if !ok {
-               return
+               return errors.New("failed to convert trackingMessageID")
        }
 
        if mid.consumer != nil {
-               mid.Ack()
-               return
+               return mid.Ack()
        }
 
-       c.consumers[mid.partitionIdx].AckID(mid)
+       return c.consumers[mid.partitionIdx].AckID(mid)
 }
 
 // ReconsumeLater mark a message for redelivery after custom delay
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index c1cb3d8..1d75a24 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -19,6 +19,7 @@ package pulsar
 
 import (
        "context"
+       "errors"
        "fmt"
        "sync"
        "time"
@@ -112,30 +113,30 @@ func (c *multiTopicConsumer) Receive(ctx context.Context) (message Message, err
        }
 }
 
-// Messages
+// Chan return the message chan to users
 func (c *multiTopicConsumer) Chan() <-chan ConsumerMessage {
        return c.messageCh
 }
 
 // Ack the consumption of a single message
-func (c *multiTopicConsumer) Ack(msg Message) {
-       c.AckID(msg.ID())
+func (c *multiTopicConsumer) Ack(msg Message) error {
+       return c.AckID(msg.ID())
 }
 
-// Ack the consumption of a single message, identified by its MessageID
-func (c *multiTopicConsumer) AckID(msgID MessageID) {
+// AckID the consumption of a single message, identified by its MessageID
+func (c *multiTopicConsumer) AckID(msgID MessageID) error {
        mid, ok := toTrackingMessageID(msgID)
        if !ok {
                c.log.Warnf("invalid message id type %T", msgID)
-               return
+               return errors.New("invalid message id type in multi_consumer")
        }
 
        if mid.consumer == nil {
                c.log.Warnf("unable to ack messageID=%+v can not determine topic", msgID)
-               return
+               return errors.New("unable to ack message because consumer is nil")
        }
 
-       mid.Ack()
+       return mid.Ack()
 }
 
 func (c *multiTopicConsumer) ReconsumeLater(msg Message, delay time.Duration) {
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index db2994f..30ffcb2 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -317,21 +317,24 @@ func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error
        return convertToMessageID(id), nil
 }
 
-func (pc *partitionConsumer) AckID(msgID trackingMessageID) {
+func (pc *partitionConsumer) AckID(msgID trackingMessageID) error {
        if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
                pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
-               return
+               return errors.New("consumer state is closed")
        }
+
+       ackReq := new(ackRequest)
        if !msgID.Undefined() && msgID.ack() {
                pc.metrics.AcksCounter.Inc()
                pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano()) / 1.0e9)
-               req := &ackRequest{
-                       msgID: msgID,
-               }
-               pc.eventsCh <- req
+               ackReq.msgID = msgID
+               // send ack request to eventsCh
+               pc.eventsCh <- ackReq
 
                pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
        }
+
+       return ackReq.err
 }
 
 func (pc *partitionConsumer) NackID(msgID trackingMessageID) {
@@ -531,6 +534,7 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) {
        err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_ACK, cmdAck)
        if err != nil {
                pc.log.Error("Connection was closed when request ack cmd")
+               req.err = err
        }
 }
 
@@ -919,6 +923,7 @@ func (pc *partitionConsumer) dispatcher() {
 
 type ackRequest struct {
        msgID trackingMessageID
+       err   error
 }
 
 type unsubscribeRequest struct {
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index ed2ae1a..e4d2077 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -19,6 +19,7 @@ package pulsar
 
 import (
        "context"
+       "errors"
        "fmt"
        "regexp"
        "strings"
@@ -152,34 +153,34 @@ func (c *regexConsumer) Receive(ctx context.Context) (message Message, err error
        }
 }
 
-// Chan
+// Chan return the messages chan to user
 func (c *regexConsumer) Chan() <-chan ConsumerMessage {
        return c.messageCh
 }
 
 // Ack the consumption of a single message
-func (c *regexConsumer) Ack(msg Message) {
-       c.AckID(msg.ID())
+func (c *regexConsumer) Ack(msg Message) error {
+       return c.AckID(msg.ID())
 }
 
 func (c *regexConsumer) ReconsumeLater(msg Message, delay time.Duration) {
        c.log.Warnf("regexp consumer not support ReconsumeLater yet.")
 }
 
-// Ack the consumption of a single message, identified by its MessageID
-func (c *regexConsumer) AckID(msgID MessageID) {
+// AckID the consumption of a single message, identified by its MessageID
+func (c *regexConsumer) AckID(msgID MessageID) error {
        mid, ok := toTrackingMessageID(msgID)
        if !ok {
                c.log.Warnf("invalid message id type %T", msgID)
-               return
+               return errors.New("invalid message id type")
        }
 
        if mid.consumer == nil {
                c.log.Warnf("unable to ack messageID=%+v can not determine topic", msgID)
-               return
+               return errors.New("consumer is nil in consumer_regex")
        }
 
-       mid.Ack()
+       return mid.Ack()
 }
 
 func (c *regexConsumer) Nack(msg Message) {
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 3216676..8248b1a 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -18,6 +18,7 @@
 package pulsar
 
 import (
+       "errors"
        "fmt"
        "math"
        "math/big"
@@ -63,13 +64,15 @@ func (id trackingMessageID) Undefined() bool {
        return id == trackingMessageID{}
 }
 
-func (id trackingMessageID) Ack() {
+func (id trackingMessageID) Ack() error {
        if id.consumer == nil {
-               return
+               return errors.New("consumer is nil in trackingMessageID")
        }
        if id.ack() {
-               id.consumer.AckID(id)
+               return id.consumer.AckID(id)
        }
+
+       return nil
 }
 
 func (id trackingMessageID) Nack() {
diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go
index db67c25..6491abd 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -78,7 +78,9 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
                p.log.Debugf("Found connection in pool key=%s logical_addr=%+v physical_addr=%+v",
                        key, conn.logicalAddr, conn.physicalAddr)
 
-               // remove stale/failed connection
+               // When the current connection is in a closed state or the broker actively notifies that the
+               // current connection is closed, we need to remove the connection object from the current
+               // connection pool and create a new connection.
                if conn.closed() {
                        p.log.Infof("Removed connection from pool key=%s logical_addr=%+v physical_addr=%+v",
                                key, conn.logicalAddr, conn.physicalAddr)
diff --git a/pulsar/internal/pulsartracing/consumer_interceptor_test.go b/pulsar/internal/pulsartracing/consumer_interceptor_test.go
index b15a926..9e70d8b 100644
--- a/pulsar/internal/pulsartracing/consumer_interceptor_test.go
+++ b/pulsar/internal/pulsartracing/consumer_interceptor_test.go
@@ -64,9 +64,13 @@ func (c *mockConsumer) Chan() <-chan pulsar.ConsumerMessage {
        return nil
 }
 
-func (c *mockConsumer) Ack(msg pulsar.Message) {}
+func (c *mockConsumer) Ack(msg pulsar.Message) error {
+       return nil
+}
 
-func (c *mockConsumer) AckID(msgID pulsar.MessageID) {}
+func (c *mockConsumer) AckID(msgID pulsar.MessageID) error {
+       return nil
+}
 
 func (c *mockConsumer) ReconsumeLater(msg pulsar.Message, delay time.Duration) {}
 

Reply via email to