This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 988702e  [bugfix] AMQP consumers are not living up to expectations 
(#210)
988702e is described below

commit 988702ef41b9d9075ff060b5bba54d97a94fab9c
Author: renyansongno1 <[email protected]>
AuthorDate: Wed Dec 18 14:12:56 2024 +0800

    [bugfix] AMQP consumers are not living up to expectations (#210)
---
 plugins/amqp/consumer.go                           | 29 +++++++-
 plugins/amqp/consumer_with_ctx.go                  | 37 ---------
 plugins/amqp/general_consumer.go                   | 87 ++++++++++++++++------
 plugins/amqp/instrument.go                         | 33 ++++----
 plugins/amqp/structures.go                         | 27 +++++++
 test/plugins/scenarios/amqp/config/excepted.yml    | 40 +++++-----
 test/plugins/scenarios/amqp/main.go                | 84 +++++++--------------
 .../go-agent/instrument/plugins/rewrite/context.go |  2 +-
 8 files changed, 184 insertions(+), 155 deletions(-)

diff --git a/plugins/amqp/consumer.go b/plugins/amqp/consumer.go
index 0bbb335..52c6e6e 100644
--- a/plugins/amqp/consumer.go
+++ b/plugins/amqp/consumer.go
@@ -23,15 +23,36 @@ import (
        "github.com/apache/skywalking-go/plugins/core/operator"
 )
 
+type ConsumersSendInterceptor struct {
+}
+
+func (c *ConsumersSendInterceptor) BeforeInvoke(invocation 
operator.Invocation) error {
+       return nil
+}
+
+func (c *ConsumersSendInterceptor) AfterInvoke(invocation operator.Invocation, 
results ...interface{}) error {
+       return GeneralConsumersSendAfterInvoke(invocation, results...)
+}
+
 type ConsumerInterceptor struct {
 }
 
 func (c *ConsumerInterceptor) BeforeInvoke(invocation operator.Invocation) 
error {
+       args := invocation.Args()[6].(amqp091.Table)
+       return GeneralConsumerBeforeInvoke(invocation, args)
+}
+
+func (c *ConsumerInterceptor) AfterInvoke(operator.Invocation, ...interface{}) 
error {
        return nil
 }
 
-func (c *ConsumerInterceptor) AfterInvoke(invocation operator.Invocation, 
results ...interface{}) error {
-       queue, consumerTag, args := invocation.Args()[0].(string), 
invocation.Args()[1].(string),
-               invocation.Args()[6].(amqp091.Table)
-       return GeneralConsumerAfterInvoke(invocation, queue, consumerTag, args, 
results...)
+type ConsumersCloseInterceptor struct {
+}
+
+func (c *ConsumersCloseInterceptor) BeforeInvoke(invocation 
operator.Invocation) error {
+       return GeneralConsumerCloseBeforeInvoke(invocation)
+}
+
+func (c *ConsumersCloseInterceptor) AfterInvoke(operator.Invocation, 
...interface{}) error {
+       return nil
 }
diff --git a/plugins/amqp/consumer_with_ctx.go 
b/plugins/amqp/consumer_with_ctx.go
deleted file mode 100644
index f7993f2..0000000
--- a/plugins/amqp/consumer_with_ctx.go
+++ /dev/null
@@ -1,37 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package amqp
-
-import (
-       "github.com/rabbitmq/amqp091-go"
-
-       "github.com/apache/skywalking-go/plugins/core/operator"
-)
-
-type ConsumerWithCtxInterceptor struct {
-}
-
-func (cwi *ConsumerWithCtxInterceptor) BeforeInvoke(invocation 
operator.Invocation) error {
-       return nil
-}
-
-func (cwi *ConsumerWithCtxInterceptor) AfterInvoke(invocation 
operator.Invocation, results ...interface{}) error {
-       queue, consumerTag, args := invocation.Args()[1].(string), 
invocation.Args()[2].(string),
-               invocation.Args()[7].(amqp091.Table)
-       return GeneralConsumerAfterInvoke(invocation, queue, consumerTag, args, 
results...)
-}
diff --git a/plugins/amqp/general_consumer.go b/plugins/amqp/general_consumer.go
index d4c9087..89c4cc8 100644
--- a/plugins/amqp/general_consumer.go
+++ b/plugins/amqp/general_consumer.go
@@ -19,6 +19,9 @@ package amqp
 
 import (
        "fmt"
+       "os"
+       "strconv"
+       "sync/atomic"
 
        "github.com/rabbitmq/amqp091-go"
 
@@ -27,44 +30,82 @@ import (
 )
 
 const (
-       ConsumerComponentID = 145
-       amqpConsumerPrefix  = "AMQP/"
-       amqpConsumerSuffix  = "/Consumer"
-       tagMQConsumerTag    = "mq.consumer_tag"
-       tagMQReplyTo        = "mq.reply_to"
-       tagMQCorrelationID  = "mq.correlation_id"
-       tagMQArgs           = "mq.args"
+       ConsumerComponentID  = 145
+       amqpConsumerPrefix   = "AMQP/"
+       amqpConsumerSuffix   = "/Consumer"
+       tagMQConsumerTag     = "mq.consumer_tag"
+       tagMQReplyTo         = "mq.reply_to"
+       tagMQCorrelationID   = "mq.correlation_id"
+       tagMQArgs            = "mq.args"
+       consumerTagLengthMax = 0xFF
 )
 
-func GeneralConsumerAfterInvoke(invocation operator.Invocation, queue, 
consumerTag string, args amqp091.Table, results ...interface{}) error {
-       deliveries := <-results[0].(<-chan Delivery)
-       if consumerTag == "" {
-               consumerTag = deliveries.ConsumerTag
-       }
-       operationName := amqpConsumerPrefix + queue + "/" + consumerTag + 
amqpConsumerSuffix
+var consumerSeq uint64
+var queueConsumerTagMapping = make(map[string]string)
 
-       channel := invocation.CallerInstance().(*nativeChannel)
+func GeneralConsumersSendAfterInvoke(invocation operator.Invocation, results 
...interface{}) error {
+       if foundConsumer := results[0].(bool); !foundConsumer {
+               return nil
+       }
+       consumerTag, _ := invocation.Args()[0].(string)
+       delivery, _ := invocation.Args()[1].(*Delivery)
+       operationName := amqpConsumerPrefix + 
queueConsumerTagMapping[consumerTag] + "/" + consumerTag + amqpConsumerSuffix
+       channel, _ := delivery.Acknowledger.(*nativeChannel)
        peer := getPeerInfo(channel.connection)
 
        span, err := tracing.CreateEntrySpan(operationName, func(headerKey 
string) (string, error) {
-               return deliveries.Headers[headerKey].(string), nil
+               header, _ := delivery.Headers[headerKey].(string)
+               return header, nil
        }, tracing.WithLayer(tracing.SpanLayerMQ),
                tracing.WithComponent(ConsumerComponentID),
                tracing.WithTag(tracing.TagMQBroker, peer),
-               tracing.WithTag(tracing.TagMQQueue, queue),
-               tracing.WithTag(tracing.TagMQMsgID, deliveries.MessageId),
+               tracing.WithTag(tracing.TagMQQueue, 
queueConsumerTagMapping[consumerTag]),
+               tracing.WithTag(tracing.TagMQMsgID, delivery.MessageId),
                tracing.WithTag(tagMQConsumerTag, consumerTag),
-               tracing.WithTag(tagMQCorrelationID, deliveries.CorrelationId),
-               tracing.WithTag(tagMQReplyTo, deliveries.ReplyTo),
-               tracing.WithTag(tagMQArgs, fmt.Sprintf("%v", args)),
+               tracing.WithTag(tagMQCorrelationID, delivery.CorrelationId),
+               tracing.WithTag(tagMQReplyTo, delivery.ReplyTo),
+               tracing.WithTag(tagMQArgs, fmt.Sprintf("%v", delivery.Headers)),
        )
        if err != nil {
                return err
        }
        span.SetPeer(peer)
-       if err, ok := results[1].(error); ok && err != nil {
-               span.Error(err.Error())
-       }
        span.End()
        return nil
 }
+
+func GeneralConsumerBeforeInvoke(invocation operator.Invocation, args 
amqp091.Table) error {
+       queue := invocation.Args()[0].(string)
+       consumerTag := invocation.Args()[1].(string)
+       if consumerTag == "" {
+               consumerTag = uniqueConsumerTag()
+       }
+       queueConsumerTagMapping[consumerTag] = queue
+       return nil
+}
+
+func GeneralConsumerCloseBeforeInvoke(invocation operator.Invocation) error {
+       consumers, _ := invocation.CallerInstance().(*nativeConsumers)
+       consumers.Lock()
+       defer consumers.Unlock()
+       for consumerTag := range consumers.chans {
+               delete(queueConsumerTagMapping, consumerTag)
+       }
+       return nil
+}
+
+func uniqueConsumerTag() string {
+       return commandNameBasedUniqueConsumerTag(os.Args[0])
+}
+
+func commandNameBasedUniqueConsumerTag(commandName string) string {
+       tagPrefix := "ctag-"
+       tagInfix := commandName
+       tagSuffix := "-" + strconv.FormatUint(atomic.AddUint64(&consumerSeq, 
1), 10)
+
+       if len(tagPrefix)+len(tagInfix)+len(tagSuffix) > consumerTagLengthMax {
+               tagInfix = "streadway/amqp"
+       }
+
+       return tagPrefix + tagInfix + tagSuffix
+}
diff --git a/plugins/amqp/instrument.go b/plugins/amqp/instrument.go
index 3ad747d..fdb6478 100644
--- a/plugins/amqp/instrument.go
+++ b/plugins/amqp/instrument.go
@@ -65,30 +65,37 @@ func (i *Instrument) Points() []*instrument.Point {
                {
                        PackagePath: "",
                        PackageName: "amqp091",
-                       At: instrument.NewMethodEnhance("*Channel", "Consume",
-                               instrument.WithArgsCount(7),
+                       At: instrument.NewMethodEnhance("*consumers", "send",
+                               instrument.WithArgsCount(2),
                                instrument.WithArgType(0, "string"),
-                               instrument.WithArgType(1, "string"),
-                               instrument.WithArgType(6, "Table"),
-                               instrument.WithResultCount(2),
-                               instrument.WithResultType(0, "<-chan Delivery"),
-                               instrument.WithResultType(1, "error"),
+                               instrument.WithArgType(1, "*Delivery"),
+                               instrument.WithResultCount(1),
+                               instrument.WithResultType(0, "bool"),
                        ),
-                       Interceptor: "ConsumerInterceptor",
+                       Interceptor: "ConsumersSendInterceptor",
+               },
+               {
+                       PackagePath: "",
+                       PackageName: "amqp091",
+                       At: instrument.NewMethodEnhance("*consumers", "close",
+                               instrument.WithArgsCount(0),
+                               instrument.WithResultCount(0),
+                       ),
+                       Interceptor: "ConsumersCloseInterceptor",
                },
                {
                        PackagePath: "",
                        PackageName: "amqp091",
-                       At: instrument.NewMethodEnhance("*Channel", 
"ConsumeWithContext",
-                               instrument.WithArgsCount(8),
+                       At: instrument.NewMethodEnhance("*Channel", "Consume",
+                               instrument.WithArgsCount(7),
+                               instrument.WithArgType(0, "string"),
                                instrument.WithArgType(1, "string"),
-                               instrument.WithArgType(2, "string"),
-                               instrument.WithArgType(7, "Table"),
+                               instrument.WithArgType(6, "Table"),
                                instrument.WithResultCount(2),
                                instrument.WithResultType(0, "<-chan Delivery"),
                                instrument.WithResultType(1, "error"),
                        ),
-                       Interceptor: "ConsumerWithCtxInterceptor",
+                       Interceptor: "ConsumerInterceptor",
                },
                {
                        PackagePath: "",
diff --git a/plugins/amqp/structures.go b/plugins/amqp/structures.go
index 5675b6c..d25f181 100644
--- a/plugins/amqp/structures.go
+++ b/plugins/amqp/structures.go
@@ -19,6 +19,7 @@ package amqp
 
 import (
        "io"
+       "sync"
 )
 
 //skywalking:native github.com/rabbitmq/amqp091-go Channel
@@ -26,8 +27,19 @@ type nativeChannel struct {
        connection *nativeConnection
 }
 
+func (ch *nativeChannel) Ack(tag uint64, multiple bool) error {
+       return nil
+}
+func (ch *nativeChannel) Nack(tag uint64, multiple, requeue bool) error {
+       return nil
+}
+func (ch *nativeChannel) Reject(tag uint64, requeue bool) error {
+       return nil
+}
+
 //skywalking:native github.com/rabbitmq/amqp091-go Delivery
 type Delivery struct {
+       Acknowledger  nativeAcknowledger
        Headers       Table
        MessageId     string //nolint
        ConsumerTag   string
@@ -44,3 +56,18 @@ type Table map[string]interface{}
 type nativeConnection struct {
        conn io.ReadWriteCloser
 }
+
+//skywalking:native github.com/rabbitmq/amqp091-go Acknowledger
+type nativeAcknowledger interface {
+       Ack(tag uint64, multiple bool) error
+       Nack(tag uint64, multiple bool, requeue bool) error
+       Reject(tag uint64, requeue bool) error
+}
+
+//skywalking:native github.com/rabbitmq/amqp091-go consumers
+type nativeConsumers struct {
+       sync.Mutex
+       chans consumerBuffers
+}
+
+type consumerBuffers map[string]chan *Delivery
diff --git a/test/plugins/scenarios/amqp/config/excepted.yml 
b/test/plugins/scenarios/amqp/config/excepted.yml
index 7f86061..a04a05e 100644
--- a/test/plugins/scenarios/amqp/config/excepted.yml
+++ b/test/plugins/scenarios/amqp/config/excepted.yml
@@ -35,12 +35,27 @@ segmentItems:
               - { key: mq.broker, value: 'amqp-server:5672' }
               - { key: mq.exchange, value: not null }
               - { key: mq.routing_key, value: sw-queue-1 }
-          - operationName: AMQP/sw-queue-1/sw-consumer-1/Consumer
+          - operationName: AMQP/sw-queue-1/Producer
             parentSpanId: 0
             spanId: 2
             spanLayer: MQ
             startTime: nq 0
             endTime: nq 0
+            componentId: 144
+            isError: false
+            spanType: Exit
+            peer: amqp-server:5672
+            skipAnalysis: false
+            tags:
+              - { key: mq.broker, value: 'amqp-server:5672' }
+              - { key: mq.exchange, value: not null }
+              - { key: mq.routing_key, value: sw-queue-1 }
+          - operationName: AMQP/sw-queue-1/sw-consumer-1/Consumer
+            parentSpanId: 0
+            spanId: 3
+            spanLayer: MQ
+            startTime: nq 0
+            endTime: nq 0
             componentId: 145
             isError: false
             spanType: Entry
@@ -59,22 +74,7 @@ segmentItems:
                   parentSpanId: 1, parentTraceSegmentId: not null,
                   parentServiceInstance: not null, parentService: amqp,
                   traceId: not null }
-          - operationName: AMQP/sw-queue-2/Producer
-            parentSpanId: 0
-            spanId: 3
-            spanLayer: MQ
-            startTime: nq 0
-            endTime: nq 0
-            componentId: 144
-            isError: false
-            spanType: Exit
-            peer: amqp-server:5672
-            skipAnalysis: false
-            tags:
-              - { key: mq.broker, value: 'amqp-server:5672' }
-              - { key: mq.exchange, value: not null }
-              - { key: mq.routing_key, value: sw-queue-2 }
-          - operationName: AMQP/sw-queue-2/sw-consumer-2/Consumer
+          - operationName: AMQP/sw-queue-1/sw-consumer-1/Consumer
             parentSpanId: 0
             spanId: 4
             spanLayer: MQ
@@ -87,15 +87,15 @@ segmentItems:
             skipAnalysis: false
             tags:
               - { key: mq.broker, value: 'amqp-server:5672' }
-              - { key: mq.queue, value: sw-queue-2 }
+              - { key: mq.queue, value: sw-queue-1 }
               - { key: mq.msg.id, value: not null }
-              - { key: mq.consumer_tag, value: sw-consumer-2 }
+              - { key: mq.consumer_tag, value: sw-consumer-1 }
               - { key: mq.correlation_id, value: not null }
               - { key: mq.reply_to, value: not null }
               - { key: mq.args, value: not null }
             refs:
               - { parentEndpoint: 'GET:/execute', networkAddress: 
'amqp-server:5672', refType: CrossProcess,
-                  parentSpanId: 3, parentTraceSegmentId: not null,
+                  parentSpanId: 2, parentTraceSegmentId: not null,
                   parentServiceInstance: not null, parentService: amqp,
                   traceId: not null }
           - operationName: GET:/execute
diff --git a/test/plugins/scenarios/amqp/main.go 
b/test/plugins/scenarios/amqp/main.go
index b840c0e..71e5584 100644
--- a/test/plugins/scenarios/amqp/main.go
+++ b/test/plugins/scenarios/amqp/main.go
@@ -32,14 +32,11 @@ import (
 type testFunc func(RabbitClient) error
 
 var (
-       uri                    = "amqp://admin:123456@amqp-server:5672"
-       queue1                 = "sw-queue-1"
-       queue2                 = "sw-queue-2"
-       body                   = "I love skywalking 3 thousand"
-       consumerTag1           = "sw-consumer-1"
-       consumerTag2           = "sw-consumer-2"
-       consumerTrigger        = make(chan struct{})
-       consumerWithCtxTrigger = make(chan struct{})
+       uri             = "amqp://admin:123456@amqp-server:5672"
+       queue           = "sw-queue-1"
+       body            = "I love skywalking 3 thousand"
+       consumerTag     = "sw-consumer-1"
+       consumerTrigger = make(chan struct{})
 )
 
 func main() {
@@ -59,7 +56,6 @@ func main() {
                        fn   testFunc
                }{
                        {"testSimpleConsumer", testSimpleConsumer},
-                       {"testConsumerWithCtx", testConsumerWithCtx},
                }
                for _, test := range tests {
                        fmt.Printf("excute test case: %s\n", test.name)
@@ -80,32 +76,36 @@ func main() {
 }
 
 func testSimpleConsumer(client RabbitClient) error {
-       producer(queue1, client)
+       producer(queue, client)
        go consumer()
        consumerTrigger <- struct{}{}
        time.Sleep(time.Second)
        return nil
 }
 
-func testConsumerWithCtx(client RabbitClient) error {
-       producer(queue2, client)
-       go consumerWithContext()
-       consumerWithCtxTrigger <- struct{}{}
-       time.Sleep(time.Second)
-       return nil
-}
-
 func producer(queue string, client RabbitClient) {
-       client.CreateQueue(queue, true, false)
-       if err := client.Send(context.Background(), "", queue, amqp.Publishing{
+       _, err := client.CreateQueue(queue, true, false)
+       if err != nil {
+               fmt.Println("Failed to Create Queue, err: ", err)
+       }
+       if err = client.Send(context.Background(), "", queue, amqp.Publishing{
                ContentType:   "text/plain",
                Body:          []byte(body),
                Headers:       amqp.Table{},
                CorrelationId: "1",
-               MessageId:     "2",
+               MessageId:     "1",
        }); err != nil {
                fmt.Println("Failed to Send msg, err: ", err)
        }
+       if err = client.Send(context.Background(), "", queue, amqp.Publishing{
+               ContentType:   "text/plain",
+               Body:          []byte(body),
+               Headers:       amqp.Table{},
+               CorrelationId: "2",
+               MessageId:     "2",
+       }); err != nil {
+               fmt.Println("Failed to Send msg second time, err: ", err)
+       }
 }
 
 func consumer() {
@@ -118,16 +118,19 @@ func consumer() {
        if err != nil {
                fmt.Println("Failed to Channel Consume, err: ", err)
        }
-       msgs, err := consumeClient.Consume(queue1, consumerTag1, false)
+       msgs, err := consumeClient.Consume(queue, consumerTag, false)
        if err != nil {
                fmt.Println("Failed to Consume msg, err: ", err)
        }
        log.Printf("[Consumer] Waiting for messages.\n")
        for d := range msgs {
                log.Printf("Received a message: %s\n", string(d.Body))
-               d.Ack(false)
+               err = d.Ack(false)
+               if err != nil {
+                       fmt.Println("Failed to ACK msg, err: ", err)
+               }
        }
-       err = consumeClient.Cancel(consumerTag1)
+       err = consumeClient.Cancel(consumerTag)
        if err != nil {
                fmt.Println("Failed to Cancel Consume, err: ", err)
        }
@@ -137,35 +140,6 @@ func consumer() {
        }
 }
 
-func consumerWithContext() {
-       <-consumerWithCtxTrigger
-       consumeConn, err := amqp.Dial(uri)
-       if err != nil {
-               fmt.Println("Failed to Dial ConsumerWithContext, err: ", err)
-       }
-       consumeClient, err := NewRabbitMQClient(consumeConn)
-       if err != nil {
-               fmt.Println("Failed to Channel ConsumerWithContext, err: ", err)
-       }
-       msgs, err := consumeClient.Consume(queue2, consumerTag2, false)
-       if err != nil {
-               fmt.Println("Failed to Consume msg, err: ", err)
-       }
-       log.Printf("[ConsumerWithContext] Waiting for messages.\n")
-       for d := range msgs {
-               log.Printf("Received a message: %s", string(d.Body))
-               d.Ack(false)
-       }
-       err = consumeClient.Cancel(consumerTag2)
-       if err != nil {
-               fmt.Println("Failed to Cancel ConsumerWithContext, err: ", err)
-       }
-       err = consumeConn.Close()
-       if err != nil {
-               fmt.Println("Failed to Close ConsumerWithContext, err: ", err)
-       }
-}
-
 // RabbitClient is used to keep track of the RabbitMQ connection
 type RabbitClient struct {
        // The connection that is used
@@ -233,7 +207,3 @@ func (rc RabbitClient) Send(ctx context.Context, exchange, 
routingKey string, op
 func (rc RabbitClient) Consume(queue, consumer string, autoAck bool) (<-chan 
amqp.Delivery, error) {
        return rc.ch.Consume(queue, consumer, autoAck, false, false, false, nil)
 }
-
-func (rc RabbitClient) ConsumeWithContext(ctx context.Context, queue, consumer 
string, autoAck bool) (<-chan amqp.Delivery, error) {
-       return rc.ch.ConsumeWithContext(ctx, queue, consumer, autoAck, false, 
false, false, nil)
-}
diff --git a/tools/go-agent/instrument/plugins/rewrite/context.go 
b/tools/go-agent/instrument/plugins/rewrite/context.go
index 47d2a4e..a5136ba 100644
--- a/tools/go-agent/instrument/plugins/rewrite/context.go
+++ b/tools/go-agent/instrument/plugins/rewrite/context.go
@@ -341,7 +341,7 @@ func (c *Context) enhanceTypeNameWhenRewrite(fieldType 
dst.Expr, parent dst.Node
 
 func (c *Context) typeIsBasicTypeValueOrEnhanceName(name string) bool {
        if strings.HasPrefix(name, OperatePrefix) || strings.HasPrefix(name, 
GenerateMethodPrefix) || tools.IsBasicDataType(name) ||
-               name == "nil" || name == "true" || name == "false" || name == 
"append" || name == "panic" || name == "new" {
+               name == "nil" || name == "true" || name == "false" || name == 
"append" || name == "panic" || name == "new" || name == "delete" {
                return true
        }
        if _, valErr := strconv.ParseFloat(name, 64); valErr == nil {

Reply via email to