This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git
The following commit(s) were added to refs/heads/master by this push:
new cf35f61 [Core][Feature] Support snapshot context (#56)
cf35f61 is described below
commit cf35f615a06f37aa1aa57b379e9abe611071637a
Author: huawei <[email protected]>
AuthorDate: Mon Aug 3 22:07:14 2020 +0800
[Core][Feature] Support snapshot context (#56)
---
skywalking/agent/protocol/grpc.py | 2 +-
skywalking/trace/context.py | 38 +++++++++++++-
skywalking/trace/segment.py | 19 ++++++-
skywalking/trace/snapshot.py | 44 +++++++++++++++++
tests/plugin/sw_flask/expected.data.yml | 79 ++++++++++++++++++++++++++++--
tests/plugin/sw_flask/services/consumer.py | 16 ++++++
6 files changed, 190 insertions(+), 8 deletions(-)
diff --git a/skywalking/agent/protocol/grpc.py
b/skywalking/agent/protocol/grpc.py
index 1e47ba4..ca24895 100644
--- a/skywalking/agent/protocol/grpc.py
+++ b/skywalking/agent/protocol/grpc.py
@@ -88,7 +88,7 @@ class GrpcProtocol(Protocol):
value=str(tag.val),
) for tag in span.tags],
refs=[SegmentReference(
- refType=0,
+ refType=0 if ref.ref_type == "CrossProcess" else 1,
traceId=ref.trace_id,
parentTraceSegmentId=ref.segment_id,
parentSpanId=ref.span_id,
diff --git a/skywalking/trace/context.py b/skywalking/trace/context.py
index 99fe8af..23cdbc6 100644
--- a/skywalking/trace/context.py
+++ b/skywalking/trace/context.py
@@ -20,8 +20,10 @@ import threading
from typing import List
from skywalking import agent, config
+from skywalking.trace import ID
from skywalking.trace.carrier import Carrier
-from skywalking.trace.segment import Segment
+from skywalking.trace.segment import Segment, SegmentRef
+from skywalking.trace.snapshot import Snapshot
from skywalking.trace.span import Span, Kind, NoopSpan, EntrySpan, ExitSpan
from skywalking.utils.counter import Counter
@@ -137,6 +139,28 @@ class SpanContext(object):
self._correlation[key] = value
+ def capture(self):
+ if len(self.spans) == 0:
+ return None
+
+ return Snapshot(
+ segment_id=str(self.segment.segment_id),
+ span_id=self.active_span().sid,
+ trace_id=self.segment.related_traces[0],
+ endpoint=self.spans[0].op,
+ correlation=self._correlation,
+ )
+
+ def continued(self, snapshot: 'Snapshot'):
+ if snapshot is None:
+ return None
+ if not snapshot.is_from_current(self) and snapshot.is_valid():
+ ref = SegmentRef.build_ref(snapshot)
+ span = self.active_span()
+ span.refs.append(ref)
+ self.segment.relate(ID(ref.trace_id))
+ self._correlation.update(snapshot.correlation)
+
class NoopContext(SpanContext):
def __init__(self):
@@ -169,6 +193,18 @@ class NoopContext(SpanContext):
def active_span(self):
return self._noop_span
+ def capture(self):
+ return Snapshot(
+ segment_id=None,
+ span_id=-1,
+ trace_id=None,
+ endpoint=None,
+ correlation=self._correlation,
+ )
+
+ def continued(self, snapshot: 'Snapshot'):
+ self._correlation.update(snapshot.correlation)
+
_thread_local = threading.local()
_thread_local.context = None
diff --git a/skywalking/trace/segment.py b/skywalking/trace/segment.py
index 4a8d905..f2e0d4e 100644
--- a/skywalking/trace/segment.py
+++ b/skywalking/trace/segment.py
@@ -18,17 +18,20 @@
import time
from typing import List, TYPE_CHECKING
+from skywalking import config
+
from skywalking.trace import ID
from skywalking.utils.lang import tostring
if TYPE_CHECKING:
from skywalking.trace.carrier import Carrier
from skywalking.trace.span import Span
+ from skywalking.trace.snapshot import Snapshot
class SegmentRef(object):
- def __init__(self, carrier: 'Carrier'):
- self.ref_type = 'CrossProcess' # type: str
+ def __init__(self, carrier: 'Carrier', ref_type: str = 'CrossProcess'):
+ self.ref_type = ref_type # type: str
self.trace_id = carrier.trace_id # type: str
self.segment_id = carrier.segment_id # type: str
self.span_id = int(carrier.span_id) # type: int
@@ -49,6 +52,18 @@ class SegmentRef(object):
self.endpoint == other.endpoint and \
self.client_address == other.client_address
+ @classmethod
+ def build_ref(cls, snapshot: 'Snapshot'):
+ from skywalking.trace.carrier import Carrier
+ carrier = Carrier()
+ carrier.trace_id = str(snapshot.trace_id)
+ carrier.segment_id = str(snapshot.segment_id)
+ carrier.endpoint = snapshot.endpoint
+ carrier.span_id = snapshot.span_id
+ carrier.service = config.service_name
+ carrier.service_instance = config.service_instance
+ return SegmentRef(carrier, ref_type="CrossThread")
+
class _NewID(ID):
pass
diff --git a/skywalking/trace/snapshot.py b/skywalking/trace/snapshot.py
new file mode 100644
index 0000000..3072ba9
--- /dev/null
+++ b/skywalking/trace/snapshot.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from skywalking.trace.context import SpanContext
+
+from skywalking.trace import ID
+
+
+class Snapshot:
+ def __init__(
+ self,
+ segment_id: str = None,
+ span_id: int = None,
+ trace_id: ID = None,
+ endpoint: str = None,
+ correlation: dict = None
+ ):
+ self.trace_id = trace_id # type: ID
+ self.segment_id = segment_id # type: str
+ self.span_id = span_id # type: int
+ self.endpoint = endpoint # type: str
+ self.correlation = correlation.copy() # type: dict
+
+ def is_from_current(self, context: 'SpanContext'):
+ return self.segment_id is not None and self.segment_id ==
context.capture().segment_id
+
+ def is_valid(self):
+ return self.segment_id is not None and self.span_id > -1 and
self.trace_id is not None
diff --git a/tests/plugin/sw_flask/expected.data.yml
b/tests/plugin/sw_flask/expected.data.yml
index 453f539..add2226 100644
--- a/tests/plugin/sw_flask/expected.data.yml
+++ b/tests/plugin/sw_flask/expected.data.yml
@@ -17,7 +17,7 @@
segmentItems:
- serviceName: provider
- segmentSize: 1
+ segmentSize: 2
segments:
- segmentId: not null
spans:
@@ -35,7 +35,36 @@ segmentItems:
value: '200'
refs:
- parentEndpoint: /users
- networkAddress: provider:9091
+ networkAddress: 'provider:9091'
+ refType: CrossProcess
+ parentSpanId: 1
+ parentTraceSegmentId: not null
+ parentServiceInstance: not null
+ parentService: consumer
+ traceId: not null
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 7001
+ spanType: Entry
+ peer: not null
+ skipAnalysis: false
+ - segmentId: not null
+ spans:
+ - operationName: /users
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: Http
+ tags:
+ - key: http.method
+ value: POST
+ - key: url
+ value: http://provider:9091/users
+ - key: status.code
+ value: '200'
+ refs:
+ - parentEndpoint: /users
+ networkAddress: 'provider:9091'
refType: CrossProcess
parentSpanId: 1
parentTraceSegmentId: not null
@@ -49,7 +78,7 @@ segmentItems:
peer: not null
skipAnalysis: false
- serviceName: consumer
- segmentSize: 1
+ segmentSize: 2
segments:
- segmentId: not null
spans:
@@ -58,6 +87,48 @@ segmentItems:
parentSpanId: 0
spanId: 1
spanLayer: Http
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 7002
+ isError: false
+ spanType: Exit
+ peer: provider:9091
+ skipAnalysis: false
+ tags:
+ - key: http.method
+ value: POST
+ - key: url
+ value: 'http://provider:9091/users'
+ - key: status.code
+ value: '200'
+ - operationName: /test
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: Unknown
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 0
+ isError: false
+ spanType: Local
+ peer: ''
+ skipAnalysis: false
+ refs:
+ - parentEndpoint: /users
+ networkAddress: ''
+ refType: CrossThread
+ parentSpanId: 0
+ parentTraceSegmentId: not null
+ parentServiceInstance: not null
+ parentService: consumer
+ traceId: not null
+ - segmentId: not null
+ spans:
+ - operationName: /users
+ operationId: 0
+ parentSpanId: 0
+ spanId: 1
+ spanLayer: Http
tags:
- key: http.method
value: POST
@@ -90,4 +161,4 @@ segmentItems:
componentId: 7001
spanType: Entry
peer: not null
- skipAnalysis: false
\ No newline at end of file
+ skipAnalysis: false
diff --git a/tests/plugin/sw_flask/services/consumer.py
b/tests/plugin/sw_flask/services/consumer.py
index 129222a..af6d078 100644
--- a/tests/plugin/sw_flask/services/consumer.py
+++ b/tests/plugin/sw_flask/services/consumer.py
@@ -33,7 +33,23 @@ if __name__ == '__main__':
def application():
from skywalking.trace.context import get_context
get_context().put_correlation("correlation", "correlation")
+
+ def post(snap):
+ with get_context().new_local_span("/test"):
+ get_context().continued(snap)
+ requests.post("http://provider:9091/users")
+
+ snapshot = get_context().capture()
+
+ from threading import Thread
+ t = Thread(target=post, args=(snapshot,))
+ t.start()
+ t.join()
+
res = requests.post("http://provider:9091/users")
+
+ t.join()
+
return jsonify(res.json())
PORT = 9090