renyansongno1 opened a new issue, #12868:
URL: https://github.com/apache/skywalking/issues/12868

   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/skywalking/issues?q=is%3Aissue) and found no 
similar issues.
   
   
   ### Apache SkyWalking Component
   
   Go Agent (apache/skywalking-go)
   
   ### What happened
   
   ### Background
   
   Recently, when using the agent of skywalking-go, we found that the consumer 
side of AMQP would block the goroutine. Through pprof's goroutines, it was 
discovered that the blockage occurred at the first line of the following 
function:
   
   ```go
   func GeneralConsumerAfterInvoke(invocation operator.Invocation, queue, 
consumerTag string, args amqp091.Table, results ...interface{}) error {
        deliveries := <-results[0].(<-chan Delivery)
        //....
   }
   ```
   
   Then, through the interception point defined by the plugin, we found that 
there are certain issues with the current enhancement logic. For example, the 
normal consumption code snippet is extracted as follows:
   
   ```go
   deliveries, err := c.channel.Consume(
                queue.Name, // name
                "",         // consumerTag,
                *autoAck,   // autoAck
                false,      // exclusive
                false,      // noLocal
                false,      // noWait
                nil,        // arguments
        )
        
        go handler(deliveries, err)
   
   // goroutine consumer logic
   func handle(deliveries <-chan amqp.Delivery, done chan error) {
        cleanup := func() {
                Log.Printf("handle: deliveries channel closed")
                done <- nil
        }
   
        defer cleanup()
   
        for d := range deliveries {
                deliveryCount++
                if *verbose == true {
                        Log.Printf(
                                "got %dB delivery: [%v] %q",
                                len(d.Body),
                                d.DeliveryTag,
                                d.Body,
                        )
                } else {
                        if deliveryCount%65536 == 0 {
                                Log.Printf("delivery count %d", deliveryCount)
                        }
                }
                if *autoAck == false {
                        d.Ack(false)
                }
        }
   }
   ```
   
   Therefore, the normal consumer needs to read the read-only chan returned by 
the consumer function. Any reading will cause data loss, and when there is no 
message coming, the current goroutine will also block here.
   
   Additionally, this Consumer function is only called once, so the trace 
reporting of the messages only happens once, not for every message as a 
separate span.
   
   ### What you expected to happen
   
   In expectation, each message should correspond to a single span, and the 
agent should not affect any normal business logic.
   
   
   ### How to reproduce
   
   
[ampq-skywalking.zip](https://github.com/user-attachments/files/18150228/ampq-skywalking.zip)
   
   I provided a demo, which is an official example of AMQP091. Normally, using 
go build -toolexec will result in an enhanced binary file. If any message is 
sent to the broker and queue in the example, the issue can be reproduced (using 
the RabbitMQ management console or a producer to send messages).
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit a pull request to fix on your own?
   
   - [X] Yes I am willing to submit a pull request on my own!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: 
[email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to