This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git


The following commit(s) were added to refs/heads/master by this push:
     new cf35f61  [Core][Feature] Support snapshot context (#56)
cf35f61 is described below

commit cf35f615a06f37aa1aa57b379e9abe611071637a
Author: huawei <[email protected]>
AuthorDate: Mon Aug 3 22:07:14 2020 +0800

    [Core][Feature] Support snapshot context (#56)
---
 skywalking/agent/protocol/grpc.py          |  2 +-
 skywalking/trace/context.py                | 38 +++++++++++++-
 skywalking/trace/segment.py                | 19 ++++++-
 skywalking/trace/snapshot.py               | 44 +++++++++++++++++
 tests/plugin/sw_flask/expected.data.yml    | 79 ++++++++++++++++++++++++++++--
 tests/plugin/sw_flask/services/consumer.py | 16 ++++++
 6 files changed, 190 insertions(+), 8 deletions(-)

diff --git a/skywalking/agent/protocol/grpc.py b/skywalking/agent/protocol/grpc.py
index 1e47ba4..ca24895 100644
--- a/skywalking/agent/protocol/grpc.py
+++ b/skywalking/agent/protocol/grpc.py
@@ -88,7 +88,7 @@ class GrpcProtocol(Protocol):
                             value=str(tag.val),
                         ) for tag in span.tags],
                         refs=[SegmentReference(
-                            refType=0,
+                            refType=0 if ref.ref_type == "CrossProcess" else 1,
                             traceId=ref.trace_id,
                             parentTraceSegmentId=ref.segment_id,
                             parentSpanId=ref.span_id,
diff --git a/skywalking/trace/context.py b/skywalking/trace/context.py
index 99fe8af..23cdbc6 100644
--- a/skywalking/trace/context.py
+++ b/skywalking/trace/context.py
@@ -20,8 +20,10 @@ import threading
 from typing import List
 
 from skywalking import agent, config
+from skywalking.trace import ID
 from skywalking.trace.carrier import Carrier
-from skywalking.trace.segment import Segment
+from skywalking.trace.segment import Segment, SegmentRef
+from skywalking.trace.snapshot import Snapshot
 from skywalking.trace.span import Span, Kind, NoopSpan, EntrySpan, ExitSpan
 from skywalking.utils.counter import Counter
 
@@ -137,6 +139,28 @@ class SpanContext(object):
 
         self._correlation[key] = value
 
+    def capture(self):
+        if len(self.spans) == 0:
+            return None
+
+        return Snapshot(
+            segment_id=str(self.segment.segment_id),
+            span_id=self.active_span().sid,
+            trace_id=self.segment.related_traces[0],
+            endpoint=self.spans[0].op,
+            correlation=self._correlation,
+        )
+
+    def continued(self, snapshot: 'Snapshot'):
+        if snapshot is None:
+            return None
+        if not snapshot.is_from_current(self) and snapshot.is_valid():
+            ref = SegmentRef.build_ref(snapshot)
+            span = self.active_span()
+            span.refs.append(ref)
+            self.segment.relate(ID(ref.trace_id))
+            self._correlation.update(snapshot.correlation)
+
 
 class NoopContext(SpanContext):
     def __init__(self):
@@ -169,6 +193,18 @@ class NoopContext(SpanContext):
     def active_span(self):
         return self._noop_span
 
+    def capture(self):
+        return Snapshot(
+            segment_id=None,
+            span_id=-1,
+            trace_id=None,
+            endpoint=None,
+            correlation=self._correlation,
+        )
+
+    def continued(self, snapshot: 'Snapshot'):
+        self._correlation.update(snapshot.correlation)
+
 
 _thread_local = threading.local()
 _thread_local.context = None
diff --git a/skywalking/trace/segment.py b/skywalking/trace/segment.py
index 4a8d905..f2e0d4e 100644
--- a/skywalking/trace/segment.py
+++ b/skywalking/trace/segment.py
@@ -18,17 +18,20 @@
 import time
 from typing import List, TYPE_CHECKING
 
+from skywalking import config
+
 from skywalking.trace import ID
 from skywalking.utils.lang import tostring
 
 if TYPE_CHECKING:
     from skywalking.trace.carrier import Carrier
     from skywalking.trace.span import Span
+    from skywalking.trace.snapshot import Snapshot
 
 
 class SegmentRef(object):
-    def __init__(self, carrier: 'Carrier'):
-        self.ref_type = 'CrossProcess'  # type: str
+    def __init__(self, carrier: 'Carrier', ref_type: str = 'CrossProcess'):
+        self.ref_type = ref_type  # type: str
         self.trace_id = carrier.trace_id  # type: str
         self.segment_id = carrier.segment_id  # type: str
         self.span_id = int(carrier.span_id)  # type: int
@@ -49,6 +52,18 @@ class SegmentRef(object):
             self.endpoint == other.endpoint and \
             self.client_address == other.client_address
 
+    @classmethod
+    def build_ref(cls, snapshot: 'Snapshot'):
+        from skywalking.trace.carrier import Carrier
+        carrier = Carrier()
+        carrier.trace_id = str(snapshot.trace_id)
+        carrier.segment_id = str(snapshot.segment_id)
+        carrier.endpoint = snapshot.endpoint
+        carrier.span_id = snapshot.span_id
+        carrier.service = config.service_name
+        carrier.service_instance = config.service_instance
+        return SegmentRef(carrier, ref_type="CrossThread")
+
 
 class _NewID(ID):
     pass
diff --git a/skywalking/trace/snapshot.py b/skywalking/trace/snapshot.py
new file mode 100644
index 0000000..3072ba9
--- /dev/null
+++ b/skywalking/trace/snapshot.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from skywalking.trace.context import SpanContext
+
+from skywalking.trace import ID
+
+
+class Snapshot:
+    def __init__(
+            self,
+            segment_id: str = None,
+            span_id: int = None,
+            trace_id: ID = None,
+            endpoint: str = None,
+            correlation: dict = None
+    ):
+        self.trace_id = trace_id  # type: ID
+        self.segment_id = segment_id  # type: str
+        self.span_id = span_id  # type: int
+        self.endpoint = endpoint  # type: str
+        self.correlation = correlation.copy()  # type: dict
+
+    def is_from_current(self, context: 'SpanContext'):
+        return self.segment_id is not None and self.segment_id == context.capture().segment_id
+
+    def is_valid(self):
+        return self.segment_id is not None and self.span_id > -1 and self.trace_id is not None
diff --git a/tests/plugin/sw_flask/expected.data.yml b/tests/plugin/sw_flask/expected.data.yml
index 453f539..add2226 100644
--- a/tests/plugin/sw_flask/expected.data.yml
+++ b/tests/plugin/sw_flask/expected.data.yml
@@ -17,7 +17,7 @@
 
 segmentItems:
   - serviceName: provider
-    segmentSize: 1
+    segmentSize: 2
     segments:
       - segmentId: not null
         spans:
@@ -35,7 +35,36 @@ segmentItems:
                 value: '200'
             refs:
               - parentEndpoint: /users
-                networkAddress: provider:9091
+                networkAddress: 'provider:9091'
+                refType: CrossProcess
+                parentSpanId: 1
+                parentTraceSegmentId: not null
+                parentServiceInstance: not null
+                parentService: consumer
+                traceId: not null
+            startTime: gt 0
+            endTime: gt 0
+            componentId: 7001
+            spanType: Entry
+            peer: not null
+            skipAnalysis: false
+      - segmentId: not null
+        spans:
+          - operationName: /users
+            operationId: 0
+            parentSpanId: -1
+            spanId: 0
+            spanLayer: Http
+            tags:
+              - key: http.method
+                value: POST
+              - key: url
+                value: http://provider:9091/users
+              - key: status.code
+                value: '200'
+            refs:
+              - parentEndpoint: /users
+                networkAddress: 'provider:9091'
                 refType: CrossProcess
                 parentSpanId: 1
                 parentTraceSegmentId: not null
@@ -49,7 +78,7 @@ segmentItems:
             peer: not null
             skipAnalysis: false
   - serviceName: consumer
-    segmentSize: 1
+    segmentSize: 2
     segments:
       - segmentId: not null
         spans:
@@ -58,6 +87,48 @@ segmentItems:
             parentSpanId: 0
             spanId: 1
             spanLayer: Http
+            startTime:  gt 0
+            endTime:  gt 0
+            componentId: 7002
+            isError: false
+            spanType: Exit
+            peer: provider:9091
+            skipAnalysis: false
+            tags:
+              - key: http.method
+                value: POST
+              - key: url
+                value: 'http://provider:9091/users'
+              - key: status.code
+                value: '200'
+          - operationName: /test
+            operationId: 0
+            parentSpanId: -1
+            spanId: 0
+            spanLayer: Unknown
+            startTime:  gt 0
+            endTime:  gt 0
+            componentId: 0
+            isError: false
+            spanType: Local
+            peer: ''
+            skipAnalysis: false
+            refs:
+              - parentEndpoint: /users
+                networkAddress: ''
+                refType: CrossThread
+                parentSpanId: 0
+                parentTraceSegmentId: not null
+                parentServiceInstance: not null
+                parentService: consumer
+                traceId: not null
+      - segmentId: not null
+        spans:
+          - operationName: /users
+            operationId: 0
+            parentSpanId: 0
+            spanId: 1
+            spanLayer: Http
             tags:
               - key: http.method
                 value: POST
@@ -90,4 +161,4 @@ segmentItems:
             componentId: 7001
             spanType: Entry
             peer: not null
-            skipAnalysis: false
\ No newline at end of file
+            skipAnalysis: false
diff --git a/tests/plugin/sw_flask/services/consumer.py b/tests/plugin/sw_flask/services/consumer.py
index 129222a..af6d078 100644
--- a/tests/plugin/sw_flask/services/consumer.py
+++ b/tests/plugin/sw_flask/services/consumer.py
@@ -33,7 +33,23 @@ if __name__ == '__main__':
     def application():
         from skywalking.trace.context import get_context
         get_context().put_correlation("correlation", "correlation")
+
+        def post(snap):
+            with get_context().new_local_span("/test"):
+                get_context().continued(snap)
+                requests.post("http://provider:9091/users")
+
+        snapshot = get_context().capture()
+
+        from threading import Thread
+        t = Thread(target=post, args=(snapshot,))
+        t.start()
+        t.join()
+
         res = requests.post("http://provider:9091/users")
+
+        t.join()
+
         return jsonify(res.json())
 
     PORT = 9090

Reply via email to