This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git


The following commit(s) were added to refs/heads/master by this push:
     new 44e8c44  Add Rabbitmq Plugin (#53)
44e8c44 is described below

commit 44e8c443aaa3a3a88d43ddd6579bdea32ca60625
Author: huawei <alonela...@gmail.com>
AuthorDate: Mon Jul 27 13:52:49 2020 +0800

    Add Rabbitmq Plugin (#53)
---
 README.md                                     |   1 +
 setup.py                                      |   1 +
 skywalking/__init__.py                        |   2 +
 skywalking/plugins/sw_rabbitmq/__init__.py    | 108 ++++++++++++++++++++++++++
 tests/plugin/sw_rabbitmq/__init__.py          |  16 ++++
 tests/plugin/sw_rabbitmq/docker-compose.yml   |  87 +++++++++++++++++++++
 tests/plugin/sw_rabbitmq/expected.data.yml    |  91 ++++++++++++++++++++++
 tests/plugin/sw_rabbitmq/services/__init__.py |  16 ++++
 tests/plugin/sw_rabbitmq/services/consumer.py |  51 ++++++++++++
 tests/plugin/sw_rabbitmq/services/producer.py |  47 +++++++++++
 tests/plugin/sw_rabbitmq/test_kafka.py        |  43 ++++++++++
 11 files changed, 463 insertions(+)

diff --git a/README.md b/README.md
index af16be6..0dcf7bc 100755
--- a/README.md
+++ b/README.md
@@ -81,6 +81,7 @@ Library | Plugin Name
 | [redis-py](https://github.com/andymccurdy/redis-py/) | `sw_redis` |
 | [kafka-python](https://kafka-python.readthedocs.io/en/master/) | `sw_kafka` |
 | [tornado](https://www.tornadoweb.org/en/stable/) | `sw_tornado` |
+| [pika](https://pika.readthedocs.io/en/stable/) | `sw_rabbitmq` |
 
 ## API
 
diff --git a/setup.py b/setup.py
index 4f1d1d7..0ce8624 100644
--- a/setup.py
+++ b/setup.py
@@ -51,6 +51,7 @@ setup(
             "redis",
             "kafka-python",
             "tornado",
+            "pika",
         ],
     },
     classifiers=[
diff --git a/skywalking/__init__.py b/skywalking/__init__.py
index 23fbb7e..67ac3ac 100644
--- a/skywalking/__init__.py
+++ b/skywalking/__init__.py
@@ -32,6 +32,8 @@ class Component(Enum):
     Redis = 7
     KafkaProducer = 40
     KafkaConsumer = 41
+    RabbitmqProducer = 52
+    RabbitmqConsumer = 53
 
 
 class Layer(Enum):
diff --git a/skywalking/plugins/sw_rabbitmq/__init__.py 
b/skywalking/plugins/sw_rabbitmq/__init__.py
new file mode 100644
index 0000000..601f33d
--- /dev/null
+++ b/skywalking/plugins/sw_rabbitmq/__init__.py
@@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+
+from skywalking import Layer, Component
+from skywalking.trace import tags
+from skywalking.trace.carrier import Carrier
+from skywalking.trace.context import get_context
+from skywalking.trace.tags import Tag
+
+logger = logging.getLogger(__name__)
+
+
+def install():
+    # noinspection PyBroadException
+    try:
+        from pika.channel import Channel
+
+        _basic_publish = Channel.basic_publish
+        __on_deliver = Channel._on_deliver
+        Channel.basic_publish = _sw_basic_publish_func(_basic_publish)
+        Channel._on_deliver = _sw__on_deliver_func(__on_deliver)
+
+    except Exception:
+        logger.warning('failed to install plugin %s', __name__)
+
+
+def _sw_basic_publish_func(_basic_publish):
+    def _sw_basic_publish(this, exchange,
+                          routing_key,
+                          body,
+                          properties=None,
+                          mandatory=False):
+        peer = '%s:%s' % (this.connection.params.host, 
this.connection.params.port)
+        context = get_context()
+        carrier = Carrier()
+        import pika
+        with context.new_exit_span(op="RabbitMQ/Topic/" + exchange + "/Queue/" 
+ routing_key + "/Producer" or "/",
+                                   peer=peer, carrier=carrier) as span:
+            span.layer = Layer.MQ
+            span.component = Component.RabbitmqProducer
+            properties = pika.BasicProperties() if properties is None else 
properties
+
+            if properties.headers is None:
+                headers = {}
+                for item in carrier:
+                    headers[item.key] = item.val
+                properties.headers = headers
+            else:
+                for item in carrier:
+                    properties.headers[item.key] = item.val
+
+            try:
+                res = _basic_publish(this, exchange,
+                                     routing_key,
+                                     body,
+                                     properties=properties,
+                                     mandatory=mandatory)
+                span.tag(Tag(key=tags.MqBroker, val=peer))
+                span.tag(Tag(key=tags.MqTopic, val=exchange))
+                span.tag(Tag(key=tags.MqQueue, val=routing_key))
+            except BaseException as e:
+                span.raised()
+                raise e
+            return res
+
+    return _sw_basic_publish
+
+
+def _sw__on_deliver_func(__on_deliver):
+    def _sw__on_deliver(this, method_frame, header_frame, body):
+        peer = '%s:%s' % (this.connection.params.host, 
this.connection.params.port)
+        context = get_context()
+        exchange = method_frame.method.exchange
+        routing_key = method_frame.method.routing_key
+        carrier = Carrier()
+        for item in carrier:
+            if item.key in header_frame.properties.headers:
+                item.val = header_frame.properties.headers[item.key]
+
+        with context.new_entry_span(op="RabbitMQ/Topic/" + exchange + 
"/Queue/" + routing_key
+                                       + "/Consumer" or "", carrier=carrier) 
as span:
+            span.layer = Layer.MQ
+            span.component = Component.RabbitmqConsumer
+            try:
+                __on_deliver(this, method_frame, header_frame, body)
+                span.tag(Tag(key=tags.MqBroker, val=peer))
+                span.tag(Tag(key=tags.MqTopic, val=exchange))
+                span.tag(Tag(key=tags.MqQueue, val=routing_key))
+            except BaseException as e:
+                span.raised()
+                raise e
+
+    return _sw__on_deliver
diff --git a/tests/plugin/sw_rabbitmq/__init__.py 
b/tests/plugin/sw_rabbitmq/__init__.py
new file mode 100644
index 0000000..b1312a0
--- /dev/null
+++ b/tests/plugin/sw_rabbitmq/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/tests/plugin/sw_rabbitmq/docker-compose.yml 
b/tests/plugin/sw_rabbitmq/docker-compose.yml
new file mode 100644
index 0000000..ecbecec
--- /dev/null
+++ b/tests/plugin/sw_rabbitmq/docker-compose.yml
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+version: '2.1'
+
+services:
+  collector:
+    extends:
+      service: collector
+      file: ../docker/docker-compose.base.yml
+
+  rabbitmq-server:
+    image: rabbitmq:latest
+    hostname: rabbitmq-server
+    ports:
+      - 5672:5672
+      - 15672:15672
+    environment:
+      - RABBITMQ_DEFAULT_PASS=admin
+      - RABBITMQ_DEFAULT_USER=admin
+      - RABBITMQ_DEFAULT_VHOST=/
+    networks:
+      - beyond
+    healthcheck:
+      test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/5672"]
+      interval: 5s
+      timeout: 60s
+      retries: 120
+
+  producer:
+    extends:
+      service: agent
+      file: ../docker/docker-compose.base.yml
+    ports:
+      - 9090:9090
+    volumes:
+      - ./services/producer.py:/app/producer.py
+    command: ['bash', '-c', 'pip install flask && pip install pika && python3 
/app/producer.py']
+    healthcheck:
+      test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9090"]
+      interval: 5s
+      timeout: 60s
+      retries: 120
+    depends_on:
+      collector:
+        condition: service_healthy
+      rabbitmq-server:
+        condition: service_healthy
+      consumer:
+        condition: service_healthy
+
+  consumer:
+    extends:
+      service: agent
+      file: ../docker/docker-compose.base.yml
+    ports:
+      - 9091:9091
+    volumes:
+      - ./services/consumer.py:/app/consumer.py
+    command: ['bash', '-c', 'pip install flask && pip install pika && python3 
/app/consumer.py']
+    healthcheck:
+      test: ["CMD", "bash", "-c", "ps -ef | grep /app/consumer | grep -v grep"]
+      interval: 5s
+      timeout: 60s
+      retries: 120
+    depends_on:
+      collector:
+        condition: service_healthy
+      rabbitmq-server:
+        condition: service_healthy
+
+networks:
+  beyond:
diff --git a/tests/plugin/sw_rabbitmq/expected.data.yml 
b/tests/plugin/sw_rabbitmq/expected.data.yml
new file mode 100644
index 0000000..cc6211a
--- /dev/null
+++ b/tests/plugin/sw_rabbitmq/expected.data.yml
@@ -0,0 +1,91 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+segmentItems:
+  - serviceName: producer
+    segmentSize: 1
+    segments:
+      - segmentId: not null
+        spans:
+          - operationName: RabbitMQ/Topic/test/Queue/test/Producer
+            operationId: 0
+            parentSpanId: 0
+            spanId: 1
+            spanLayer: MQ
+            tags:
+              - key: mq.broker
+                value: 'rabbitmq-server:5672'
+              - key: mq.topic
+                value: test
+              - key: mq.queue
+                value: test
+            startTime: gt 0
+            endTime: gt 0
+            componentId: 52
+            spanType: Exit
+            peer: rabbitmq-server:5672
+            skipAnalysis: false
+          - operationName: /users
+            operationId: 0
+            parentSpanId: -1
+            spanId: 0
+            spanLayer: Http
+            tags:
+              - key: http.method
+                value: GET
+              - key: url
+                value: http://0.0.0.0:9090/users
+              - key: status.code
+                value: '200'
+            startTime: gt 0
+            endTime: gt 0
+            componentId: 7001
+            spanType: Entry
+            peer: not null
+            skipAnalysis: false
+  - serviceName: consumer
+    segmentSize: 1
+    segments:
+      - segmentId: not null
+        spans:
+          - operationName: RabbitMQ/Topic/test/Queue/test/Consumer
+            operationId: 0
+            parentSpanId: -1
+            spanId: 0
+            spanLayer: MQ
+            tags:
+              - key: mq.broker
+                value: 'rabbitmq-server:5672'
+              - key: mq.topic
+                value: test
+              - key: mq.queue
+                value: test
+            refs:
+              - parentEndpoint: RabbitMQ/Topic/test/Queue/test/Producer
+                networkAddress: 'rabbitmq-server:5672'
+                refType: CrossProcess
+                parentSpanId: 1
+                parentTraceSegmentId: not null
+                parentServiceInstance: not null
+                parentService: producer
+                traceId: not null
+            startTime: gt 0
+            endTime: gt 0
+            componentId: 53
+            spanType: Entry
+            peer: ''
+            skipAnalysis: false
diff --git a/tests/plugin/sw_rabbitmq/services/__init__.py 
b/tests/plugin/sw_rabbitmq/services/__init__.py
new file mode 100644
index 0000000..b1312a0
--- /dev/null
+++ b/tests/plugin/sw_rabbitmq/services/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/tests/plugin/sw_rabbitmq/services/consumer.py 
b/tests/plugin/sw_rabbitmq/services/consumer.py
new file mode 100644
index 0000000..30784e3
--- /dev/null
+++ b/tests/plugin/sw_rabbitmq/services/consumer.py
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from skywalking import config, agent
+
+if __name__ == '__main__':
+    config.service_name = 'consumer'
+    config.logging_level = 'INFO'
+    agent.start()
+
+    import pika
+
+    parameters = 
(pika.URLParameters("amqp://admin:admin@rabbitmq-server:5672/%2F"))
+
+    connection = pika.BlockingConnection(parameters)
+
+    channel = connection.channel()
+    channel.queue_declare("test")
+    channel.exchange_declare("test")
+    channel.queue_bind(exchange='test', queue="test", routing_key='test')
+    for method_frame, properties, body in channel.consume('test'):
+        # Display the message parts and acknowledge the message
+        print(method_frame, properties, body)
+        channel.basic_ack(method_frame.delivery_tag)
+
+        # Escape out of the loop after 10 messages
+        if method_frame.delivery_tag == 10:
+            break
+
+    try:
+        # Loop so we can communicate with RabbitMQ
+        connection.ioloop.start()
+    except KeyboardInterrupt:
+        # Gracefully close the connection
+        connection.close()
+        # Loop until we're fully closed, will stop on its own
+        connection.ioloop.start()
diff --git a/tests/plugin/sw_rabbitmq/services/producer.py 
b/tests/plugin/sw_rabbitmq/services/producer.py
new file mode 100644
index 0000000..5031d25
--- /dev/null
+++ b/tests/plugin/sw_rabbitmq/services/producer.py
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+from skywalking import agent, config
+
+if __name__ == '__main__':
+    config.service_name = 'producer'
+    config.logging_level = 'INFO'
+    agent.start()
+
+    from flask import Flask, jsonify
+    app = Flask(__name__)
+    import pika
+    parameters = 
(pika.URLParameters("amqp://admin:admin@rabbitmq-server:5672/%2F"))
+
+    @app.route("/users", methods=["POST", "GET"])
+    def application():
+        connection = pika.BlockingConnection(parameters)
+        channel = connection.channel()
+        channel.queue_declare("test")
+        channel.exchange_declare("test")
+        channel.queue_bind(exchange='test', queue="test", routing_key='test')
+        channel.basic_publish(exchange='test', routing_key='test',  
properties=pika.BasicProperties(
+            headers={'key': 'value'}
+        ),
+                              body=b'Test message.')
+        connection.close()
+
+        return jsonify({"song": "Despacito", "artist": "Luis Fonsi"})
+
+    PORT = 9090
+    app.run(host='0.0.0.0', port=PORT, debug=True)
diff --git a/tests/plugin/sw_rabbitmq/test_kafka.py 
b/tests/plugin/sw_rabbitmq/test_kafka.py
new file mode 100644
index 0000000..f24add7
--- /dev/null
+++ b/tests/plugin/sw_rabbitmq/test_kafka.py
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import time
+import unittest
+from os.path import abspath, dirname
+
+from testcontainers.compose import DockerCompose
+
+from tests.plugin import BasePluginTest
+
+
+class TestPlugin(BasePluginTest):
+    @classmethod
+    def setUpClass(cls):
+        cls.compose = DockerCompose(filepath=dirname(abspath(__file__)))
+        cls.compose.start()
+
+        cls.compose.wait_for(cls.url(('producer', '9090'), 'users'))
+
+    def test_request_plugin(self):
+        time.sleep(3)
+
+        
self.validate(expected_file_name=os.path.join(dirname(abspath(__file__)), 
'expected.data.yml'))
+
+
+if __name__ == '__main__':
+    unittest.main()

Reply via email to