This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git
The following commit(s) were added to refs/heads/master by this push:
new 44e8c44 Add Rabbitmq Plugin (#53)
44e8c44 is described below
commit 44e8c443aaa3a3a88d43ddd6579bdea32ca60625
Author: huawei <[email protected]>
AuthorDate: Mon Jul 27 13:52:49 2020 +0800
Add Rabbitmq Plugin (#53)
---
README.md | 1 +
setup.py | 1 +
skywalking/__init__.py | 2 +
skywalking/plugins/sw_rabbitmq/__init__.py | 108 ++++++++++++++++++++++++++
tests/plugin/sw_rabbitmq/__init__.py | 16 ++++
tests/plugin/sw_rabbitmq/docker-compose.yml | 87 +++++++++++++++++++++
tests/plugin/sw_rabbitmq/expected.data.yml | 91 ++++++++++++++++++++++
tests/plugin/sw_rabbitmq/services/__init__.py | 16 ++++
tests/plugin/sw_rabbitmq/services/consumer.py | 51 ++++++++++++
tests/plugin/sw_rabbitmq/services/producer.py | 47 +++++++++++
tests/plugin/sw_rabbitmq/test_kafka.py | 43 ++++++++++
11 files changed, 463 insertions(+)
diff --git a/README.md b/README.md
index af16be6..0dcf7bc 100755
--- a/README.md
+++ b/README.md
@@ -81,6 +81,7 @@ Library | Plugin Name
| [redis-py](https://github.com/andymccurdy/redis-py/) | `sw_redis` |
| [kafka-python](https://kafka-python.readthedocs.io/en/master/) | `sw_kafka` |
| [tornado](https://www.tornadoweb.org/en/stable/) | `sw_tornado` |
+| [pika](https://pika.readthedocs.io/en/stable/) | `sw_rabbitmq` |
## API
diff --git a/setup.py b/setup.py
index 4f1d1d7..0ce8624 100644
--- a/setup.py
+++ b/setup.py
@@ -51,6 +51,7 @@ setup(
"redis",
"kafka-python",
"tornado",
+ "pika",
],
},
classifiers=[
diff --git a/skywalking/__init__.py b/skywalking/__init__.py
index 23fbb7e..67ac3ac 100644
--- a/skywalking/__init__.py
+++ b/skywalking/__init__.py
@@ -32,6 +32,8 @@ class Component(Enum):
Redis = 7
KafkaProducer = 40
KafkaConsumer = 41
+ RabbitmqProducer = 52
+ RabbitmqConsumer = 53
class Layer(Enum):
diff --git a/skywalking/plugins/sw_rabbitmq/__init__.py b/skywalking/plugins/sw_rabbitmq/__init__.py
new file mode 100644
index 0000000..601f33d
--- /dev/null
+++ b/skywalking/plugins/sw_rabbitmq/__init__.py
@@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+
+from skywalking import Layer, Component
+from skywalking.trace import tags
+from skywalking.trace.carrier import Carrier
+from skywalking.trace.context import get_context
+from skywalking.trace.tags import Tag
+
+logger = logging.getLogger(__name__)
+
+
+def install():
+ # noinspection PyBroadException
+ try:
+ from pika.channel import Channel
+
+ _basic_publish = Channel.basic_publish
+ __on_deliver = Channel._on_deliver
+ Channel.basic_publish = _sw_basic_publish_func(_basic_publish)
+ Channel._on_deliver = _sw__on_deliver_func(__on_deliver)
+
+ except Exception:
+ logger.warning('failed to install plugin %s', __name__)
+
+
+def _sw_basic_publish_func(_basic_publish):
+ def _sw_basic_publish(this, exchange,
+ routing_key,
+ body,
+ properties=None,
+ mandatory=False):
+ peer = '%s:%s' % (this.connection.params.host, this.connection.params.port)
+ context = get_context()
+ carrier = Carrier()
+ import pika
+ with context.new_exit_span(op="RabbitMQ/Topic/" + exchange + "/Queue/" + routing_key + "/Producer" or "/",
+ peer=peer, carrier=carrier) as span:
+ span.layer = Layer.MQ
+ span.component = Component.RabbitmqProducer
+ properties = pika.BasicProperties() if properties is None else properties
+
+ if properties.headers is None:
+ headers = {}
+ for item in carrier:
+ headers[item.key] = item.val
+ properties.headers = headers
+ else:
+ for item in carrier:
+ properties.headers[item.key] = item.val
+
+ try:
+ res = _basic_publish(this, exchange,
+ routing_key,
+ body,
+ properties=properties,
+ mandatory=mandatory)
+ span.tag(Tag(key=tags.MqBroker, val=peer))
+ span.tag(Tag(key=tags.MqTopic, val=exchange))
+ span.tag(Tag(key=tags.MqQueue, val=routing_key))
+ except BaseException as e:
+ span.raised()
+ raise e
+ return res
+
+ return _sw_basic_publish
+
+
+def _sw__on_deliver_func(__on_deliver):
+ def _sw__on_deliver(this, method_frame, header_frame, body):
+ peer = '%s:%s' % (this.connection.params.host, this.connection.params.port)
+ context = get_context()
+ exchange = method_frame.method.exchange
+ routing_key = method_frame.method.routing_key
+ carrier = Carrier()
+ for item in carrier:
+ if item.key in header_frame.properties.headers:
+ item.val = header_frame.properties.headers[item.key]
+
+ with context.new_entry_span(op="RabbitMQ/Topic/" + exchange + "/Queue/" + routing_key
+ + "/Consumer" or "", carrier=carrier) as span:
+ span.layer = Layer.MQ
+ span.component = Component.RabbitmqConsumer
+ try:
+ __on_deliver(this, method_frame, header_frame, body)
+ span.tag(Tag(key=tags.MqBroker, val=peer))
+ span.tag(Tag(key=tags.MqTopic, val=exchange))
+ span.tag(Tag(key=tags.MqQueue, val=routing_key))
+ except BaseException as e:
+ span.raised()
+ raise e
+
+ return _sw__on_deliver
diff --git a/tests/plugin/sw_rabbitmq/__init__.py b/tests/plugin/sw_rabbitmq/__init__.py
new file mode 100644
index 0000000..b1312a0
--- /dev/null
+++ b/tests/plugin/sw_rabbitmq/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/tests/plugin/sw_rabbitmq/docker-compose.yml b/tests/plugin/sw_rabbitmq/docker-compose.yml
new file mode 100644
index 0000000..ecbecec
--- /dev/null
+++ b/tests/plugin/sw_rabbitmq/docker-compose.yml
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+version: '2.1'
+
+services:
+ collector:
+ extends:
+ service: collector
+ file: ../docker/docker-compose.base.yml
+
+ rabbitmq-server:
+ image: rabbitmq:latest
+ hostname: rabbitmq-server
+ ports:
+ - 5672:5672
+ - 15672:15672
+ environment:
+ - RABBITMQ_DEFAULT_PASS=admin
+ - RABBITMQ_DEFAULT_USER=admin
+ - RABBITMQ_DEFAULT_VHOST=/
+ networks:
+ - beyond
+ healthcheck:
+ test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/5672"]
+ interval: 5s
+ timeout: 60s
+ retries: 120
+
+ producer:
+ extends:
+ service: agent
+ file: ../docker/docker-compose.base.yml
+ ports:
+ - 9090:9090
+ volumes:
+ - ./services/producer.py:/app/producer.py
+ command: ['bash', '-c', 'pip install flask && pip install pika && python3 /app/producer.py']
+ healthcheck:
+ test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9090"]
+ interval: 5s
+ timeout: 60s
+ retries: 120
+ depends_on:
+ collector:
+ condition: service_healthy
+ rabbitmq-server:
+ condition: service_healthy
+ consumer:
+ condition: service_healthy
+
+ consumer:
+ extends:
+ service: agent
+ file: ../docker/docker-compose.base.yml
+ ports:
+ - 9091:9091
+ volumes:
+ - ./services/consumer.py:/app/consumer.py
+ command: ['bash', '-c', 'pip install flask && pip install pika && python3 /app/consumer.py']
+ healthcheck:
+ test: ["CMD", "bash", "-c", "ps -ef | grep /app/consumer | grep -v grep"]
+ interval: 5s
+ timeout: 60s
+ retries: 120
+ depends_on:
+ collector:
+ condition: service_healthy
+ rabbitmq-server:
+ condition: service_healthy
+
+networks:
+ beyond:
diff --git a/tests/plugin/sw_rabbitmq/expected.data.yml b/tests/plugin/sw_rabbitmq/expected.data.yml
new file mode 100644
index 0000000..cc6211a
--- /dev/null
+++ b/tests/plugin/sw_rabbitmq/expected.data.yml
@@ -0,0 +1,91 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+segmentItems:
+ - serviceName: producer
+ segmentSize: 1
+ segments:
+ - segmentId: not null
+ spans:
+ - operationName: RabbitMQ/Topic/test/Queue/test/Producer
+ operationId: 0
+ parentSpanId: 0
+ spanId: 1
+ spanLayer: MQ
+ tags:
+ - key: mq.broker
+ value: 'rabbitmq-server:5672'
+ - key: mq.topic
+ value: test
+ - key: mq.queue
+ value: test
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 52
+ spanType: Exit
+ peer: rabbitmq-server:5672
+ skipAnalysis: false
+ - operationName: /users
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: Http
+ tags:
+ - key: http.method
+ value: GET
+ - key: url
+ value: http://0.0.0.0:9090/users
+ - key: status.code
+ value: '200'
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 7001
+ spanType: Entry
+ peer: not null
+ skipAnalysis: false
+ - serviceName: consumer
+ segmentSize: 1
+ segments:
+ - segmentId: not null
+ spans:
+ - operationName: RabbitMQ/Topic/test/Queue/test/Consumer
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ tags:
+ - key: mq.broker
+ value: 'rabbitmq-server:5672'
+ - key: mq.topic
+ value: test
+ - key: mq.queue
+ value: test
+ refs:
+ - parentEndpoint: RabbitMQ/Topic/test/Queue/test/Producer
+ networkAddress: 'rabbitmq-server:5672'
+ refType: CrossProcess
+ parentSpanId: 1
+ parentTraceSegmentId: not null
+ parentServiceInstance: not null
+ parentService: producer
+ traceId: not null
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 53
+ spanType: Entry
+ peer: ''
+ skipAnalysis: false
diff --git a/tests/plugin/sw_rabbitmq/services/__init__.py b/tests/plugin/sw_rabbitmq/services/__init__.py
new file mode 100644
index 0000000..b1312a0
--- /dev/null
+++ b/tests/plugin/sw_rabbitmq/services/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/tests/plugin/sw_rabbitmq/services/consumer.py b/tests/plugin/sw_rabbitmq/services/consumer.py
new file mode 100644
index 0000000..30784e3
--- /dev/null
+++ b/tests/plugin/sw_rabbitmq/services/consumer.py
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from skywalking import config, agent
+
+if __name__ == '__main__':
+ config.service_name = 'consumer'
+ config.logging_level = 'INFO'
+ agent.start()
+
+ import pika
+
+ parameters = (pika.URLParameters("amqp://admin:admin@rabbitmq-server:5672/%2F"))
+
+ connection = pika.BlockingConnection(parameters)
+
+ channel = connection.channel()
+ channel.queue_declare("test")
+ channel.exchange_declare("test")
+ channel.queue_bind(exchange='test', queue="test", routing_key='test')
+ for method_frame, properties, body in channel.consume('test'):
+ # Display the message parts and acknowledge the message
+ print(method_frame, properties, body)
+ channel.basic_ack(method_frame.delivery_tag)
+
+ # Escape out of the loop after 10 messages
+ if method_frame.delivery_tag == 10:
+ break
+
+ try:
+ # Loop so we can communicate with RabbitMQ
+ connection.ioloop.start()
+ except KeyboardInterrupt:
+ # Gracefully close the connection
+ connection.close()
+ # Loop until we're fully closed, will stop on its own
+ connection.ioloop.start()
diff --git a/tests/plugin/sw_rabbitmq/services/producer.py b/tests/plugin/sw_rabbitmq/services/producer.py
new file mode 100644
index 0000000..5031d25
--- /dev/null
+++ b/tests/plugin/sw_rabbitmq/services/producer.py
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+from skywalking import agent, config
+
+if __name__ == '__main__':
+ config.service_name = 'producer'
+ config.logging_level = 'INFO'
+ agent.start()
+
+ from flask import Flask, jsonify
+ app = Flask(__name__)
+ import pika
+ parameters = (pika.URLParameters("amqp://admin:admin@rabbitmq-server:5672/%2F"))
+
+ @app.route("/users", methods=["POST", "GET"])
+ def application():
+ connection = pika.BlockingConnection(parameters)
+ channel = connection.channel()
+ channel.queue_declare("test")
+ channel.exchange_declare("test")
+ channel.queue_bind(exchange='test', queue="test", routing_key='test')
+ channel.basic_publish(exchange='test', routing_key='test', properties=pika.BasicProperties(
+ headers={'key': 'value'}
+ ),
+ body=b'Test message.')
+ connection.close()
+
+ return jsonify({"song": "Despacito", "artist": "Luis Fonsi"})
+
+ PORT = 9090
+ app.run(host='0.0.0.0', port=PORT, debug=True)
diff --git a/tests/plugin/sw_rabbitmq/test_kafka.py b/tests/plugin/sw_rabbitmq/test_kafka.py
new file mode 100644
index 0000000..f24add7
--- /dev/null
+++ b/tests/plugin/sw_rabbitmq/test_kafka.py
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import time
+import unittest
+from os.path import abspath, dirname
+
+from testcontainers.compose import DockerCompose
+
+from tests.plugin import BasePluginTest
+
+
+class TestPlugin(BasePluginTest):
+ @classmethod
+ def setUpClass(cls):
+ cls.compose = DockerCompose(filepath=dirname(abspath(__file__)))
+ cls.compose.start()
+
+ cls.compose.wait_for(cls.url(('producer', '9090'), 'users'))
+
+ def test_request_plugin(self):
+ time.sleep(3)
+
+ self.validate(expected_file_name=os.path.join(dirname(abspath(__file__)), 'expected.data.yml'))
+
+
+if __name__ == '__main__':
+ unittest.main()