This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch fix/runnable-cross-thread in repository https://gitbox.apache.org/repos/asf/skywalking-python.git
commit 10d5e185a346c80bf0cf5e1f2155553c7d1bddf5 Author: Wu Sheng <[email protected]> AuthorDate: Sun Apr 12 21:44:58 2026 +0800 fix: support module-level @runnable with continue_tracing() for cross-thread context propagation Previously, @runnable captured the snapshot at decoration time, which only worked when applied inline during an active request. Module-level usage (the natural Python pattern) silently broke cross-thread trace linking. @runnable now returns a _RunnableWrapper object with a continue_tracing() method. Users call continue_tracing() on the parent thread to capture the snapshot, then pass the result as Thread target: @runnable(op='/post') def post(): requests.post(...) thread = Thread(target=post.continue_tracing()) thread.start() Direct calls (post()) still work as a local span without cross-thread propagation, same as @trace. Closes apache/skywalking#11605 Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]> --- skywalking/decorators.py | 66 +++++++++--- tests/plugin/web/sw_threading/__init__.py | 16 +++ tests/plugin/web/sw_threading/docker-compose.yml | 65 ++++++++++++ tests/plugin/web/sw_threading/expected.data.yml | 113 +++++++++++++++++++++ tests/plugin/web/sw_threading/services/__init__.py | 16 +++ tests/plugin/web/sw_threading/services/consumer.py | 44 ++++++++ tests/plugin/web/sw_threading/services/provider.py | 32 ++++++ tests/plugin/web/sw_threading/test_threading.py | 33 ++++++ 8 files changed, 369 insertions(+), 16 deletions(-) diff --git a/skywalking/decorators.py b/skywalking/decorators.py index d9fb211..886176a 100644 --- a/skywalking/decorators.py +++ b/skywalking/decorators.py @@ -66,6 +66,54 @@ def trace( return decorator +class _RunnableWrapper: + """Wrapper returned by @runnable. Call continue_tracing() on the parent thread + to capture the current trace context, then pass the result as Thread target.""" + + def __init__(self, func, op, layer, component, tags): + self._func = func + self._op = op + self._layer = layer + self._component = component + self._tags = tags + # Preserve original function attributes + self.__name__ = func.__name__ + self.__doc__ = func.__doc__ + self.__module__ = getattr(func, '__module__', None) + self.__wrapped__ = func + + def __call__(self, *args, **kwargs): + """Direct call — creates a local span (same as @trace).""" + context = get_context() + with context.new_local_span(op=self._op) as span: + span.layer = self._layer + span.component = self._component + if self._tags: + for tag in self._tags: + span.tag(tag) + return self._func(*args, **kwargs) + + def continue_tracing(self): + """Capture the current trace context snapshot on the calling thread. + Returns a callable to be used as Thread target that will propagate + the trace context to the child thread via CrossThread reference.""" + snapshot = get_context().capture() + + def _continued_wrapper(*args, **kwargs): + context = get_context() + with context.new_local_span(op=self._op) as span: + if snapshot is not None: + context.continued(snapshot) + span.layer = self._layer + span.component = self._component + if self._tags: + for tag in self._tags: + span.tag(tag) + return self._func(*args, **kwargs) + + return _continued_wrapper + + def runnable( op: str = None, layer: Layer = Layer.Unknown, @@ -73,21 +121,7 @@ def runnable( tags: List[Tag] = None, ): def decorator(func): - snapshot = get_context().capture() - - @wraps(func) - def wrapper(*args, **kwargs): - _op = op or f'Thread/{func.__name__}' - context = get_context() - with context.new_local_span(op=_op) as span: - context.continued(snapshot) - span.layer = layer - span.component = component - if tags: - for tag in tags: - span.tag(tag) - func(*args, **kwargs) - - return wrapper + _op = op or f'Thread/{func.__name__}' + return _RunnableWrapper(func, _op, layer, component, tags) return decorator diff --git a/tests/plugin/web/sw_threading/__init__.py b/tests/plugin/web/sw_threading/__init__.py new file mode 100644 index 0000000..b1312a0 --- /dev/null +++ b/tests/plugin/web/sw_threading/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/tests/plugin/web/sw_threading/docker-compose.yml b/tests/plugin/web/sw_threading/docker-compose.yml new file mode 100644 index 0000000..b443714 --- /dev/null +++ b/tests/plugin/web/sw_threading/docker-compose.yml @@ -0,0 +1,65 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +version: '2.1' + +services: + collector: + extends: + service: collector + file: ../../docker-compose.base.yml + + provider: + extends: + service: agent + file: ../../docker-compose.base.yml + ports: + - 9091:9091 + volumes: + - .:/app + command: ['bash', '-c', 'pip install flask itsdangerous==2.0.1 "Werkzeug<3" && pip install -r /app/requirements.txt && sw-python run python3 /app/services/provider.py'] + depends_on: + collector: + condition: service_healthy + healthcheck: + test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9091"] + interval: 5s + timeout: 60s + retries: 120 + environment: + SW_AGENT_NAME: provider + SW_AGENT_LOGGING_LEVEL: DEBUG + + consumer: + extends: + service: agent + file: ../../docker-compose.base.yml + ports: + - 9090:9090 + volumes: + - .:/app + command: ['bash', '-c', 'pip install flask itsdangerous==2.0.1 "Werkzeug<3" && pip install -r /app/requirements.txt && sw-python run python3 /app/services/consumer.py'] + depends_on: + collector: + condition: service_healthy + provider: + condition: service_healthy + environment: + SW_AGENT_NAME: consumer + SW_AGENT_LOGGING_LEVEL: DEBUG +networks: + beyond: diff --git a/tests/plugin/web/sw_threading/expected.data.yml b/tests/plugin/web/sw_threading/expected.data.yml new file mode 100644 index 0000000..fbb5ef2 --- /dev/null +++ b/tests/plugin/web/sw_threading/expected.data.yml @@ -0,0 +1,113 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +segmentItems: + - serviceName: provider + segmentSize: 1 + segments: + - segmentId: not null + spans: + - operationName: /users + parentSpanId: -1 + spanId: 0 + spanLayer: Http + tags: + - key: http.method + value: POST + - key: http.url + value: http://provider:9091/users + - key: http.status_code + value: '200' + refs: + - parentEndpoint: /users + networkAddress: 'provider:9091' + refType: CrossProcess + parentSpanId: 1 + parentTraceSegmentId: not null + parentServiceInstance: not null + parentService: consumer + traceId: not null + startTime: gt 0 + endTime: gt 0 + componentId: 7001 + spanType: Entry + peer: not null + skipAnalysis: false + - serviceName: consumer + segmentSize: 2 + segments: + - segmentId: not null + spans: + - operationName: /users + parentSpanId: 0 + spanId: 1 + spanLayer: Http + startTime: gt 0 + endTime: gt 0 + componentId: 7002 + isError: false + spanType: Exit + peer: provider:9091 + skipAnalysis: false + tags: + - key: http.method + value: POST + - key: http.url + value: 'http://provider:9091/users' + - key: http.status_code + value: '200' + - operationName: /post + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: Unknown + startTime: gt 0 + endTime: gt 0 + componentId: 0 + isError: false + spanType: Local + peer: '' + skipAnalysis: false + refs: + - parentEndpoint: /users + networkAddress: '' + refType: CrossThread + parentSpanId: 0 + parentTraceSegmentId: not null + parentServiceInstance: not null + parentService: consumer + traceId: not null + - segmentId: not null + spans: + - operationName: /users + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: Http + tags: + - key: http.method + value: GET + - key: http.url + value: http://0.0.0.0:9090/users + - key: http.status_code + value: '200' + startTime: gt 0 + endTime: gt 0 + componentId: 7001 + spanType: Entry + peer: not null + skipAnalysis: false diff --git a/tests/plugin/web/sw_threading/services/__init__.py b/tests/plugin/web/sw_threading/services/__init__.py new file mode 100644 index 0000000..b1312a0 --- /dev/null +++ b/tests/plugin/web/sw_threading/services/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/tests/plugin/web/sw_threading/services/consumer.py b/tests/plugin/web/sw_threading/services/consumer.py new file mode 100644 index 0000000..fa7bfef --- /dev/null +++ b/tests/plugin/web/sw_threading/services/consumer.py @@ -0,0 +1,44 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import requests + +from skywalking.decorators import runnable + + +# Module-level @runnable — this is the pattern from issue #11605 +@runnable(op='/post') +def post(): + requests.post('http://provider:9091/users', timeout=5) + + +if __name__ == '__main__': + from flask import Flask, jsonify + + app = Flask(__name__) + + @app.route('/users', methods=['POST', 'GET']) + def application(): + from threading import Thread + t = Thread(target=post.continue_tracing()) + t.start() + t.join() + + return jsonify({'status': 'ok'}) + + PORT = 9090 + app.run(host='0.0.0.0', port=PORT, debug=True) diff --git a/tests/plugin/web/sw_threading/services/provider.py b/tests/plugin/web/sw_threading/services/provider.py new file mode 100644 index 0000000..79cc50d --- /dev/null +++ b/tests/plugin/web/sw_threading/services/provider.py @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import time + + +if __name__ == '__main__': + from flask import Flask, jsonify + + app = Flask(__name__) + + @app.route('/users', methods=['POST', 'GET']) + def application(): + time.sleep(0.5) + return jsonify({'status': 'ok'}) + + PORT = 9091 + app.run(host='0.0.0.0', port=PORT, debug=True) diff --git a/tests/plugin/web/sw_threading/test_threading.py b/tests/plugin/web/sw_threading/test_threading.py new file mode 100644 index 0000000..5e4a23d --- /dev/null +++ b/tests/plugin/web/sw_threading/test_threading.py @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from typing import Callable + +import pytest +import requests + +from tests.plugin.base import TestPluginBase + + [email protected] +def prepare(): + # type: () -> Callable + return lambda *_: requests.get('http://0.0.0.0:9090/users', timeout=5) + + +class TestPlugin(TestPluginBase): + def test_plugin(self, docker_compose): + self.validate()
