This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-go.git
The following commit(s) were added to refs/heads/main by this push:
new 988702e [bugfix] AMQP consumers are not living up to expectations
(#210)
988702e is described below
commit 988702ef41b9d9075ff060b5bba54d97a94fab9c
Author: renyansongno1 <[email protected]>
AuthorDate: Wed Dec 18 14:12:56 2024 +0800
[bugfix] AMQP consumers are not living up to expectations (#210)
---
plugins/amqp/consumer.go | 29 +++++++-
plugins/amqp/consumer_with_ctx.go | 37 ---------
plugins/amqp/general_consumer.go | 87 ++++++++++++++++------
plugins/amqp/instrument.go | 33 ++++----
plugins/amqp/structures.go | 27 +++++++
test/plugins/scenarios/amqp/config/excepted.yml | 40 +++++-----
test/plugins/scenarios/amqp/main.go | 84 +++++++--------------
.../go-agent/instrument/plugins/rewrite/context.go | 2 +-
8 files changed, 184 insertions(+), 155 deletions(-)
diff --git a/plugins/amqp/consumer.go b/plugins/amqp/consumer.go
index 0bbb335..52c6e6e 100644
--- a/plugins/amqp/consumer.go
+++ b/plugins/amqp/consumer.go
@@ -23,15 +23,36 @@ import (
"github.com/apache/skywalking-go/plugins/core/operator"
)
+type ConsumersSendInterceptor struct {
+}
+
+func (c *ConsumersSendInterceptor) BeforeInvoke(invocation
operator.Invocation) error {
+ return nil
+}
+
+func (c *ConsumersSendInterceptor) AfterInvoke(invocation operator.Invocation,
results ...interface{}) error {
+ return GeneralConsumersSendAfterInvoke(invocation, results...)
+}
+
type ConsumerInterceptor struct {
}
func (c *ConsumerInterceptor) BeforeInvoke(invocation operator.Invocation)
error {
+ args := invocation.Args()[6].(amqp091.Table)
+ return GeneralConsumerBeforeInvoke(invocation, args)
+}
+
+func (c *ConsumerInterceptor) AfterInvoke(operator.Invocation, ...interface{})
error {
return nil
}
-func (c *ConsumerInterceptor) AfterInvoke(invocation operator.Invocation,
results ...interface{}) error {
- queue, consumerTag, args := invocation.Args()[0].(string),
invocation.Args()[1].(string),
- invocation.Args()[6].(amqp091.Table)
- return GeneralConsumerAfterInvoke(invocation, queue, consumerTag, args,
results...)
+type ConsumersCloseInterceptor struct {
+}
+
+func (c *ConsumersCloseInterceptor) BeforeInvoke(invocation
operator.Invocation) error {
+ return GeneralConsumerCloseBeforeInvoke(invocation)
+}
+
+func (c *ConsumersCloseInterceptor) AfterInvoke(operator.Invocation,
...interface{}) error {
+ return nil
}
diff --git a/plugins/amqp/consumer_with_ctx.go
b/plugins/amqp/consumer_with_ctx.go
deleted file mode 100644
index f7993f2..0000000
--- a/plugins/amqp/consumer_with_ctx.go
+++ /dev/null
@@ -1,37 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package amqp
-
-import (
- "github.com/rabbitmq/amqp091-go"
-
- "github.com/apache/skywalking-go/plugins/core/operator"
-)
-
-type ConsumerWithCtxInterceptor struct {
-}
-
-func (cwi *ConsumerWithCtxInterceptor) BeforeInvoke(invocation
operator.Invocation) error {
- return nil
-}
-
-func (cwi *ConsumerWithCtxInterceptor) AfterInvoke(invocation
operator.Invocation, results ...interface{}) error {
- queue, consumerTag, args := invocation.Args()[1].(string),
invocation.Args()[2].(string),
- invocation.Args()[7].(amqp091.Table)
- return GeneralConsumerAfterInvoke(invocation, queue, consumerTag, args,
results...)
-}
diff --git a/plugins/amqp/general_consumer.go b/plugins/amqp/general_consumer.go
index d4c9087..89c4cc8 100644
--- a/plugins/amqp/general_consumer.go
+++ b/plugins/amqp/general_consumer.go
@@ -19,6 +19,9 @@ package amqp
import (
"fmt"
+ "os"
+ "strconv"
+ "sync/atomic"
"github.com/rabbitmq/amqp091-go"
@@ -27,44 +30,82 @@ import (
)
const (
- ConsumerComponentID = 145
- amqpConsumerPrefix = "AMQP/"
- amqpConsumerSuffix = "/Consumer"
- tagMQConsumerTag = "mq.consumer_tag"
- tagMQReplyTo = "mq.reply_to"
- tagMQCorrelationID = "mq.correlation_id"
- tagMQArgs = "mq.args"
+ ConsumerComponentID = 145
+ amqpConsumerPrefix = "AMQP/"
+ amqpConsumerSuffix = "/Consumer"
+ tagMQConsumerTag = "mq.consumer_tag"
+ tagMQReplyTo = "mq.reply_to"
+ tagMQCorrelationID = "mq.correlation_id"
+ tagMQArgs = "mq.args"
+ consumerTagLengthMax = 0xFF
)
-func GeneralConsumerAfterInvoke(invocation operator.Invocation, queue,
consumerTag string, args amqp091.Table, results ...interface{}) error {
- deliveries := <-results[0].(<-chan Delivery)
- if consumerTag == "" {
- consumerTag = deliveries.ConsumerTag
- }
- operationName := amqpConsumerPrefix + queue + "/" + consumerTag +
amqpConsumerSuffix
+var consumerSeq uint64
+var queueConsumerTagMapping = make(map[string]string)
- channel := invocation.CallerInstance().(*nativeChannel)
+func GeneralConsumersSendAfterInvoke(invocation operator.Invocation, results
...interface{}) error {
+ if foundConsumer := results[0].(bool); !foundConsumer {
+ return nil
+ }
+ consumerTag, _ := invocation.Args()[0].(string)
+ delivery, _ := invocation.Args()[1].(*Delivery)
+ operationName := amqpConsumerPrefix +
queueConsumerTagMapping[consumerTag] + "/" + consumerTag + amqpConsumerSuffix
+ channel, _ := delivery.Acknowledger.(*nativeChannel)
peer := getPeerInfo(channel.connection)
span, err := tracing.CreateEntrySpan(operationName, func(headerKey
string) (string, error) {
- return deliveries.Headers[headerKey].(string), nil
+ header, _ := delivery.Headers[headerKey].(string)
+ return header, nil
}, tracing.WithLayer(tracing.SpanLayerMQ),
tracing.WithComponent(ConsumerComponentID),
tracing.WithTag(tracing.TagMQBroker, peer),
- tracing.WithTag(tracing.TagMQQueue, queue),
- tracing.WithTag(tracing.TagMQMsgID, deliveries.MessageId),
+ tracing.WithTag(tracing.TagMQQueue,
queueConsumerTagMapping[consumerTag]),
+ tracing.WithTag(tracing.TagMQMsgID, delivery.MessageId),
tracing.WithTag(tagMQConsumerTag, consumerTag),
- tracing.WithTag(tagMQCorrelationID, deliveries.CorrelationId),
- tracing.WithTag(tagMQReplyTo, deliveries.ReplyTo),
- tracing.WithTag(tagMQArgs, fmt.Sprintf("%v", args)),
+ tracing.WithTag(tagMQCorrelationID, delivery.CorrelationId),
+ tracing.WithTag(tagMQReplyTo, delivery.ReplyTo),
+ tracing.WithTag(tagMQArgs, fmt.Sprintf("%v", delivery.Headers)),
)
if err != nil {
return err
}
span.SetPeer(peer)
- if err, ok := results[1].(error); ok && err != nil {
- span.Error(err.Error())
- }
span.End()
return nil
}
+
+func GeneralConsumerBeforeInvoke(invocation operator.Invocation, args
amqp091.Table) error {
+ queue := invocation.Args()[0].(string)
+ consumerTag := invocation.Args()[1].(string)
+ if consumerTag == "" {
+ consumerTag = uniqueConsumerTag()
+ }
+ queueConsumerTagMapping[consumerTag] = queue
+ return nil
+}
+
+func GeneralConsumerCloseBeforeInvoke(invocation operator.Invocation) error {
+ consumers, _ := invocation.CallerInstance().(*nativeConsumers)
+ consumers.Lock()
+ defer consumers.Unlock()
+ for consumerTag := range consumers.chans {
+ delete(queueConsumerTagMapping, consumerTag)
+ }
+ return nil
+}
+
+func uniqueConsumerTag() string {
+ return commandNameBasedUniqueConsumerTag(os.Args[0])
+}
+
+func commandNameBasedUniqueConsumerTag(commandName string) string {
+ tagPrefix := "ctag-"
+ tagInfix := commandName
+ tagSuffix := "-" + strconv.FormatUint(atomic.AddUint64(&consumerSeq,
1), 10)
+
+ if len(tagPrefix)+len(tagInfix)+len(tagSuffix) > consumerTagLengthMax {
+ tagInfix = "streadway/amqp"
+ }
+
+ return tagPrefix + tagInfix + tagSuffix
+}
diff --git a/plugins/amqp/instrument.go b/plugins/amqp/instrument.go
index 3ad747d..fdb6478 100644
--- a/plugins/amqp/instrument.go
+++ b/plugins/amqp/instrument.go
@@ -65,30 +65,37 @@ func (i *Instrument) Points() []*instrument.Point {
{
PackagePath: "",
PackageName: "amqp091",
- At: instrument.NewMethodEnhance("*Channel", "Consume",
- instrument.WithArgsCount(7),
+ At: instrument.NewMethodEnhance("*consumers", "send",
+ instrument.WithArgsCount(2),
instrument.WithArgType(0, "string"),
- instrument.WithArgType(1, "string"),
- instrument.WithArgType(6, "Table"),
- instrument.WithResultCount(2),
- instrument.WithResultType(0, "<-chan Delivery"),
- instrument.WithResultType(1, "error"),
+ instrument.WithArgType(1, "*Delivery"),
+ instrument.WithResultCount(1),
+ instrument.WithResultType(0, "bool"),
),
- Interceptor: "ConsumerInterceptor",
+ Interceptor: "ConsumersSendInterceptor",
+ },
+ {
+ PackagePath: "",
+ PackageName: "amqp091",
+ At: instrument.NewMethodEnhance("*consumers", "close",
+ instrument.WithArgsCount(0),
+ instrument.WithResultCount(0),
+ ),
+ Interceptor: "ConsumersCloseInterceptor",
},
{
PackagePath: "",
PackageName: "amqp091",
- At: instrument.NewMethodEnhance("*Channel",
"ConsumeWithContext",
- instrument.WithArgsCount(8),
+ At: instrument.NewMethodEnhance("*Channel", "Consume",
+ instrument.WithArgsCount(7),
+ instrument.WithArgType(0, "string"),
instrument.WithArgType(1, "string"),
- instrument.WithArgType(2, "string"),
- instrument.WithArgType(7, "Table"),
+ instrument.WithArgType(6, "Table"),
instrument.WithResultCount(2),
instrument.WithResultType(0, "<-chan Delivery"),
instrument.WithResultType(1, "error"),
),
- Interceptor: "ConsumerWithCtxInterceptor",
+ Interceptor: "ConsumerInterceptor",
},
{
PackagePath: "",
diff --git a/plugins/amqp/structures.go b/plugins/amqp/structures.go
index 5675b6c..d25f181 100644
--- a/plugins/amqp/structures.go
+++ b/plugins/amqp/structures.go
@@ -19,6 +19,7 @@ package amqp
import (
"io"
+ "sync"
)
//skywalking:native github.com/rabbitmq/amqp091-go Channel
@@ -26,8 +27,19 @@ type nativeChannel struct {
connection *nativeConnection
}
+func (ch *nativeChannel) Ack(tag uint64, multiple bool) error {
+ return nil
+}
+func (ch *nativeChannel) Nack(tag uint64, multiple, requeue bool) error {
+ return nil
+}
+func (ch *nativeChannel) Reject(tag uint64, requeue bool) error {
+ return nil
+}
+
//skywalking:native github.com/rabbitmq/amqp091-go Delivery
type Delivery struct {
+ Acknowledger nativeAcknowledger
Headers Table
MessageId string //nolint
ConsumerTag string
@@ -44,3 +56,18 @@ type Table map[string]interface{}
type nativeConnection struct {
conn io.ReadWriteCloser
}
+
+//skywalking:native github.com/rabbitmq/amqp091-go Acknowledger
+type nativeAcknowledger interface {
+ Ack(tag uint64, multiple bool) error
+ Nack(tag uint64, multiple bool, requeue bool) error
+ Reject(tag uint64, requeue bool) error
+}
+
+//skywalking:native github.com/rabbitmq/amqp091-go consumers
+type nativeConsumers struct {
+ sync.Mutex
+ chans consumerBuffers
+}
+
+type consumerBuffers map[string]chan *Delivery
diff --git a/test/plugins/scenarios/amqp/config/excepted.yml
b/test/plugins/scenarios/amqp/config/excepted.yml
index 7f86061..a04a05e 100644
--- a/test/plugins/scenarios/amqp/config/excepted.yml
+++ b/test/plugins/scenarios/amqp/config/excepted.yml
@@ -35,12 +35,27 @@ segmentItems:
- { key: mq.broker, value: 'amqp-server:5672' }
- { key: mq.exchange, value: not null }
- { key: mq.routing_key, value: sw-queue-1 }
- - operationName: AMQP/sw-queue-1/sw-consumer-1/Consumer
+ - operationName: AMQP/sw-queue-1/Producer
parentSpanId: 0
spanId: 2
spanLayer: MQ
startTime: nq 0
endTime: nq 0
+ componentId: 144
+ isError: false
+ spanType: Exit
+ peer: amqp-server:5672
+ skipAnalysis: false
+ tags:
+ - { key: mq.broker, value: 'amqp-server:5672' }
+ - { key: mq.exchange, value: not null }
+ - { key: mq.routing_key, value: sw-queue-1 }
+ - operationName: AMQP/sw-queue-1/sw-consumer-1/Consumer
+ parentSpanId: 0
+ spanId: 3
+ spanLayer: MQ
+ startTime: nq 0
+ endTime: nq 0
componentId: 145
isError: false
spanType: Entry
@@ -59,22 +74,7 @@ segmentItems:
parentSpanId: 1, parentTraceSegmentId: not null,
parentServiceInstance: not null, parentService: amqp,
traceId: not null }
- - operationName: AMQP/sw-queue-2/Producer
- parentSpanId: 0
- spanId: 3
- spanLayer: MQ
- startTime: nq 0
- endTime: nq 0
- componentId: 144
- isError: false
- spanType: Exit
- peer: amqp-server:5672
- skipAnalysis: false
- tags:
- - { key: mq.broker, value: 'amqp-server:5672' }
- - { key: mq.exchange, value: not null }
- - { key: mq.routing_key, value: sw-queue-2 }
- - operationName: AMQP/sw-queue-2/sw-consumer-2/Consumer
+ - operationName: AMQP/sw-queue-1/sw-consumer-1/Consumer
parentSpanId: 0
spanId: 4
spanLayer: MQ
@@ -87,15 +87,15 @@ segmentItems:
skipAnalysis: false
tags:
- { key: mq.broker, value: 'amqp-server:5672' }
- - { key: mq.queue, value: sw-queue-2 }
+ - { key: mq.queue, value: sw-queue-1 }
- { key: mq.msg.id, value: not null }
- - { key: mq.consumer_tag, value: sw-consumer-2 }
+ - { key: mq.consumer_tag, value: sw-consumer-1 }
- { key: mq.correlation_id, value: not null }
- { key: mq.reply_to, value: not null }
- { key: mq.args, value: not null }
refs:
- { parentEndpoint: 'GET:/execute', networkAddress:
'amqp-server:5672', refType: CrossProcess,
- parentSpanId: 3, parentTraceSegmentId: not null,
+ parentSpanId: 2, parentTraceSegmentId: not null,
parentServiceInstance: not null, parentService: amqp,
traceId: not null }
- operationName: GET:/execute
diff --git a/test/plugins/scenarios/amqp/main.go
b/test/plugins/scenarios/amqp/main.go
index b840c0e..71e5584 100644
--- a/test/plugins/scenarios/amqp/main.go
+++ b/test/plugins/scenarios/amqp/main.go
@@ -32,14 +32,11 @@ import (
type testFunc func(RabbitClient) error
var (
- uri = "amqp://admin:123456@amqp-server:5672"
- queue1 = "sw-queue-1"
- queue2 = "sw-queue-2"
- body = "I love skywalking 3 thousand"
- consumerTag1 = "sw-consumer-1"
- consumerTag2 = "sw-consumer-2"
- consumerTrigger = make(chan struct{})
- consumerWithCtxTrigger = make(chan struct{})
+ uri = "amqp://admin:123456@amqp-server:5672"
+ queue = "sw-queue-1"
+ body = "I love skywalking 3 thousand"
+ consumerTag = "sw-consumer-1"
+ consumerTrigger = make(chan struct{})
)
func main() {
@@ -59,7 +56,6 @@ func main() {
fn testFunc
}{
{"testSimpleConsumer", testSimpleConsumer},
- {"testConsumerWithCtx", testConsumerWithCtx},
}
for _, test := range tests {
fmt.Printf("excute test case: %s\n", test.name)
@@ -80,32 +76,36 @@ func main() {
}
func testSimpleConsumer(client RabbitClient) error {
- producer(queue1, client)
+ producer(queue, client)
go consumer()
consumerTrigger <- struct{}{}
time.Sleep(time.Second)
return nil
}
-func testConsumerWithCtx(client RabbitClient) error {
- producer(queue2, client)
- go consumerWithContext()
- consumerWithCtxTrigger <- struct{}{}
- time.Sleep(time.Second)
- return nil
-}
-
func producer(queue string, client RabbitClient) {
- client.CreateQueue(queue, true, false)
- if err := client.Send(context.Background(), "", queue, amqp.Publishing{
+ _, err := client.CreateQueue(queue, true, false)
+ if err != nil {
+ fmt.Println("Failed to Create Queue, err: ", err)
+ }
+ if err = client.Send(context.Background(), "", queue, amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
Headers: amqp.Table{},
CorrelationId: "1",
- MessageId: "2",
+ MessageId: "1",
}); err != nil {
fmt.Println("Failed to Send msg, err: ", err)
}
+ if err = client.Send(context.Background(), "", queue, amqp.Publishing{
+ ContentType: "text/plain",
+ Body: []byte(body),
+ Headers: amqp.Table{},
+ CorrelationId: "2",
+ MessageId: "2",
+ }); err != nil {
+ fmt.Println("Failed to Send msg second time, err: ", err)
+ }
}
func consumer() {
@@ -118,16 +118,19 @@ func consumer() {
if err != nil {
fmt.Println("Failed to Channel Consume, err: ", err)
}
- msgs, err := consumeClient.Consume(queue1, consumerTag1, false)
+ msgs, err := consumeClient.Consume(queue, consumerTag, false)
if err != nil {
fmt.Println("Failed to Consume msg, err: ", err)
}
log.Printf("[Consumer] Waiting for messages.\n")
for d := range msgs {
log.Printf("Received a message: %s\n", string(d.Body))
- d.Ack(false)
+ err = d.Ack(false)
+ if err != nil {
+ fmt.Println("Failed to ACK msg, err: ", err)
+ }
}
- err = consumeClient.Cancel(consumerTag1)
+ err = consumeClient.Cancel(consumerTag)
if err != nil {
fmt.Println("Failed to Cancel Consume, err: ", err)
}
@@ -137,35 +140,6 @@ func consumer() {
}
}
-func consumerWithContext() {
- <-consumerWithCtxTrigger
- consumeConn, err := amqp.Dial(uri)
- if err != nil {
- fmt.Println("Failed to Dial ConsumerWithContext, err: ", err)
- }
- consumeClient, err := NewRabbitMQClient(consumeConn)
- if err != nil {
- fmt.Println("Failed to Channel ConsumerWithContext, err: ", err)
- }
- msgs, err := consumeClient.Consume(queue2, consumerTag2, false)
- if err != nil {
- fmt.Println("Failed to Consume msg, err: ", err)
- }
- log.Printf("[ConsumerWithContext] Waiting for messages.\n")
- for d := range msgs {
- log.Printf("Received a message: %s", string(d.Body))
- d.Ack(false)
- }
- err = consumeClient.Cancel(consumerTag2)
- if err != nil {
- fmt.Println("Failed to Cancel ConsumerWithContext, err: ", err)
- }
- err = consumeConn.Close()
- if err != nil {
- fmt.Println("Failed to Close ConsumerWithContext, err: ", err)
- }
-}
-
// RabbitClient is used to keep track of the RabbitMQ connection
type RabbitClient struct {
// The connection that is used
@@ -233,7 +207,3 @@ func (rc RabbitClient) Send(ctx context.Context, exchange,
routingKey string, op
func (rc RabbitClient) Consume(queue, consumer string, autoAck bool) (<-chan
amqp.Delivery, error) {
return rc.ch.Consume(queue, consumer, autoAck, false, false, false, nil)
}
-
-func (rc RabbitClient) ConsumeWithContext(ctx context.Context, queue, consumer
string, autoAck bool) (<-chan amqp.Delivery, error) {
- return rc.ch.ConsumeWithContext(ctx, queue, consumer, autoAck, false,
false, false, nil)
-}
diff --git a/tools/go-agent/instrument/plugins/rewrite/context.go
b/tools/go-agent/instrument/plugins/rewrite/context.go
index 47d2a4e..a5136ba 100644
--- a/tools/go-agent/instrument/plugins/rewrite/context.go
+++ b/tools/go-agent/instrument/plugins/rewrite/context.go
@@ -341,7 +341,7 @@ func (c *Context) enhanceTypeNameWhenRewrite(fieldType
dst.Expr, parent dst.Node
func (c *Context) typeIsBasicTypeValueOrEnhanceName(name string) bool {
if strings.HasPrefix(name, OperatePrefix) || strings.HasPrefix(name,
GenerateMethodPrefix) || tools.IsBasicDataType(name) ||
- name == "nil" || name == "true" || name == "false" || name ==
"append" || name == "panic" || name == "new" {
+ name == "nil" || name == "true" || name == "false" || name ==
"append" || name == "panic" || name == "new" || name == "delete" {
return true
}
if _, valErr := strconv.ParseFloat(name, 64); valErr == nil {