This is an automated email from the ASF dual-hosted git repository. kezhenxu94 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/skywalking-python.git
The following commit(s) were added to refs/heads/master by this push: new 44e8c44 Add Rabbitmq Plugin (#53) 44e8c44 is described below commit 44e8c443aaa3a3a88d43ddd6579bdea32ca60625 Author: huawei <alonela...@gmail.com> AuthorDate: Mon Jul 27 13:52:49 2020 +0800 Add Rabbitmq Plugin (#53) --- README.md | 1 + setup.py | 1 + skywalking/__init__.py | 2 + skywalking/plugins/sw_rabbitmq/__init__.py | 108 ++++++++++++++++++++++++++ tests/plugin/sw_rabbitmq/__init__.py | 16 ++++ tests/plugin/sw_rabbitmq/docker-compose.yml | 87 +++++++++++++++++++++ tests/plugin/sw_rabbitmq/expected.data.yml | 91 ++++++++++++++++++++++ tests/plugin/sw_rabbitmq/services/__init__.py | 16 ++++ tests/plugin/sw_rabbitmq/services/consumer.py | 51 ++++++++++++ tests/plugin/sw_rabbitmq/services/producer.py | 47 +++++++++++ tests/plugin/sw_rabbitmq/test_kafka.py | 43 ++++++++++ 11 files changed, 463 insertions(+) diff --git a/README.md b/README.md index af16be6..0dcf7bc 100755 --- a/README.md +++ b/README.md @@ -81,6 +81,7 @@ Library | Plugin Name | [redis-py](https://github.com/andymccurdy/redis-py/) | `sw_redis` | | [kafka-python](https://kafka-python.readthedocs.io/en/master/) | `sw_kafka` | | [tornado](https://www.tornadoweb.org/en/stable/) | `sw_tornado` | +| [pika](https://pika.readthedocs.io/en/stable/) | `sw_rabbitmq` | ## API diff --git a/setup.py b/setup.py index 4f1d1d7..0ce8624 100644 --- a/setup.py +++ b/setup.py @@ -51,6 +51,7 @@ setup( "redis", "kafka-python", "tornado", + "pika", ], }, classifiers=[ diff --git a/skywalking/__init__.py b/skywalking/__init__.py index 23fbb7e..67ac3ac 100644 --- a/skywalking/__init__.py +++ b/skywalking/__init__.py @@ -32,6 +32,8 @@ class Component(Enum): Redis = 7 KafkaProducer = 40 KafkaConsumer = 41 + RabbitmqProducer = 52 + RabbitmqConsumer = 53 class Layer(Enum): diff --git a/skywalking/plugins/sw_rabbitmq/__init__.py b/skywalking/plugins/sw_rabbitmq/__init__.py new file mode 100644 index 0000000..601f33d --- /dev/null +++ b/skywalking/plugins/sw_rabbitmq/__init__.py @@ -0,0 +1,108 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import logging + +from skywalking import Layer, Component +from skywalking.trace import tags +from skywalking.trace.carrier import Carrier +from skywalking.trace.context import get_context +from skywalking.trace.tags import Tag + +logger = logging.getLogger(__name__) + + +def install(): + # noinspection PyBroadException + try: + from pika.channel import Channel + + _basic_publish = Channel.basic_publish + __on_deliver = Channel._on_deliver + Channel.basic_publish = _sw_basic_publish_func(_basic_publish) + Channel._on_deliver = _sw__on_deliver_func(__on_deliver) + + except Exception: + logger.warning('failed to install plugin %s', __name__) + + +def _sw_basic_publish_func(_basic_publish): + def _sw_basic_publish(this, exchange, + routing_key, + body, + properties=None, + mandatory=False): + peer = '%s:%s' % (this.connection.params.host, this.connection.params.port) + context = get_context() + carrier = Carrier() + import pika + with context.new_exit_span(op="RabbitMQ/Topic/" + exchange + "/Queue/" + routing_key + "/Producer" or "/", + peer=peer, carrier=carrier) as span: + span.layer = Layer.MQ + span.component = Component.RabbitmqProducer + properties = pika.BasicProperties() if properties is None else properties + + if properties.headers is None: + headers = {} + for item in carrier: + headers[item.key] = item.val + properties.headers = headers + else: + for item in carrier: + properties.headers[item.key] = item.val + + try: + res = _basic_publish(this, exchange, + routing_key, + body, + properties=properties, + mandatory=mandatory) + span.tag(Tag(key=tags.MqBroker, val=peer)) + span.tag(Tag(key=tags.MqTopic, val=exchange)) + span.tag(Tag(key=tags.MqQueue, val=routing_key)) + except BaseException as e: + span.raised() + raise e + return res + + return _sw_basic_publish + + +def _sw__on_deliver_func(__on_deliver): + def _sw__on_deliver(this, method_frame, header_frame, body): + peer = '%s:%s' % (this.connection.params.host, this.connection.params.port) + context = get_context() + exchange = method_frame.method.exchange + routing_key = method_frame.method.routing_key + carrier = Carrier() + for item in carrier: + if item.key in header_frame.properties.headers: + item.val = header_frame.properties.headers[item.key] + + with context.new_entry_span(op="RabbitMQ/Topic/" + exchange + "/Queue/" + routing_key + + "/Consumer" or "", carrier=carrier) as span: + span.layer = Layer.MQ + span.component = Component.RabbitmqConsumer + try: + __on_deliver(this, method_frame, header_frame, body) + span.tag(Tag(key=tags.MqBroker, val=peer)) + span.tag(Tag(key=tags.MqTopic, val=exchange)) + span.tag(Tag(key=tags.MqQueue, val=routing_key)) + except BaseException as e: + span.raised() + raise e + + return _sw__on_deliver diff --git a/tests/plugin/sw_rabbitmq/__init__.py b/tests/plugin/sw_rabbitmq/__init__.py new file mode 100644 index 0000000..b1312a0 --- /dev/null +++ b/tests/plugin/sw_rabbitmq/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/tests/plugin/sw_rabbitmq/docker-compose.yml b/tests/plugin/sw_rabbitmq/docker-compose.yml new file mode 100644 index 0000000..ecbecec --- /dev/null +++ b/tests/plugin/sw_rabbitmq/docker-compose.yml @@ -0,0 +1,87 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +version: '2.1' + +services: + collector: + extends: + service: collector + file: ../docker/docker-compose.base.yml + + rabbitmq-server: + image: rabbitmq:latest + hostname: rabbitmq-server + ports: + - 5672:5672 + - 15672:15672 + environment: + - RABBITMQ_DEFAULT_PASS=admin + - RABBITMQ_DEFAULT_USER=admin + - RABBITMQ_DEFAULT_VHOST=/ + networks: + - beyond + healthcheck: + test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/5672"] + interval: 5s + timeout: 60s + retries: 120 + + producer: + extends: + service: agent + file: ../docker/docker-compose.base.yml + ports: + - 9090:9090 + volumes: + - ./services/producer.py:/app/producer.py + command: ['bash', '-c', 'pip install flask && pip install pika && python3 /app/producer.py'] + healthcheck: + test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9090"] + interval: 5s + timeout: 60s + retries: 120 + depends_on: + collector: + condition: service_healthy + rabbitmq-server: + condition: service_healthy + consumer: + condition: service_healthy + + consumer: + extends: + service: agent + file: ../docker/docker-compose.base.yml + ports: + - 9091:9091 + volumes: + - ./services/consumer.py:/app/consumer.py + command: ['bash', '-c', 'pip install flask && pip install pika && python3 /app/consumer.py'] + healthcheck: + test: ["CMD", "bash", "-c", "ps -ef | grep /app/consumer | grep -v grep"] + interval: 5s + timeout: 60s + retries: 120 + depends_on: + collector: + condition: service_healthy + rabbitmq-server: + condition: service_healthy + +networks: + beyond: diff --git a/tests/plugin/sw_rabbitmq/expected.data.yml b/tests/plugin/sw_rabbitmq/expected.data.yml new file mode 100644 index 0000000..cc6211a --- /dev/null +++ b/tests/plugin/sw_rabbitmq/expected.data.yml @@ -0,0 +1,91 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +segmentItems: + - serviceName: producer + segmentSize: 1 + segments: + - segmentId: not null + spans: + - operationName: RabbitMQ/Topic/test/Queue/test/Producer + operationId: 0 + parentSpanId: 0 + spanId: 1 + spanLayer: MQ + tags: + - key: mq.broker + value: 'rabbitmq-server:5672' + - key: mq.topic + value: test + - key: mq.queue + value: test + startTime: gt 0 + endTime: gt 0 + componentId: 52 + spanType: Exit + peer: rabbitmq-server:5672 + skipAnalysis: false + - operationName: /users + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: Http + tags: + - key: http.method + value: GET + - key: url + value: http://0.0.0.0:9090/users + - key: status.code + value: '200' + startTime: gt 0 + endTime: gt 0 + componentId: 7001 + spanType: Entry + peer: not null + skipAnalysis: false + - serviceName: consumer + segmentSize: 1 + segments: + - segmentId: not null + spans: + - operationName: RabbitMQ/Topic/test/Queue/test/Consumer + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + tags: + - key: mq.broker + value: 'rabbitmq-server:5672' + - key: mq.topic + value: test + - key: mq.queue + value: test + refs: + - parentEndpoint: RabbitMQ/Topic/test/Queue/test/Producer + networkAddress: 'rabbitmq-server:5672' + refType: CrossProcess + parentSpanId: 1 + parentTraceSegmentId: not null + parentServiceInstance: not null + parentService: producer + traceId: not null + startTime: gt 0 + endTime: gt 0 + componentId: 53 + spanType: Entry + peer: '' + skipAnalysis: false diff --git a/tests/plugin/sw_rabbitmq/services/__init__.py b/tests/plugin/sw_rabbitmq/services/__init__.py new file mode 100644 index 0000000..b1312a0 --- /dev/null +++ b/tests/plugin/sw_rabbitmq/services/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/tests/plugin/sw_rabbitmq/services/consumer.py b/tests/plugin/sw_rabbitmq/services/consumer.py new file mode 100644 index 0000000..30784e3 --- /dev/null +++ b/tests/plugin/sw_rabbitmq/services/consumer.py @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from skywalking import config, agent + +if __name__ == '__main__': + config.service_name = 'consumer' + config.logging_level = 'INFO' + agent.start() + + import pika + + parameters = (pika.URLParameters("amqp://admin:admin@rabbitmq-server:5672/%2F")) + + connection = pika.BlockingConnection(parameters) + + channel = connection.channel() + channel.queue_declare("test") + channel.exchange_declare("test") + channel.queue_bind(exchange='test', queue="test", routing_key='test') + for method_frame, properties, body in channel.consume('test'): + # Display the message parts and acknowledge the message + print(method_frame, properties, body) + channel.basic_ack(method_frame.delivery_tag) + + # Escape out of the loop after 10 messages + if method_frame.delivery_tag == 10: + break + + try: + # Loop so we can communicate with RabbitMQ + connection.ioloop.start() + except KeyboardInterrupt: + # Gracefully close the connection + connection.close() + # Loop until we're fully closed, will stop on its own + connection.ioloop.start() diff --git a/tests/plugin/sw_rabbitmq/services/producer.py b/tests/plugin/sw_rabbitmq/services/producer.py new file mode 100644 index 0000000..5031d25 --- /dev/null +++ b/tests/plugin/sw_rabbitmq/services/producer.py @@ -0,0 +1,47 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +from skywalking import agent, config + +if __name__ == '__main__': + config.service_name = 'producer' + config.logging_level = 'INFO' + agent.start() + + from flask import Flask, jsonify + app = Flask(__name__) + import pika + parameters = (pika.URLParameters("amqp://admin:admin@rabbitmq-server:5672/%2F")) + + @app.route("/users", methods=["POST", "GET"]) + def application(): + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + channel.queue_declare("test") + channel.exchange_declare("test") + channel.queue_bind(exchange='test', queue="test", routing_key='test') + channel.basic_publish(exchange='test', routing_key='test', properties=pika.BasicProperties( + headers={'key': 'value'} + ), + body=b'Test message.') + connection.close() + + return jsonify({"song": "Despacito", "artist": "Luis Fonsi"}) + + PORT = 9090 + app.run(host='0.0.0.0', port=PORT, debug=True) diff --git a/tests/plugin/sw_rabbitmq/test_kafka.py b/tests/plugin/sw_rabbitmq/test_kafka.py new file mode 100644 index 0000000..f24add7 --- /dev/null +++ b/tests/plugin/sw_rabbitmq/test_kafka.py @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import time +import unittest +from os.path import abspath, dirname + +from testcontainers.compose import DockerCompose + +from tests.plugin import BasePluginTest + + +class TestPlugin(BasePluginTest): + @classmethod + def setUpClass(cls): + cls.compose = DockerCompose(filepath=dirname(abspath(__file__))) + cls.compose.start() + + cls.compose.wait_for(cls.url(('producer', '9090'), 'users')) + + def test_request_plugin(self): + time.sleep(3) + + self.validate(expected_file_name=os.path.join(dirname(abspath(__file__)), 'expected.data.yml')) + + +if __name__ == '__main__': + unittest.main()