This is an automated email from the ASF dual-hosted git repository.

jmjoy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-rust.git


The following commit(s) were added to refs/heads/master by this push:
     new 22aa373  Add `Span::prepare_for_async` method and `AbstractSpan` 
trait. (#58)
22aa373 is described below

commit 22aa373de5c5fe294ee2b086790a86a02b43772e
Author: jmjoy <[email protected]>
AuthorDate: Tue May 23 18:25:00 2023 +0800

    Add `Span::prepare_for_async` method and `AbstractSpan` trait. (#58)
---
 README.md                      |  42 +++++++++
 e2e/data/expected_context.yaml |   2 +-
 e2e/docker/Dockerfile          |   4 +-
 src/common/mod.rs              |   1 +
 src/common/wait_group.rs       |  59 +++++++++++++
 src/logging/record.rs          |   5 +-
 src/trace/span.rs              | 174 +++++++++++++++++++++++++++++-------
 src/trace/trace_context.rs     | 196 ++++++++++++++++++++++++++++-------------
 tests/logging.rs               |   2 +-
 tests/trace_context.rs         |  55 +++++++++++-
 10 files changed, 438 insertions(+), 102 deletions(-)

diff --git a/README.md b/README.md
index 3ebf868..715001e 100644
--- a/README.md
+++ b/README.md
@@ -170,6 +170,48 @@ async fn main() -> Result<(), Box<dyn Error>> {
 }
 ```
 
+# Advanced APIs
+
+## Async Span APIs
+
+`Span::prepare_for_async` designed for async use cases.
+When tags, logs, and attributes (including end time) of the span need to be 
set in another
+thread or coroutine.
+
+`TracingContext::wait` wait for all `AsyncSpan` finished.
+
+```rust
+use skywalking::{
+    trace::tracer::Tracer,
+    trace::span::AbstractSpan,
+};
+
+async fn handle(tracer: Tracer) {
+    let mut ctx = tracer.create_trace_context();
+
+    {
+        let span = ctx.create_entry_span("op1");
+
+        // Create AsyncSpan and drop span.
+        // Internally, span will occupy the position of finalized span stack.
+        let mut async_span = span.prepare_for_async();
+
+        // Start async route, catch async_span with `move` keyword.
+        tokio::spawn(async move {
+
+            async_span.add_tag("foo", "bar");
+
+            // Something...
+
+            // async_span will drop here, submit modifications to finalized 
spans stack.
+        });
+    }
+
+    // Wait for all `AsyncSpan` finished.
+    ctx.wait();
+}
+```
+
 # How to compile?
 
 If you have `skywalking-(VERSION).crate`, you can unpack it with the way as 
follows:
diff --git a/e2e/data/expected_context.yaml b/e2e/data/expected_context.yaml
index 6551e9e..ca53043 100644
--- a/e2e/data/expected_context.yaml
+++ b/e2e/data/expected_context.yaml
@@ -50,7 +50,7 @@ segmentItems:
       peer: consumer:8082
       skipAnalysis: false
       spanId: 1
-      spanLayer: Http
+      spanLayer: Unknown
       spanType: Exit
       startTime: gt 0
     - componentId: 11000
diff --git a/e2e/docker/Dockerfile b/e2e/docker/Dockerfile
index 5b55d32..94ab5c3 100644
--- a/e2e/docker/Dockerfile
+++ b/e2e/docker/Dockerfile
@@ -15,8 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-FROM rust:1.63
-RUN apt-get update && apt-get install -y cmake protobuf-compiler=3.12.4-1
+FROM rust:1.65
+RUN apt-get update && apt-get install -y cmake protobuf-compiler
 WORKDIR /build
 COPY . /build/
 RUN cargo build --release --workspace
diff --git a/src/common/mod.rs b/src/common/mod.rs
index e602fce..7ab1b23 100644
--- a/src/common/mod.rs
+++ b/src/common/mod.rs
@@ -18,3 +18,4 @@
 
 pub mod random_generator;
 pub(crate) mod system_time;
+pub(crate) mod wait_group;
diff --git a/src/common/wait_group.rs b/src/common/wait_group.rs
new file mode 100644
index 0000000..8d35b9a
--- /dev/null
+++ b/src/common/wait_group.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+use std::sync::{Arc, Condvar, Mutex};
+
+#[derive(Clone)]
+pub(crate) struct WaitGroup {
+    inner: Arc<Inner>,
+}
+
+struct Inner {
+    var: Condvar,
+    count: Mutex<usize>,
+}
+
+impl Default for WaitGroup {
+    fn default() -> Self {
+        Self {
+            inner: Arc::new(Inner {
+                var: Condvar::new(),
+                count: Mutex::new(0),
+            }),
+        }
+    }
+}
+
+impl WaitGroup {
+    pub(crate) fn add(&self, n: usize) {
+        *self.inner.count.lock().unwrap() += n;
+    }
+
+    pub(crate) fn done(&self) {
+        let mut count = self.inner.count.lock().unwrap();
+        *count -= 1;
+        if *count == 0 {
+            self.inner.var.notify_all();
+        }
+    }
+
+    pub(crate) fn wait(self) {
+        let mut count = self.inner.count.lock().unwrap();
+        while *count > 0 {
+            count = self.inner.var.wait(count).unwrap();
+        }
+    }
+}
diff --git a/src/logging/record.rs b/src/logging/record.rs
index 53fbc13..fd45e79 100644
--- a/src/logging/record.rs
+++ b/src/logging/record.rs
@@ -22,7 +22,10 @@ use crate::{
         log_data_body::Content, JsonLog, KeyStringValuePair, LogData, 
LogDataBody, LogTags,
         TextLog, TraceContext, YamlLog,
     },
-    trace::{span::Span, trace_context::TracingContext},
+    trace::{
+        span::{AbstractSpan, Span},
+        trace_context::TracingContext,
+    },
 };
 use std::time::{SystemTime, UNIX_EPOCH};
 
diff --git a/src/trace/span.rs b/src/trace/span.rs
index 8babae4..53e6ce0 100644
--- a/src/trace/span.rs
+++ b/src/trace/span.rs
@@ -18,11 +18,47 @@
 //! Span from Google Dapper Paper.
 
 use crate::{
-    common::system_time::{fetch_time, TimePeriod},
+    common::{
+        system_time::{fetch_time, TimePeriod},
+        wait_group::WaitGroup,
+    },
     proto::v3::{SpanLayer, SpanObject, SpanType},
-    trace::trace_context::SpanStack,
+    trace::trace_context::{SpanStack, SpanUid},
 };
-use std::{fmt::Formatter, mem::take, sync::Arc};
+use std::{
+    fmt::{self, Formatter},
+    mem::take,
+    sync::{Arc, Weak},
+};
+
+/// [AbstractSpan] contains methods handle [SpanObject].
+pub trait AbstractSpan {
+    /// Get immutable span object reference.
+    fn span_object(&self) -> &SpanObject;
+
+    /// Mutable with inner span object.
+    fn span_object_mut(&mut self) -> &mut SpanObject;
+
+    /// Get span id.
+    fn span_id(&self) -> i32 {
+        self.span_object().span_id
+    }
+
+    /// Add logs to the span.
+    fn add_log<K, V, I>(&mut self, message: I)
+    where
+        K: Into<String>,
+        V: Into<String>,
+        I: IntoIterator<Item = (K, V)>,
+    {
+        self.span_object_mut().add_log(message)
+    }
+
+    /// Add tag to the span.
+    fn add_tag(&mut self, key: impl Into<String>, value: impl Into<String>) {
+        self.span_object_mut().add_tag(key, value)
+    }
+}
 
 /// Span is a concept that represents trace information for a single RPC.
 /// The Rust SDK supports Entry Span to represent inbound to a service
@@ -61,13 +97,14 @@ use std::{fmt::Formatter, mem::take, sync::Arc};
 /// ```
 #[must_use = "assign a variable name to guard the span not be dropped 
immediately."]
 pub struct Span {
-    index: usize,
+    uid: SpanUid,
     obj: Option<SpanObject>,
+    wg: WaitGroup,
     stack: Arc<SpanStack>,
 }
 
-impl std::fmt::Debug for Span {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+impl fmt::Debug for Span {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
         f.debug_struct("Span")
             .field(
                 "data",
@@ -83,10 +120,11 @@ impl std::fmt::Debug for Span {
 const SKYWALKING_RUST_COMPONENT_ID: i32 = 11000;
 
 impl Span {
-    pub(crate) fn new(index: usize, obj: SpanObject, stack: Arc<SpanStack>) -> 
Self {
+    pub(crate) fn new(uid: SpanUid, obj: SpanObject, wg: WaitGroup, stack: 
Arc<SpanStack>) -> Self {
         Self {
-            index,
+            uid,
             obj: Some(obj),
+            wg,
             stack,
         }
     }
@@ -115,45 +153,111 @@ impl Span {
         }
     }
 
-    /// Get immutable span object reference.
+    fn is_active_span(&self) -> bool {
+        let active_spans = &*self.stack.active();
+        active_spans
+            .last()
+            .map(|span| span.uid() == self.uid)
+            .unwrap_or_default()
+    }
+
+    /// The [Span] finish at current tracing context, but the current span is
+    /// still alive, until [AsyncSpan] dropped.
+    ///
+    /// This method must be called:
+    ///
+    /// 1. In original thread (tracing context).
+    /// 2. Current span is active span.
+    ///
+    /// During alive, tags, logs and attributes of the span could be changed, 
in
+    /// any thread.
+    ///
+    /// # Panics
+    ///
+    /// Current span could by active span.
+    pub fn prepare_for_async(mut self) -> AsyncSpan {
+        if !self.is_active_span() {
+            panic!("current span isn't active span");
+        }
+
+        self.wg.add(1);
+
+        AsyncSpan {
+            uid: self.uid,
+            wg: self.wg.clone(),
+            obj: take(&mut self.obj),
+            stack: Arc::downgrade(&self.stack),
+        }
+    }
+}
+
+impl Drop for Span {
+    /// Set the end time as current time, pop from context active span stack,
+    /// and push to context spans.
+    fn drop(&mut self) {
+        self.stack.finalize_span(self.uid, take(&mut self.obj));
+    }
+}
+
+impl AbstractSpan for Span {
     #[inline]
-    pub fn span_object(&self) -> &SpanObject {
+    fn span_object(&self) -> &SpanObject {
         self.obj.as_ref().unwrap()
     }
 
-    /// Mutable with inner span object.
     #[inline]
-    pub fn span_object_mut(&mut self) -> &mut SpanObject {
+    fn span_object_mut(&mut self) -> &mut SpanObject {
         self.obj.as_mut().unwrap()
     }
+}
 
-    /// Get span id.
-    pub fn span_id(&self) -> i32 {
-        self.span_object().span_id
-    }
-
-    /// Add logs to the span.
-    pub fn add_log<K, V, I>(&mut self, message: I)
-    where
-        K: Into<String>,
-        V: Into<String>,
-        I: IntoIterator<Item = (K, V)>,
-    {
-        self.span_object_mut().add_log(message)
-    }
+/// Generated by [Span::prepare_for_async], tags, logs and attributes of the
+/// span could be changed, in any thread.
+///
+/// It could be finished when dropped.
+#[must_use = "assign a variable name to guard the active span not be dropped 
immediately."]
+pub struct AsyncSpan {
+    uid: SpanUid,
+    obj: Option<SpanObject>,
+    wg: WaitGroup,
+    stack: Weak<SpanStack>,
+}
 
-    /// Add tag to the span.
-    pub fn add_tag(&mut self, key: impl Into<String>, value: impl 
Into<String>) {
-        self.span_object_mut().add_tag(key, value)
+impl fmt::Debug for AsyncSpan {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        f.debug_struct("AsyncSpan")
+            .field(
+                "data",
+                match self.obj {
+                    Some(ref obj) => obj,
+                    None => &"<none>",
+                },
+            )
+            .finish()
     }
 }
 
-impl Drop for Span {
-    /// Set the end time as current time, pop from context active span stack,
-    /// and push to context spans.
+impl Drop for AsyncSpan {
+    /// Set the end time as current time.
     fn drop(&mut self) {
         self.stack
-            .finalize_span(self.index, take(&mut self.obj).unwrap());
+            .upgrade()
+            .expect("TracingContext has dropped")
+            .finalize_async_span(self.uid, take(&mut self.obj).unwrap());
+
+        self.wg.done();
+    }
+}
+
+impl AbstractSpan for AsyncSpan {
+    #[inline]
+    fn span_object(&self) -> &SpanObject {
+        self.obj.as_ref().unwrap()
+    }
+
+    #[inline]
+    fn span_object_mut(&mut self) -> &mut SpanObject {
+        self.obj.as_mut().unwrap()
     }
 }
 
@@ -161,7 +265,9 @@ impl Drop for Span {
 mod tests {
     use super::*;
 
-    trait AssertSend: Send {}
+    trait AssertSend: Send + 'static {}
 
     impl AssertSend for Span {}
+
+    impl AssertSend for AsyncSpan {}
 }
diff --git a/src/trace/trace_context.rs b/src/trace/trace_context.rs
index 4613b77..ca57a3b 100644
--- a/src/trace/trace_context.rs
+++ b/src/trace/trace_context.rs
@@ -22,49 +22,83 @@ use crate::{
     common::{
         random_generator::RandomGenerator,
         system_time::{fetch_time, TimePeriod},
+        wait_group::WaitGroup,
     },
     error::LOCK_MSG,
     proto::v3::{RefType, SegmentObject, SegmentReference, SpanLayer, 
SpanObject, SpanType},
     trace::{
         propagation::context::PropagationContext,
-        span::Span,
+        span::{AbstractSpan, Span},
         tracer::{Tracer, WeakTracer},
     },
 };
 use parking_lot::{
     MappedRwLockReadGuard, MappedRwLockWriteGuard, RwLock, RwLockReadGuard, 
RwLockWriteGuard,
 };
-use std::{fmt::Formatter, mem::take, sync::Arc};
+use std::{
+    fmt::Formatter,
+    mem::take,
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    },
+};
+
+/// The span uid is to identify the [Span] for crate.
+pub(crate) type SpanUid = usize;
 
 pub(crate) struct ActiveSpan {
+    uid: SpanUid,
     span_id: i32,
-
     /// For [TracingContext::continued] used.
     r#ref: Option<SegmentReference>,
 }
 
 impl ActiveSpan {
-    fn new(span_id: i32) -> Self {
+    fn new(uid: SpanUid, span_id: i32) -> Self {
         Self {
+            uid,
             span_id,
             r#ref: None,
         }
     }
+
+    #[inline]
+    pub(crate) fn uid(&self) -> SpanUid {
+        self.uid
+    }
+}
+
+pub(crate) struct FinalizeSpan {
+    uid: SpanUid,
+    /// When the span is [AsyncSpan] and unfinished, it is None.
+    obj: Option<SpanObject>,
+    /// For [TracingContext::continued] used.
+    r#ref: Option<SegmentReference>,
+}
+
+impl FinalizeSpan {
+    pub(crate) fn new(
+        uid: usize,
+        obj: Option<SpanObject>,
+        r#ref: Option<SegmentReference>,
+    ) -> Self {
+        Self { uid, obj, r#ref }
+    }
 }
 
 #[derive(Default)]
 pub(crate) struct SpanStack {
-    pub(crate) finalized: RwLock<Vec<SpanObject>>,
+    pub(crate) finalized: RwLock<Vec<FinalizeSpan>>,
     pub(crate) active: RwLock<Vec<ActiveSpan>>,
 }
 
 impl SpanStack {
-    #[allow(dead_code)]
-    pub(crate) fn finalized(&self) -> RwLockReadGuard<'_, Vec<SpanObject>> {
+    pub(crate) fn finalized(&self) -> RwLockReadGuard<'_, Vec<FinalizeSpan>> {
         self.finalized.try_read().expect(LOCK_MSG)
     }
 
-    pub(crate) fn finalized_mut(&self) -> RwLockWriteGuard<'_, 
Vec<SpanObject>> {
+    pub(crate) fn finalized_mut(&self) -> RwLockWriteGuard<'_, 
Vec<FinalizeSpan>> {
         self.finalized.try_write().expect(LOCK_MSG)
     }
 
@@ -76,31 +110,53 @@ impl SpanStack {
         self.active.try_write().expect(LOCK_MSG)
     }
 
-    fn pop_active(&self, index: usize) -> Option<ActiveSpan> {
+    fn pop_active(&self, uid: SpanUid) -> Option<ActiveSpan> {
         let mut stack = self.active_mut();
-        if stack.len() > index + 1 {
-            None
-        } else {
+        if stack
+            .last()
+            .map(|span| span.uid() == uid)
+            .unwrap_or_default()
+        {
             stack.pop()
+        } else {
+            None
         }
     }
 
     /// Close span. We can't use closed span after finalize called.
-    pub(crate) fn finalize_span(&self, index: usize, mut obj: SpanObject) {
-        let span = self.pop_active(index);
-        if let Some(span) = span {
-            if span.span_id == obj.span_id {
-                obj.end_time = fetch_time(TimePeriod::End);
+    pub(crate) fn finalize_span(&self, uid: SpanUid, obj: Option<SpanObject>) {
+        let Some(active_span) = self.pop_active(uid) else {
+            panic!("Finalize span isn't the active span");
+        };
 
-                if let Some(r#ref) = span.r#ref {
+        let finalize_span = match obj {
+            Some(mut obj) => {
+                obj.end_time = fetch_time(TimePeriod::End);
+                if let Some(r#ref) = active_span.r#ref {
                     obj.refs.push(r#ref);
                 }
+                FinalizeSpan::new(uid, Some(obj), None)
+            }
+            None => FinalizeSpan::new(uid, None, active_span.r#ref),
+        };
+
+        self.finalized_mut().push(finalize_span);
+    }
 
-                self.finalized_mut().push(obj);
+    /// Close async span, fill the span object.
+    pub(crate) fn finalize_async_span(&self, uid: SpanUid, mut obj: 
SpanObject) {
+        for finalize_span in &mut *self.finalized_mut() {
+            if finalize_span.uid == uid {
+                obj.end_time = fetch_time(TimePeriod::End);
+                if let Some(r#ref) = take(&mut finalize_span.r#ref) {
+                    obj.refs.push(r#ref);
+                }
+                finalize_span.obj = Some(obj);
                 return;
             }
         }
-        panic!("Finalize span isn't the active span");
+
+        unreachable!()
     }
 }
 
@@ -116,28 +172,19 @@ pub struct TracingContext {
     next_span_id: i32,
     span_stack: Arc<SpanStack>,
     primary_endpoint_name: String,
+    span_uid_generator: AtomicUsize,
+    wg: WaitGroup,
     tracer: WeakTracer,
 }
 
 impl std::fmt::Debug for TracingContext {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        let span_objects: Vec<SpanObject>;
         f.debug_struct("TracingContext")
             .field("trace_id", &self.trace_id)
             .field("trace_segment_id", &self.trace_segment_id)
             .field("service", &self.service)
             .field("service_instance", &self.service_instance)
             .field("next_span_id", &self.next_span_id)
-            .field(
-                "finalized_spans",
-                match self.span_stack.finalized.try_read() {
-                    Some(spans) => {
-                        span_objects = spans.clone();
-                        &span_objects
-                    }
-                    None => &"<locked>",
-                },
-            )
             .finish()
     }
 }
@@ -157,6 +204,8 @@ impl TracingContext {
             next_span_id: Default::default(),
             span_stack: Default::default(),
             primary_endpoint_name: Default::default(),
+            span_uid_generator: AtomicUsize::new(0),
+            wg: Default::default(),
             tracer,
         }
     }
@@ -196,16 +245,19 @@ impl TracingContext {
         span_id
     }
 
+    /// The span uid is to identify the [Span] for crate.
+    fn generate_span_uid(&self) -> SpanUid {
+        self.span_uid_generator.fetch_add(1, Ordering::SeqCst)
+    }
+
     /// Clone the last finalized span.
     #[doc(hidden)]
     pub fn last_span(&self) -> Option<SpanObject> {
-        RwLockReadGuard::try_map(self.span_stack.finalized(), |spans| 
spans.last())
-            .ok()
-            .as_deref()
-            .cloned()
+        let spans = &*self.span_stack.finalized();
+        spans.iter().rev().find_map(|span| span.obj.clone())
     }
 
-    fn spans_mut(&mut self) -> RwLockWriteGuard<'_, Vec<SpanObject>> {
+    fn finalize_spans_mut(&mut self) -> RwLockWriteGuard<'_, 
Vec<FinalizeSpan>> {
         self.span_stack.finalized.try_write().expect(LOCK_MSG)
     }
 
@@ -243,7 +295,7 @@ impl TracingContext {
         );
 
         let index = self.push_active_span(&span);
-        Span::new(index, span, self.span_stack.clone())
+        Span::new(index, span, self.wg.clone(), self.span_stack.clone())
     }
 
     /// Create a new entry span, which is an initiator of collection of spans.
@@ -275,28 +327,20 @@ impl TracingContext {
 
     /// Create a new exit span, which will be created when tracing context will
     /// generate new span for function invocation.
+    ///
     /// Currently, this SDK supports RPC call. So we must set `remote_peer`.
     ///
     /// # Panics
     ///
     /// Panic if entry span not existed.
+    #[inline]
     pub fn create_exit_span(&mut self, operation_name: &str, remote_peer: 
&str) -> Span {
-        if self.next_span_id() == 0 {
-            panic!("entry span must be existed.");
-        }
-
-        let span = Span::new_obj(
-            self.inc_next_span_id(),
-            self.peek_active_span_id().unwrap_or(-1),
-            operation_name.to_string(),
-            remote_peer.to_string(),
+        self.create_common_span(
+            operation_name,
+            remote_peer,
             SpanType::Exit,
-            SpanLayer::Http,
-            false,
-        );
-
-        let index = self.push_active_span(&span);
-        Span::new(index, span, self.span_stack.clone())
+            self.peek_active_span_id().unwrap_or(-1),
+        )
     }
 
     /// Create a new local span.
@@ -304,23 +348,40 @@ impl TracingContext {
     /// # Panics
     ///
     /// Panic if entry span not existed.
+    #[inline]
     pub fn create_local_span(&mut self, operation_name: &str) -> Span {
+        self.create_common_span(
+            operation_name,
+            "",
+            SpanType::Local,
+            self.peek_active_span_id().unwrap_or(-1),
+        )
+    }
+
+    /// create exit or local span common logic.
+    fn create_common_span(
+        &mut self,
+        operation_name: &str,
+        remote_peer: &str,
+        span_type: SpanType,
+        parent_span_id: i32,
+    ) -> Span {
         if self.next_span_id() == 0 {
             panic!("entry span must be existed.");
         }
 
         let span = Span::new_obj(
             self.inc_next_span_id(),
-            self.peek_active_span_id().unwrap_or(-1),
+            parent_span_id,
             operation_name.to_string(),
-            Default::default(),
-            SpanType::Local,
+            remote_peer.to_string(),
+            span_type,
             SpanLayer::Unknown,
             false,
         );
 
-        let index = self.push_active_span(&span);
-        Span::new(index, span, self.span_stack.clone())
+        let uid = self.push_active_span(&span);
+        Span::new(uid, span, self.wg.clone(), self.span_stack.clone())
     }
 
     /// Capture a snapshot for cross-thread propagation.
@@ -357,6 +418,11 @@ impl TracingContext {
         }
     }
 
+    /// Wait all async span dropped which, created by 
[Span::prepare_for_async].
+    pub fn wait(self) {
+        self.wg.clone().wait();
+    }
+
     /// It converts tracing context into segment object.
     /// This conversion should be done before sending segments into OAP.
     ///
@@ -367,7 +433,12 @@ impl TracingContext {
         let trace_segment_id = self.trace_segment_id().to_owned();
         let service = self.service().to_owned();
         let service_instance = self.service_instance().to_owned();
-        let spans = take(&mut *self.spans_mut());
+        let spans = take(&mut *self.finalize_spans_mut());
+
+        let spans = spans
+            .into_iter()
+            .map(|span| span.obj.expect("Some async span haven't finished"))
+            .collect();
 
         SegmentObject {
             trace_id,
@@ -383,11 +454,14 @@ impl TracingContext {
         self.active_span().map(|span| span.span_id)
     }
 
-    fn push_active_span(&mut self, span: &SpanObject) -> usize {
+    fn push_active_span(&mut self, span: &SpanObject) -> SpanUid {
+        let uid = self.generate_span_uid();
+
         self.primary_endpoint_name = span.operation_name.clone();
         let mut stack = self.active_span_stack_mut();
-        stack.push(ActiveSpan::new(span.span_id));
-        stack.len() - 1
+        stack.push(ActiveSpan::new(uid, span.span_id));
+
+        uid
     }
 
     fn upgrade_tracer(&self) -> Tracer {
diff --git a/tests/logging.rs b/tests/logging.rs
index e3f9d36..f2de2fe 100644
--- a/tests/logging.rs
+++ b/tests/logging.rs
@@ -24,7 +24,7 @@ use skywalking::{
         TextLog, TraceContext,
     },
     reporter::{CollectItem, Report},
-    trace::tracer::Tracer,
+    trace::{span::AbstractSpan, tracer::Tracer},
 };
 use std::{
     collections::LinkedList,
diff --git a/tests/trace_context.rs b/tests/trace_context.rs
index 6501922..458c06e 100644
--- a/tests/trace_context.rs
+++ b/tests/trace_context.rs
@@ -23,6 +23,7 @@ use skywalking::{
     reporter::{print::PrintReporter, CollectItem, Report},
     trace::{
         propagation::{decoder::decode_propagation, 
encoder::encode_propagation},
+        span::AbstractSpan,
         tracer::Tracer,
     },
 };
@@ -107,7 +108,7 @@ async fn create_span() {
                         operation_name: "op3".to_string(),
                         peer: "example.com/test".to_string(),
                         span_type: SpanType::Exit as i32,
-                        span_layer: SpanLayer::Http as i32,
+                        span_layer: SpanLayer::Unknown as i32,
                         component_id: 11000,
                         is_error: false,
                         tags: Vec::<KeyStringValuePair>::new(),
@@ -133,7 +134,7 @@ async fn create_span() {
                             operation_name: "op5".to_string(),
                             peer: "example.com/test".to_string(),
                             span_type: SpanType::Exit as i32,
-                            span_layer: SpanLayer::Http as i32,
+                            span_layer: SpanLayer::Unknown as i32,
                             component_id: 11000,
                             is_error: false,
                             tags: Vec::<KeyStringValuePair>::new(),
@@ -144,6 +145,54 @@ async fn create_span() {
                     }
                 }
 
+                {
+                    let span6 = context.create_local_span("op6");
+
+                    {
+                        let span7 = context.create_local_span("op7");
+                        let span7 = span7.prepare_for_async();
+                        
assert_eq!(context.last_span().unwrap().operation_name, "op4");
+                        assert_eq!(
+                            span7.span_object().parent_span_id,
+                            span6.span_object().span_id,
+                        );
+
+                        let span8 = context.create_exit_span("op7", 
"example.com/test");
+                        let mut span8 = span8.prepare_for_async();
+                        
assert_eq!(context.last_span().unwrap().operation_name, "op4");
+                        assert_eq!(
+                            span8.span_object().parent_span_id,
+                            span6.span_object().span_id,
+                        );
+
+                        {
+                            let _ = span7;
+                            span8.add_tag("foo", "bar");
+                        }
+                    }
+
+                    let span8_expected = SpanObject {
+                        span_id: 7,
+                        parent_span_id: 5,
+                        start_time: 1,
+                        end_time: 100,
+                        refs: Vec::<SegmentReference>::new(),
+                        operation_name: "op7".to_string(),
+                        peer: "example.com/test".to_string(),
+                        span_type: SpanType::Exit as i32,
+                        span_layer: SpanLayer::Unknown as i32,
+                        component_id: 11000,
+                        is_error: false,
+                        tags: vec![KeyStringValuePair {
+                            key: "foo".to_string(),
+                            value: "bar".to_string(),
+                        }],
+                        logs: Vec::<Log>::new(),
+                        skip_analysis: false,
+                    };
+                    assert_eq!(context.last_span(), Some(span8_expected));
+                }
+
                 drop(span1);
 
                 let span1_expected = SpanObject {
@@ -165,6 +214,8 @@ async fn create_span() {
                 assert_eq!(context.last_span(), Some(span1_expected));
             }
 
+            context.wait();
+
             tracer
         },
         |segment| {

Reply via email to