This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new 5108332 Add error response for Ack func (#775) 5108332 is described below commit 5108332c9dd4cb454c26804304bffb82eeffc713 Author: xiaolong ran <xiaolong...@tencent.com> AuthorDate: Mon May 23 11:15:27 2022 +0800 Add error response for Ack func (#775) * Add error response for Ack func Signed-off-by: xiaolongran <xiaolong...@tencent.com> * when connection closed we need to reconnect by using new cnx Signed-off-by: xiaolongran <xiaolong...@tencent.com> * fix comments Signed-off-by: xiaolongran <xiaolong...@tencent.com> --- pulsar/consumer.go | 4 ++-- pulsar/consumer_impl.go | 20 ++++++++++---------- pulsar/consumer_multitopic.go | 17 +++++++++-------- pulsar/consumer_partition.go | 17 +++++++++++------ pulsar/consumer_regex.go | 17 +++++++++-------- pulsar/impl_message.go | 9 ++++++--- pulsar/internal/connection_pool.go | 4 +++- .../pulsartracing/consumer_interceptor_test.go | 8 ++++++-- 8 files changed, 56 insertions(+), 40 deletions(-) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index c67509b..dfe27c5 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -200,10 +200,10 @@ type Consumer interface { Chan() <-chan ConsumerMessage // Ack the consumption of a single message - Ack(Message) + Ack(Message) error // AckID the consumption of a single message, identified by its MessageID - AckID(MessageID) + AckID(MessageID) error // ReconsumeLater mark a message for redelivery after custom delay ReconsumeLater(msg Message, delay time.Duration) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 2bd3ed5..e36f040 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -19,6 +19,7 @@ package pulsar import ( "context" + "errors" "fmt" "math/rand" "strconv" @@ -34,7 +35,7 @@ import ( const defaultNackRedeliveryDelay = 1 * time.Minute type acker interface { - AckID(id trackingMessageID) + AckID(id trackingMessageID) error NackID(id trackingMessageID) NackMsg(msg 
Message) } @@ -438,29 +439,28 @@ func (c *consumer) Receive(ctx context.Context) (message Message, err error) { } } -// Messages +// Chan return the message chan to users func (c *consumer) Chan() <-chan ConsumerMessage { return c.messageCh } // Ack the consumption of a single message -func (c *consumer) Ack(msg Message) { - c.AckID(msg.ID()) +func (c *consumer) Ack(msg Message) error { + return c.AckID(msg.ID()) } -// Ack the consumption of a single message, identified by its MessageID -func (c *consumer) AckID(msgID MessageID) { +// AckID the consumption of a single message, identified by its MessageID +func (c *consumer) AckID(msgID MessageID) error { mid, ok := c.messageID(msgID) if !ok { - return + return errors.New("failed to convert trackingMessageID") } if mid.consumer != nil { - mid.Ack() - return + return mid.Ack() } - c.consumers[mid.partitionIdx].AckID(mid) + return c.consumers[mid.partitionIdx].AckID(mid) } // ReconsumeLater mark a message for redelivery after custom delay diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index c1cb3d8..1d75a24 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -19,6 +19,7 @@ package pulsar import ( "context" + "errors" "fmt" "sync" "time" @@ -112,30 +113,30 @@ func (c *multiTopicConsumer) Receive(ctx context.Context) (message Message, err } } -// Messages +// Chan return the message chan to users func (c *multiTopicConsumer) Chan() <-chan ConsumerMessage { return c.messageCh } // Ack the consumption of a single message -func (c *multiTopicConsumer) Ack(msg Message) { - c.AckID(msg.ID()) +func (c *multiTopicConsumer) Ack(msg Message) error { + return c.AckID(msg.ID()) } -// Ack the consumption of a single message, identified by its MessageID -func (c *multiTopicConsumer) AckID(msgID MessageID) { +// AckID the consumption of a single message, identified by its MessageID +func (c *multiTopicConsumer) AckID(msgID MessageID) error { mid, ok := 
toTrackingMessageID(msgID) if !ok { c.log.Warnf("invalid message id type %T", msgID) - return + return errors.New("invalid message id type in multi_consumer") } if mid.consumer == nil { c.log.Warnf("unable to ack messageID=%+v can not determine topic", msgID) - return + return errors.New("unable to ack message because consumer is nil") } - mid.Ack() + return mid.Ack() } func (c *multiTopicConsumer) ReconsumeLater(msg Message, delay time.Duration) { diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index db2994f..30ffcb2 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -317,21 +317,24 @@ func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error return convertToMessageID(id), nil } -func (pc *partitionConsumer) AckID(msgID trackingMessageID) { +func (pc *partitionConsumer) AckID(msgID trackingMessageID) error { if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing { pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer") - return + return errors.New("consumer state is closed") } + + ackReq := new(ackRequest) if !msgID.Undefined() && msgID.ack() { pc.metrics.AcksCounter.Inc() pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano()) / 1.0e9) - req := &ackRequest{ - msgID: msgID, - } - pc.eventsCh <- req + ackReq.msgID = msgID + // send ack request to eventsCh + pc.eventsCh <- ackReq pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID) } + + return ackReq.err } func (pc *partitionConsumer) NackID(msgID trackingMessageID) { @@ -531,6 +534,7 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) { err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_ACK, cmdAck) if err != nil { pc.log.Error("Connection was closed when request ack cmd") + req.err = err } } @@ -919,6 +923,7 @@ func (pc *partitionConsumer) dispatcher() { type ackRequest struct { msgID 
trackingMessageID + err error } type unsubscribeRequest struct { diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index ed2ae1a..e4d2077 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -19,6 +19,7 @@ package pulsar import ( "context" + "errors" "fmt" "regexp" "strings" @@ -152,34 +153,34 @@ func (c *regexConsumer) Receive(ctx context.Context) (message Message, err error } } -// Chan +// Chan return the messages chan to user func (c *regexConsumer) Chan() <-chan ConsumerMessage { return c.messageCh } // Ack the consumption of a single message -func (c *regexConsumer) Ack(msg Message) { - c.AckID(msg.ID()) +func (c *regexConsumer) Ack(msg Message) error { + return c.AckID(msg.ID()) } func (c *regexConsumer) ReconsumeLater(msg Message, delay time.Duration) { c.log.Warnf("regexp consumer not support ReconsumeLater yet.") } -// Ack the consumption of a single message, identified by its MessageID -func (c *regexConsumer) AckID(msgID MessageID) { +// AckID the consumption of a single message, identified by its MessageID +func (c *regexConsumer) AckID(msgID MessageID) error { mid, ok := toTrackingMessageID(msgID) if !ok { c.log.Warnf("invalid message id type %T", msgID) - return + return errors.New("invalid message id type") } if mid.consumer == nil { c.log.Warnf("unable to ack messageID=%+v can not determine topic", msgID) - return + return errors.New("consumer is nil in consumer_regex") } - mid.Ack() + return mid.Ack() } func (c *regexConsumer) Nack(msg Message) { diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go index 3216676..8248b1a 100644 --- a/pulsar/impl_message.go +++ b/pulsar/impl_message.go @@ -18,6 +18,7 @@ package pulsar import ( + "errors" "fmt" "math" "math/big" @@ -63,13 +64,15 @@ func (id trackingMessageID) Undefined() bool { return id == trackingMessageID{} } -func (id trackingMessageID) Ack() { +func (id trackingMessageID) Ack() error { if id.consumer == nil { - return + return errors.New("consumer is 
nil in trackingMessageID") } if id.ack() { - id.consumer.AckID(id) + return id.consumer.AckID(id) } + + return nil } func (id trackingMessageID) Nack() { diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go index db67c25..6491abd 100644 --- a/pulsar/internal/connection_pool.go +++ b/pulsar/internal/connection_pool.go @@ -78,7 +78,9 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U p.log.Debugf("Found connection in pool key=%s logical_addr=%+v physical_addr=%+v", key, conn.logicalAddr, conn.physicalAddr) - // remove stale/failed connection + // When the current connection is in a closed state or the broker actively notifies that the + // current connection is closed, we need to remove the connection object from the current + // connection pool and create a new connection. if conn.closed() { p.log.Infof("Removed connection from pool key=%s logical_addr=%+v physical_addr=%+v", key, conn.logicalAddr, conn.physicalAddr) diff --git a/pulsar/internal/pulsartracing/consumer_interceptor_test.go b/pulsar/internal/pulsartracing/consumer_interceptor_test.go index b15a926..9e70d8b 100644 --- a/pulsar/internal/pulsartracing/consumer_interceptor_test.go +++ b/pulsar/internal/pulsartracing/consumer_interceptor_test.go @@ -64,9 +64,13 @@ func (c *mockConsumer) Chan() <-chan pulsar.ConsumerMessage { return nil } -func (c *mockConsumer) Ack(msg pulsar.Message) {} +func (c *mockConsumer) Ack(msg pulsar.Message) error { + return nil +} -func (c *mockConsumer) AckID(msgID pulsar.MessageID) {} +func (c *mockConsumer) AckID(msgID pulsar.MessageID) error { + return nil +} func (c *mockConsumer) ReconsumeLater(msg pulsar.Message, delay time.Duration) {}