This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git
The following commit(s) were added to refs/heads/master by this push:
new 02dc53c Add Pulsar plugin (#345)
02dc53c is described below
commit 02dc53ce2de37c8ac3396bbf298004eeb4f42942
Author: Starry <[email protected]>
AuthorDate: Sat Jun 29 20:32:54 2024 +0800
Add Pulsar plugin (#345)
---
.gitmodules | 2 +-
CHANGELOG.md | 1 +
Makefile | 4 +-
docs/en/setup/Plugins.md | 1 +
docs/en/setup/advanced/LogReporter.md | 2 +-
poetry.lock | 49 ++++++++++-
protocol | 2 +-
pyproject.toml | 1 +
skywalking/__init__.py | 2 +
skywalking/plugins/sw_pulsar.py | 107 +++++++++++++++++++++++
tests/plugin/data/sw_pulsar/__init__.py | 16 ++++
tests/plugin/data/sw_pulsar/docker-compose.yml | 90 +++++++++++++++++++
tests/plugin/data/sw_pulsar/expected.data.yml | 86 ++++++++++++++++++
tests/plugin/data/sw_pulsar/services/__init__.py | 16 ++++
tests/plugin/data/sw_pulsar/services/consumer.py | 32 +++++++
tests/plugin/data/sw_pulsar/services/producer.py | 43 +++++++++
tests/plugin/data/sw_pulsar/test_pulsar.py | 36 ++++++++
17 files changed, 485 insertions(+), 5 deletions(-)
diff --git a/.gitmodules b/.gitmodules
index ebb8ea3..27ba66d 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -18,4 +18,4 @@
#
[submodule "protocol"]
path = protocol
- url = https://github.com/apache/skywalking-data-collect-protocol
+ url = https://github.com/apache/skywalking-data-collect-protocol.git
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 962083a..68c0563 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,7 @@
- Plugins:
- Add neo4j plugin.(#312)
+ - Add pulsar plugin.(#345)
- Fixes:
- Fix unexpected 'No active span' IllegalStateError (#311)
diff --git a/Makefile b/Makefile
index c581509..408d42b 100644
--- a/Makefile
+++ b/Makefile
@@ -43,9 +43,11 @@ poetry:
ifeq ($(OS),Windows)
-powershell (Invoke-WebRequest -Uri https://install.python-poetry.org
-UseBasicParsing).Content | py -
poetry self update
-else
+else ifeq ($(OS),Darwin)
-curl -sSL https://install.python-poetry.org | python3 -
poetry self update || $(MAKE) poetry-fallback
+else
+ -curl -sSL https://install.python-poetry.org | python3 - --version 1.5.1
endif
.PHONY: gen
diff --git a/docs/en/setup/Plugins.md b/docs/en/setup/Plugins.md
index ca0fc38..da29e47 100644
--- a/docs/en/setup/Plugins.md
+++ b/docs/en/setup/Plugins.md
@@ -36,6 +36,7 @@ or a limitation of SkyWalking auto-instrumentation (welcome
to contribute!)
| [neo4j](https://neo4j.com/docs/python-manual/5/) | Python >=3.7 - ['5.*'];
| `sw_neo4j` |
| [psycopg[binary]](https://www.psycopg.org/) | Python >=3.11 - ['3.1.*'];
Python >=3.7 - ['3.0.18', '3.1.*']; | `sw_psycopg` |
| [psycopg2-binary](https://www.psycopg.org/) | Python >=3.10 - NOT SUPPORTED
YET; Python >=3.7 - ['2.9']; | `sw_psycopg2` |
+| [pulsar-client](https://github.com/apache/pulsar-client-python) | Python
>=3.8 - ['3.3.0']; | `sw_pulsar` |
| [pymongo](https://pymongo.readthedocs.io) | Python >=3.7 - ['3.11.*']; |
`sw_pymongo` |
| [pymysql](https://pymysql.readthedocs.io/en/latest/) | Python >=3.7 -
['1.0']; | `sw_pymysql` |
| [pyramid](https://trypyramid.com) | Python >=3.7 - ['1.10', '2.0']; |
`sw_pyramid` |
diff --git a/docs/en/setup/advanced/LogReporter.md
b/docs/en/setup/advanced/LogReporter.md
index e08e65f..9d16b86 100644
--- a/docs/en/setup/advanced/LogReporter.md
+++ b/docs/en/setup/advanced/LogReporter.md
@@ -9,7 +9,7 @@ Log reporter supports all three protocols including `grpc`,
`http` and `kafka`,
If chosen `http` protocol, the logs will be batch-reported to the collector
REST endpoint `oap/v3/logs`.
If chosen `kafka` protocol, please make sure to config
-[kafka-fetcher](https://skywalking.apache.org/docs/main/v9.1.0/en/setup/backend/kafka-fetcher/)
+[kafka-fetcher](https://skywalking.apache.org/docs/main/v10.0.1/en/setup/backend/kafka-fetcher/)
on the OAP side, and make sure Python agent config `kafka_bootstrap_servers`
points to your Kafka brokers.
**Please make sure OAP is consuming the same Kafka topic as your agent
produces to, `kafka_namespace` must match OAP side configuration
`plugin.kafka.namespace`**
diff --git a/poetry.lock b/poetry.lock
index 52f500a..d098307 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -2280,6 +2280,53 @@ files = [
{file = "psycopg2_binary-2.9.7-cp39-cp39-win_amd64.whl", hash =
"sha256:eb3b8d55924a6058a26db69fb1d3e7e32695ff8b491835ba9f479537e14dcf9f"},
]
+[[package]]
+name = "pulsar-client"
+version = "3.3.0"
+description = "Apache Pulsar Python client library"
+optional = false
+python-versions = "*"
+files = [
+ {file = "pulsar_client-3.3.0-cp310-cp310-macosx_10_15_universal2.whl",
hash =
"sha256:c31afd3e67a044ff93177df89e08febf214cc965e95ede097d9fe8755af00e01"},
+ {file =
"pulsar_client-3.3.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl",
hash =
"sha256:1f66982284571674b215324cc26b5c2f7c56c7043113c47a7084cb70d67a8afb"},
+ {file =
"pulsar_client-3.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
hash =
"sha256:7fe50a06f81c48a75a9b95c27a6446260039adca71d9face273740de96b2efca"},
+ {file = "pulsar_client-3.3.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash
= "sha256:d4c46a4b96a6e9919cfe220156d69a2ede8053d9ea1add4ada108abcf2ba9775"},
+ {file = "pulsar_client-3.3.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash =
"sha256:1e4b5d44b992c9b036286b483f3588c10b89c6047fb59d80c7474445997f4e10"},
+ {file = "pulsar_client-3.3.0-cp310-cp310-win_amd64.whl", hash =
"sha256:497a59ac6b650835a3b2c502f53477e5c98e5226998ca3f17c0b0a3eb4d67d08"},
+ {file = "pulsar_client-3.3.0-cp311-cp311-macosx_10_15_universal2.whl",
hash =
"sha256:386e78ff52058d881780bae1f6e84ac9434ae0b01a8581755ca8cc0dc844a332"},
+ {file =
"pulsar_client-3.3.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl",
hash =
"sha256:3e4ecb780df58bcfd3918590bd3ff31ed79bccfbef3a1a60370642eb1e14a9d2"},
+ {file =
"pulsar_client-3.3.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
hash =
"sha256:7ce1e215c252f22a6f26ca5e9076826041a04d88dc213b92c86b524be2774a64"},
+ {file = "pulsar_client-3.3.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash
= "sha256:88b0fd5be73a4103986b9dbe3a66468cf8829371e34af87ff8f216e3980f4cbe"},
+ {file = "pulsar_client-3.3.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash =
"sha256:33656450536d83eed1563ff09692c2c415fb199d88e9ed97d701ca446a119e1b"},
+ {file = "pulsar_client-3.3.0-cp311-cp311-win_amd64.whl", hash =
"sha256:ce33de700b06583df8777e139d68cb4b4b3d0a2eac168d74278d8935f357fb10"},
+ {file = "pulsar_client-3.3.0-cp37-cp37m-macosx_10_15_universal2.whl", hash
= "sha256:7b5dd25cf778d6c980d36c53081e843ea272afe7af4f0ad6394ae9513f94641b"},
+ {file =
"pulsar_client-3.3.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl",
hash =
"sha256:33c4e6865fda62a2e460f823dce4d49ac2973a4459b8ff99eda5fdd6aaaebf46"},
+ {file =
"pulsar_client-3.3.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
hash =
"sha256:f1810ddc623c8de2675d17405ce47057a9a2b92298e708ce4d9564847f5ad904"},
+ {file = "pulsar_client-3.3.0-cp37-cp37m-musllinux_1_1_aarch64.whl", hash =
"sha256:8259c3b856eb6deaa1f93dce893ab18d99d36d102da5612c8e97a4fb41b70ab1"},
+ {file = "pulsar_client-3.3.0-cp37-cp37m-musllinux_1_1_x86_64.whl", hash =
"sha256:5e7a48b2e505cde758fd51a601b5da0671fa98c9baee38362aaaa3ab2b930c28"},
+ {file = "pulsar_client-3.3.0-cp37-cp37m-win_amd64.whl", hash =
"sha256:ede264385d47257b2f2b08ecde9181ec5338bea5639cc543d1856f01736778d2"},
+ {file = "pulsar_client-3.3.0-cp38-cp38-macosx_10_15_universal2.whl", hash
= "sha256:0f64c62746ccd5b65a0c505f5f40b9af1f147eb1fa2d8f9c90cd5c8b92dd8597"},
+ {file =
"pulsar_client-3.3.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl",
hash =
"sha256:5b84a20c9012e3c4ef1b7085acd7467197118c090b378dec27d773fb79d91556"},
+ {file =
"pulsar_client-3.3.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
hash =
"sha256:c4e15fa696e275ccb66d0791fdc19c4dea0420d81349c8055e485b134125e14f"},
+ {file = "pulsar_client-3.3.0-cp38-cp38-musllinux_1_1_aarch64.whl", hash =
"sha256:72cbb1bdcba2dd1265296b5ba65331622ee89c16db75edaad46dd7b90c6dd447"},
+ {file = "pulsar_client-3.3.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash =
"sha256:d54dd12955bf587dd46d9184444af5e853d9da2a14bbfb739ed2c7c3b78ce280"},
+ {file = "pulsar_client-3.3.0-cp38-cp38-win_amd64.whl", hash =
"sha256:43f98afdf0334b2b957a4d96f97a1fe8a7f7fd1e2631d40c3f00b4162f396485"},
+ {file = "pulsar_client-3.3.0-cp39-cp39-macosx_10_15_universal2.whl", hash
= "sha256:efe7c1e6a96daccc522c3567b6847ffa54c13e0f510d9a427b4aeff9fbebe54b"},
+ {file =
"pulsar_client-3.3.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl",
hash =
"sha256:f28e94420090fceeb38e23fc744f3edf8710e48314ef5927d2b674a1d1e43ee0"},
+ {file =
"pulsar_client-3.3.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
hash =
"sha256:42c8f3eaa98e2351805ecb6efb6d5fedf47a314a3ce6af0e05ea1449ea7244ed"},
+ {file = "pulsar_client-3.3.0-cp39-cp39-musllinux_1_1_aarch64.whl", hash =
"sha256:5e69750f8ae57e55fddf97b459ce0d8b38b2bb85f464a71e871ee6a86d893be7"},
+ {file = "pulsar_client-3.3.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash =
"sha256:7e147e5ba460c1818bc05254279a885b4e552bcafb8961d40e31f98d5ff46628"},
+ {file = "pulsar_client-3.3.0-cp39-cp39-win_amd64.whl", hash =
"sha256:694530af1d6c75fb81456fb509778c1868adee31e997ddece6e21678200182ea"},
+]
+
+[package.dependencies]
+certifi = "*"
+
+[package.extras]
+all = ["apache-bookkeeper-client (>=4.16.1)", "fastavro (==1.7.3)", "grpcio
(>=1.8.2)", "prometheus-client", "protobuf (>=3.6.1,<=3.20.3)", "ratelimit"]
+avro = ["fastavro (==1.7.3)"]
+functions = ["apache-bookkeeper-client (>=4.16.1)", "grpcio (>=1.8.2)",
"prometheus-client", "protobuf (>=3.6.1,<=3.20.3)", "ratelimit"]
+
[[package]]
name = "pycodestyle"
version = "2.9.1"
@@ -3683,4 +3730,4 @@ sync = ["kafka-python", "requests"]
[metadata]
lock-version = "2.0"
python-versions = ">=3.7, <3.12"
-content-hash =
"016644168e470e2904bc0a4109c0218c7b9ecf0890d17a32e28aa81bcda0e8d0"
+content-hash =
"ae3cd5c63201383530bf2daac37be0ef9399bf4384dbe42048c81e945b70642a"
diff --git a/protocol b/protocol
index 9b2f4a5..b5f6ebe 160000
--- a/protocol
+++ b/protocol
@@ -1 +1 @@
-Subproject commit 9b2f4a5fb5694381924674d6c15cbead6a388d97
+Subproject commit b5f6ebe281b96d89968959f55baa3d9aa1bfecee
diff --git a/pyproject.toml b/pyproject.toml
index 734b058..8fc836f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -156,6 +156,7 @@ loguru = "^0.6.0"
httpx = "^0.23.3"
confluent-kafka = "^2.0.2"
neo4j = "^5.9.0"
+pulsar-client = "3.3.0"
[tool.poetry.group.lint.dependencies]
pylint = '2.13.9'
diff --git a/skywalking/__init__.py b/skywalking/__init__.py
index 1ed0c2b..ec1dc09 100644
--- a/skywalking/__init__.py
+++ b/skywalking/__init__.py
@@ -36,6 +36,8 @@ class Component(Enum):
KafkaConsumer = 41
RabbitmqProducer = 52
RabbitmqConsumer = 53
+ PulsarProducer = 73
+ PulsarConsumer = 74
Elasticsearch = 47
HBase = 94
Neo4j = 112
diff --git a/skywalking/plugins/sw_pulsar.py b/skywalking/plugins/sw_pulsar.py
new file mode 100644
index 0000000..33b24a3
--- /dev/null
+++ b/skywalking/plugins/sw_pulsar.py
@@ -0,0 +1,107 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from skywalking import Layer, Component
+from skywalking.trace.carrier import Carrier
+from skywalking.trace.context import get_context
+from skywalking.trace.tags import TagMqTopic, TagMqBroker
+
+link_vector = ['https://github.com/apache/pulsar-client-python']
+support_matrix = {
+ 'pulsar-client': {
+ '>=3.8': ['3.3.0']
+ }
+}
+note = """"""
+
+
+def install():
+ from pulsar import Producer
+ from pulsar import Consumer
+ from pulsar import Client
+
+ __init = Client.__init__
+ _send = Producer.send
+ _receive = Consumer.receive
+ _peer = ''
+
+ def get_peer():
+ return _peer
+
+ def set_peer(value):
+ nonlocal _peer
+ _peer = value
+
+ def _sw_init(self, service_url):
+ __init(self, service_url)
+ set_peer(service_url)
+
+ def _sw_send_func(_send):
+ def _sw_send(this, content,
+ properties=None,
+ partition_key=None,
+ sequence_id=None,
+ replication_clusters=None,
+ disable_replication=False,
+ event_timestamp=None,
+ deliver_at=None,
+ deliver_after=None,
+ ):
+ topic = this._producer.topic().split('/')[-1]
+ with
get_context().new_exit_span(op=f'Pulsar/Topic/{topic}/Producer',
peer=get_peer(),
+
component=Component.PulsarProducer) as span:
+ span.tag(TagMqTopic(topic))
+ span.tag(TagMqBroker(get_peer()))
+ span.layer = Layer.MQ
+
+ carrier = span.inject()
+ if properties is None:
+ properties = {}
+ for item in carrier:
+ properties[item.key] = item.val
+
+ return _send(this, content, properties=properties,
partition_key=partition_key,
+ sequence_id=sequence_id,
replication_clusters=replication_clusters,
+ disable_replication=disable_replication,
event_timestamp=event_timestamp,
+ deliver_at=deliver_at,
deliver_after=deliver_after)
+
+ return _sw_send
+
+ def _sw_receive_func(_receive):
+ def _sw_receive(this, timeout_millis=None):
+ res = _receive(this, timeout_millis=timeout_millis)
+ if res:
+ topic = res.topic_name().split('/')[-1]
+ properties = res.properties()
+ carrier = Carrier()
+ for item in carrier:
+ if item.key in properties.keys():
+ val = res.properties().get(item.key)
+ if val is not None:
+ item.val = val
+
+ with
get_context().new_entry_span(op=f'Pulsar/Topic/{topic}/Consumer',
carrier=carrier) as span:
+ span.tag(TagMqTopic(topic))
+ span.tag(TagMqBroker(get_peer()))
+ span.layer = Layer.MQ
+ span.component = Component.PulsarConsumer
+ return res
+
+ return _sw_receive
+
+ Client.__init__ = _sw_init
+ Producer.send = _sw_send_func(_send)
+ Consumer.receive = _sw_receive_func(_receive)
diff --git a/tests/plugin/data/sw_pulsar/__init__.py
b/tests/plugin/data/sw_pulsar/__init__.py
new file mode 100644
index 0000000..b1312a0
--- /dev/null
+++ b/tests/plugin/data/sw_pulsar/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/tests/plugin/data/sw_pulsar/docker-compose.yml
b/tests/plugin/data/sw_pulsar/docker-compose.yml
new file mode 100644
index 0000000..83197e2
--- /dev/null
+++ b/tests/plugin/data/sw_pulsar/docker-compose.yml
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+version: '2.1'
+
+services:
+ collector:
+ extends:
+ service: collector
+ file: ../../docker-compose.base.yml
+
+ pulsar-server:
+ image: apachepulsar/pulsar:3.2.0
+ hostname: pulsar-server
+ ports:
+ - 6650:6650
+ - 8080:8080
+ networks:
+ - beyond
+ command: ["bash","-c", "bin/pulsar standalone"]
+ healthcheck:
+ test: ["CMD", "nc", "-nz", "127.0.0.1", "8080"]
+ interval: 5s
+ timeout: 60s
+ retries: 120
+
+ producer:
+ extends:
+ service: agent
+ file: ../../docker-compose.base.yml
+ ports:
+ - 9090:9090
+ volumes:
+ - .:/app
+ command: ['bash', '-c', 'pip install flask && pip install -r
/app/requirements.txt && sw-python run python3 /app/services/producer.py']
+ healthcheck:
+ test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9090"]
+ interval: 5s
+ timeout: 60s
+ retries: 120
+ depends_on:
+ collector:
+ condition: service_healthy
+ pulsar-server:
+ condition: service_healthy
+ consumer:
+ condition: service_healthy
+ environment:
+ SW_AGENT_NAME: producer
+ SW_AGENT_LOGGING_LEVEL: INFO
+
+ consumer:
+ extends:
+ service: agent
+ file: ../../docker-compose.base.yml
+ ports:
+ - 9091:9091
+ volumes:
+ - .:/app
+ command: ['bash', '-c', 'pip install flask && pip install -r
/app/requirements.txt && sw-python run python3 /app/services/consumer.py']
+ healthcheck:
+ test: ["CMD", "bash", "-c", "ps -ef | grep /app/services/consumer | grep
-v grep"]
+ interval: 5s
+ timeout: 60s
+ retries: 120
+ depends_on:
+ collector:
+ condition: service_healthy
+ pulsar-server:
+ condition: service_healthy
+ environment:
+ SW_AGENT_NAME: consumer
+ SW_AGENT_LOGGING_LEVEL: INFO
+
+networks:
+ beyond:
\ No newline at end of file
diff --git a/tests/plugin/data/sw_pulsar/expected.data.yml
b/tests/plugin/data/sw_pulsar/expected.data.yml
new file mode 100644
index 0000000..6b61114
--- /dev/null
+++ b/tests/plugin/data/sw_pulsar/expected.data.yml
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+segmentItems:
+ - serviceName: producer
+ segmentSize: 1
+ segments:
+ - segmentId: not null
+ spans:
+ - operationName: Pulsar/Topic/sw-topic/Producer
+ parentSpanId: 0
+ spanId: 1
+ spanLayer: MQ
+ tags:
+ - key: mq.topic
+ value: sw-topic
+ - key: mq.broker
+ value: 'pulsar://pulsar-server:6650'
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 73
+ spanType: Exit
+ peer: pulsar://pulsar-server:6650
+ skipAnalysis: false
+ - operationName: /users
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: Http
+ tags:
+ - key: http.method
+ value: GET
+ - key: http.url
+ value: http://0.0.0.0:9090/users
+ - key: http.status_code
+ value: '200'
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 7001
+ spanType: Entry
+ peer: not null
+ skipAnalysis: false
+ - serviceName: consumer
+ segmentSize: 1
+ segments:
+ - segmentId: not null
+ spans:
+ - operationName: Pulsar/Topic/sw-topic/Consumer
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ tags:
+ - key: mq.topic
+ value: sw-topic
+ - key: mq.broker
+ value: 'pulsar://pulsar-server:6650'
+ refs:
+ - parentEndpoint: Pulsar/Topic/sw-topic/Producer
+ networkAddress: 'pulsar://pulsar-server:6650'
+ refType: CrossProcess
+ parentSpanId: 1
+ parentTraceSegmentId: not null
+ parentServiceInstance: not null
+ parentService: producer
+ traceId: not null
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 74
+ spanType: Entry
+ peer: ''
+ skipAnalysis: false
diff --git a/tests/plugin/data/sw_pulsar/services/__init__.py
b/tests/plugin/data/sw_pulsar/services/__init__.py
new file mode 100644
index 0000000..b1312a0
--- /dev/null
+++ b/tests/plugin/data/sw_pulsar/services/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/tests/plugin/data/sw_pulsar/services/consumer.py
b/tests/plugin/data/sw_pulsar/services/consumer.py
new file mode 100644
index 0000000..fc444b1
--- /dev/null
+++ b/tests/plugin/data/sw_pulsar/services/consumer.py
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+if __name__ == '__main__':
+ import pulsar
+
+ client = pulsar.Client(service_url='pulsar://pulsar-server:6650')
+ consumer = client.subscribe('sw-topic', 'sw-subscription')
+
+ while True:
+ try:
+ msg = consumer.receive()
+ print('Received message = ', str(msg.data().decode('utf-8')),
'|message_id = ', msg.message_id())
+ consumer.acknowledge(msg)
+ except pulsar.Interrupted:
+ break
+
+ client.close()
diff --git a/tests/plugin/data/sw_pulsar/services/producer.py
b/tests/plugin/data/sw_pulsar/services/producer.py
new file mode 100644
index 0000000..9505f82
--- /dev/null
+++ b/tests/plugin/data/sw_pulsar/services/producer.py
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+if __name__ == '__main__':
+ import pulsar
+ from flask import Flask, jsonify
+ from pulsar import BatchingType
+
+ app = Flask(__name__)
+ client = pulsar.Client(service_url='pulsar://pulsar-server:6650')
+ producer = client.create_producer(
+ 'sw-topic',
+ block_if_queue_full=True,
+ batching_enabled=True,
+ batching_max_publish_delay_ms=10,
+ batching_type=BatchingType.KeyBased
+ )
+
+
+ @app.route('/users', methods=['POST', 'GET'])
+ def application():
+ producer.send('I love skywalking 3 thousand'.encode('utf-8'), None)
+ producer.flush()
+ producer.close()
+ return jsonify({'song': 'Despacito', 'artist': 'Luis Fonsi'})
+
+ PORT = 9090
+ app.run(host='0.0.0.0', port=PORT, debug=True)
diff --git a/tests/plugin/data/sw_pulsar/test_pulsar.py
b/tests/plugin/data/sw_pulsar/test_pulsar.py
new file mode 100644
index 0000000..59767e8
--- /dev/null
+++ b/tests/plugin/data/sw_pulsar/test_pulsar.py
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from typing import Callable
+
+import pytest
+import requests
+
+from skywalking.plugins.sw_pulsar import support_matrix
+from tests.orchestrator import get_test_vector
+from tests.plugin.base import TestPluginBase
+
+
[email protected]
+def prepare():
+ # type: () -> Callable
+ return lambda *_: requests.get('http://0.0.0.0:9090/users', timeout=5)
+
+
+class TestPlugin(TestPluginBase):
+ @pytest.mark.parametrize('version',
get_test_vector(lib_name='pulsar-client', support_matrix=support_matrix))
+ def test_plugin(self, docker_compose, version):
+ self.validate()