This is an automated email from the ASF dual-hosted git repository.
jmjoy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-rust.git
The following commit(s) were added to refs/heads/master by this push:
new 22aa373 Add `Span::prepare_for_async` method and `AbstractSpan` trait. (#58)
22aa373 is described below
commit 22aa373de5c5fe294ee2b086790a86a02b43772e
Author: jmjoy <[email protected]>
AuthorDate: Tue May 23 18:25:00 2023 +0800
Add `Span::prepare_for_async` method and `AbstractSpan` trait. (#58)
---
README.md | 42 +++++++++
e2e/data/expected_context.yaml | 2 +-
e2e/docker/Dockerfile | 4 +-
src/common/mod.rs | 1 +
src/common/wait_group.rs | 59 +++++++++++++
src/logging/record.rs | 5 +-
src/trace/span.rs | 174 +++++++++++++++++++++++++++++-------
src/trace/trace_context.rs | 196 ++++++++++++++++++++++++++++-------------
tests/logging.rs | 2 +-
tests/trace_context.rs | 55 +++++++++++-
10 files changed, 438 insertions(+), 102 deletions(-)
diff --git a/README.md b/README.md
index 3ebf868..715001e 100644
--- a/README.md
+++ b/README.md
@@ -170,6 +170,48 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
```
+# Advanced APIs
+
+## Async Span APIs
+
+`Span::prepare_for_async` designed for async use cases.
+When tags, logs, and attributes (including end time) of the span need to be set in another
+thread or coroutine.
+
+`TracingContext::wait` wait for all `AsyncSpan` finished.
+
+```rust
+use skywalking::{
+ trace::tracer::Tracer,
+ trace::span::AbstractSpan,
+};
+
+async fn handle(tracer: Tracer) {
+ let mut ctx = tracer.create_trace_context();
+
+ {
+ let span = ctx.create_entry_span("op1");
+
+ // Create AsyncSpan and drop span.
+ // Internally, span will occupy the position of finalized span stack.
+ let mut async_span = span.prepare_for_async();
+
+ // Start async route, catch async_span with `move` keyword.
+ tokio::spawn(async move {
+
+ async_span.add_tag("foo", "bar");
+
+ // Something...
+
+ // async_span will drop here, submit modifications to finalized spans stack.
+ });
+ }
+
+ // Wait for all `AsyncSpan` finished.
+ ctx.wait();
+}
+```
+
# How to compile?
If you have `skywalking-(VERSION).crate`, you can unpack it with the way as follows:
diff --git a/e2e/data/expected_context.yaml b/e2e/data/expected_context.yaml
index 6551e9e..ca53043 100644
--- a/e2e/data/expected_context.yaml
+++ b/e2e/data/expected_context.yaml
@@ -50,7 +50,7 @@ segmentItems:
peer: consumer:8082
skipAnalysis: false
spanId: 1
- spanLayer: Http
+ spanLayer: Unknown
spanType: Exit
startTime: gt 0
- componentId: 11000
diff --git a/e2e/docker/Dockerfile b/e2e/docker/Dockerfile
index 5b55d32..94ab5c3 100644
--- a/e2e/docker/Dockerfile
+++ b/e2e/docker/Dockerfile
@@ -15,8 +15,8 @@
# specific language governing permissions and limitations
# under the License.
#
-FROM rust:1.63
-RUN apt-get update && apt-get install -y cmake protobuf-compiler=3.12.4-1
+FROM rust:1.65
+RUN apt-get update && apt-get install -y cmake protobuf-compiler
WORKDIR /build
COPY . /build/
RUN cargo build --release --workspace
diff --git a/src/common/mod.rs b/src/common/mod.rs
index e602fce..7ab1b23 100644
--- a/src/common/mod.rs
+++ b/src/common/mod.rs
@@ -18,3 +18,4 @@
pub mod random_generator;
pub(crate) mod system_time;
+pub(crate) mod wait_group;
diff --git a/src/common/wait_group.rs b/src/common/wait_group.rs
new file mode 100644
index 0000000..8d35b9a
--- /dev/null
+++ b/src/common/wait_group.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+use std::sync::{Arc, Condvar, Mutex};
+
+#[derive(Clone)]
+pub(crate) struct WaitGroup {
+ inner: Arc<Inner>,
+}
+
+struct Inner {
+ var: Condvar,
+ count: Mutex<usize>,
+}
+
+impl Default for WaitGroup {
+ fn default() -> Self {
+ Self {
+ inner: Arc::new(Inner {
+ var: Condvar::new(),
+ count: Mutex::new(0),
+ }),
+ }
+ }
+}
+
+impl WaitGroup {
+ pub(crate) fn add(&self, n: usize) {
+ *self.inner.count.lock().unwrap() += n;
+ }
+
+ pub(crate) fn done(&self) {
+ let mut count = self.inner.count.lock().unwrap();
+ *count -= 1;
+ if *count == 0 {
+ self.inner.var.notify_all();
+ }
+ }
+
+ pub(crate) fn wait(self) {
+ let mut count = self.inner.count.lock().unwrap();
+ while *count > 0 {
+ count = self.inner.var.wait(count).unwrap();
+ }
+ }
+}
diff --git a/src/logging/record.rs b/src/logging/record.rs
index 53fbc13..fd45e79 100644
--- a/src/logging/record.rs
+++ b/src/logging/record.rs
@@ -22,7 +22,10 @@ use crate::{
log_data_body::Content, JsonLog, KeyStringValuePair, LogData, LogDataBody, LogTags,
TextLog, TraceContext, YamlLog,
},
- trace::{span::Span, trace_context::TracingContext},
+ trace::{
+ span::{AbstractSpan, Span},
+ trace_context::TracingContext,
+ },
};
use std::time::{SystemTime, UNIX_EPOCH};
diff --git a/src/trace/span.rs b/src/trace/span.rs
index 8babae4..53e6ce0 100644
--- a/src/trace/span.rs
+++ b/src/trace/span.rs
@@ -18,11 +18,47 @@
//! Span from Google Dapper Paper.
use crate::{
- common::system_time::{fetch_time, TimePeriod},
+ common::{
+ system_time::{fetch_time, TimePeriod},
+ wait_group::WaitGroup,
+ },
proto::v3::{SpanLayer, SpanObject, SpanType},
- trace::trace_context::SpanStack,
+ trace::trace_context::{SpanStack, SpanUid},
};
-use std::{fmt::Formatter, mem::take, sync::Arc};
+use std::{
+ fmt::{self, Formatter},
+ mem::take,
+ sync::{Arc, Weak},
+};
+
+/// [AbstractSpan] contains methods handle [SpanObject].
+pub trait AbstractSpan {
+ /// Get immutable span object reference.
+ fn span_object(&self) -> &SpanObject;
+
+ /// Mutable with inner span object.
+ fn span_object_mut(&mut self) -> &mut SpanObject;
+
+ /// Get span id.
+ fn span_id(&self) -> i32 {
+ self.span_object().span_id
+ }
+
+ /// Add logs to the span.
+ fn add_log<K, V, I>(&mut self, message: I)
+ where
+ K: Into<String>,
+ V: Into<String>,
+ I: IntoIterator<Item = (K, V)>,
+ {
+ self.span_object_mut().add_log(message)
+ }
+
+ /// Add tag to the span.
+ fn add_tag(&mut self, key: impl Into<String>, value: impl Into<String>) {
+ self.span_object_mut().add_tag(key, value)
+ }
+}
/// Span is a concept that represents trace information for a single RPC.
/// The Rust SDK supports Entry Span to represent inbound to a service
@@ -61,13 +97,14 @@ use std::{fmt::Formatter, mem::take, sync::Arc};
/// ```
#[must_use = "assign a variable name to guard the span not be dropped immediately."]
pub struct Span {
- index: usize,
+ uid: SpanUid,
obj: Option<SpanObject>,
+ wg: WaitGroup,
stack: Arc<SpanStack>,
}
-impl std::fmt::Debug for Span {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+impl fmt::Debug for Span {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Span")
.field(
"data",
@@ -83,10 +120,11 @@ impl std::fmt::Debug for Span {
const SKYWALKING_RUST_COMPONENT_ID: i32 = 11000;
impl Span {
- pub(crate) fn new(index: usize, obj: SpanObject, stack: Arc<SpanStack>) -> Self {
+ pub(crate) fn new(uid: SpanUid, obj: SpanObject, wg: WaitGroup, stack: Arc<SpanStack>) -> Self {
Self {
- index,
+ uid,
obj: Some(obj),
+ wg,
stack,
}
}
@@ -115,45 +153,111 @@ impl Span {
}
}
- /// Get immutable span object reference.
+ fn is_active_span(&self) -> bool {
+ let active_spans = &*self.stack.active();
+ active_spans
+ .last()
+ .map(|span| span.uid() == self.uid)
+ .unwrap_or_default()
+ }
+
+ /// The [Span] finish at current tracing context, but the current span is
+ /// still alive, until [AsyncSpan] dropped.
+ ///
+ /// This method must be called:
+ ///
+ /// 1. In original thread (tracing context).
+ /// 2. Current span is active span.
+ ///
+ /// During alive, tags, logs and attributes of the span could be changed, in
+ /// any thread.
+ ///
+ /// # Panics
+ ///
+ /// Current span could by active span.
+ pub fn prepare_for_async(mut self) -> AsyncSpan {
+ if !self.is_active_span() {
+ panic!("current span isn't active span");
+ }
+
+ self.wg.add(1);
+
+ AsyncSpan {
+ uid: self.uid,
+ wg: self.wg.clone(),
+ obj: take(&mut self.obj),
+ stack: Arc::downgrade(&self.stack),
+ }
+ }
+}
+
+impl Drop for Span {
+ /// Set the end time as current time, pop from context active span stack,
+ /// and push to context spans.
+ fn drop(&mut self) {
+ self.stack.finalize_span(self.uid, take(&mut self.obj));
+ }
+}
+
+impl AbstractSpan for Span {
#[inline]
- pub fn span_object(&self) -> &SpanObject {
+ fn span_object(&self) -> &SpanObject {
self.obj.as_ref().unwrap()
}
- /// Mutable with inner span object.
#[inline]
- pub fn span_object_mut(&mut self) -> &mut SpanObject {
+ fn span_object_mut(&mut self) -> &mut SpanObject {
self.obj.as_mut().unwrap()
}
+}
- /// Get span id.
- pub fn span_id(&self) -> i32 {
- self.span_object().span_id
- }
-
- /// Add logs to the span.
- pub fn add_log<K, V, I>(&mut self, message: I)
- where
- K: Into<String>,
- V: Into<String>,
- I: IntoIterator<Item = (K, V)>,
- {
- self.span_object_mut().add_log(message)
- }
+/// Generated by [Span::prepare_for_async], tags, logs and attributes of the
+/// span could be changed, in any thread.
+///
+/// It could be finished when dropped.
+#[must_use = "assign a variable name to guard the active span not be dropped immediately."]
+pub struct AsyncSpan {
+ uid: SpanUid,
+ obj: Option<SpanObject>,
+ wg: WaitGroup,
+ stack: Weak<SpanStack>,
+}
- /// Add tag to the span.
- pub fn add_tag(&mut self, key: impl Into<String>, value: impl Into<String>) {
- self.span_object_mut().add_tag(key, value)
+impl fmt::Debug for AsyncSpan {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+ f.debug_struct("AsyncSpan")
+ .field(
+ "data",
+ match self.obj {
+ Some(ref obj) => obj,
+ None => &"<none>",
+ },
+ )
+ .finish()
}
}
-impl Drop for Span {
- /// Set the end time as current time, pop from context active span stack,
- /// and push to context spans.
+impl Drop for AsyncSpan {
+ /// Set the end time as current time.
fn drop(&mut self) {
self.stack
- .finalize_span(self.index, take(&mut self.obj).unwrap());
+ .upgrade()
+ .expect("TracingContext has dropped")
+ .finalize_async_span(self.uid, take(&mut self.obj).unwrap());
+
+ self.wg.done();
+ }
+}
+
+impl AbstractSpan for AsyncSpan {
+ #[inline]
+ fn span_object(&self) -> &SpanObject {
+ self.obj.as_ref().unwrap()
+ }
+
+ #[inline]
+ fn span_object_mut(&mut self) -> &mut SpanObject {
+ self.obj.as_mut().unwrap()
}
}
@@ -161,7 +265,9 @@ impl Drop for Span {
mod tests {
use super::*;
- trait AssertSend: Send {}
+ trait AssertSend: Send + 'static {}
impl AssertSend for Span {}
+
+ impl AssertSend for AsyncSpan {}
}
diff --git a/src/trace/trace_context.rs b/src/trace/trace_context.rs
index 4613b77..ca57a3b 100644
--- a/src/trace/trace_context.rs
+++ b/src/trace/trace_context.rs
@@ -22,49 +22,83 @@ use crate::{
common::{
random_generator::RandomGenerator,
system_time::{fetch_time, TimePeriod},
+ wait_group::WaitGroup,
},
error::LOCK_MSG,
proto::v3::{RefType, SegmentObject, SegmentReference, SpanLayer, SpanObject, SpanType},
trace::{
propagation::context::PropagationContext,
- span::Span,
+ span::{AbstractSpan, Span},
tracer::{Tracer, WeakTracer},
},
};
use parking_lot::{
MappedRwLockReadGuard, MappedRwLockWriteGuard, RwLock, RwLockReadGuard, RwLockWriteGuard,
};
-use std::{fmt::Formatter, mem::take, sync::Arc};
+use std::{
+ fmt::Formatter,
+ mem::take,
+ sync::{
+ atomic::{AtomicUsize, Ordering},
+ Arc,
+ },
+};
+
+/// The span uid is to identify the [Span] for crate.
+pub(crate) type SpanUid = usize;
pub(crate) struct ActiveSpan {
+ uid: SpanUid,
span_id: i32,
-
/// For [TracingContext::continued] used.
r#ref: Option<SegmentReference>,
}
impl ActiveSpan {
- fn new(span_id: i32) -> Self {
+ fn new(uid: SpanUid, span_id: i32) -> Self {
Self {
+ uid,
span_id,
r#ref: None,
}
}
+
+ #[inline]
+ pub(crate) fn uid(&self) -> SpanUid {
+ self.uid
+ }
+}
+
+pub(crate) struct FinalizeSpan {
+ uid: SpanUid,
+ /// When the span is [AsyncSpan] and unfinished, it is None.
+ obj: Option<SpanObject>,
+ /// For [TracingContext::continued] used.
+ r#ref: Option<SegmentReference>,
+}
+
+impl FinalizeSpan {
+ pub(crate) fn new(
+ uid: usize,
+ obj: Option<SpanObject>,
+ r#ref: Option<SegmentReference>,
+ ) -> Self {
+ Self { uid, obj, r#ref }
+ }
}
#[derive(Default)]
pub(crate) struct SpanStack {
- pub(crate) finalized: RwLock<Vec<SpanObject>>,
+ pub(crate) finalized: RwLock<Vec<FinalizeSpan>>,
pub(crate) active: RwLock<Vec<ActiveSpan>>,
}
impl SpanStack {
- #[allow(dead_code)]
- pub(crate) fn finalized(&self) -> RwLockReadGuard<'_, Vec<SpanObject>> {
+ pub(crate) fn finalized(&self) -> RwLockReadGuard<'_, Vec<FinalizeSpan>> {
self.finalized.try_read().expect(LOCK_MSG)
}
- pub(crate) fn finalized_mut(&self) -> RwLockWriteGuard<'_, Vec<SpanObject>> {
+ pub(crate) fn finalized_mut(&self) -> RwLockWriteGuard<'_, Vec<FinalizeSpan>> {
self.finalized.try_write().expect(LOCK_MSG)
}
@@ -76,31 +110,53 @@ impl SpanStack {
self.active.try_write().expect(LOCK_MSG)
}
- fn pop_active(&self, index: usize) -> Option<ActiveSpan> {
+ fn pop_active(&self, uid: SpanUid) -> Option<ActiveSpan> {
let mut stack = self.active_mut();
- if stack.len() > index + 1 {
- None
- } else {
+ if stack
+ .last()
+ .map(|span| span.uid() == uid)
+ .unwrap_or_default()
+ {
stack.pop()
+ } else {
+ None
}
}
/// Close span. We can't use closed span after finalize called.
- pub(crate) fn finalize_span(&self, index: usize, mut obj: SpanObject) {
- let span = self.pop_active(index);
- if let Some(span) = span {
- if span.span_id == obj.span_id {
- obj.end_time = fetch_time(TimePeriod::End);
+ pub(crate) fn finalize_span(&self, uid: SpanUid, obj: Option<SpanObject>) {
+ let Some(active_span) = self.pop_active(uid) else {
+ panic!("Finalize span isn't the active span");
+ };
- if let Some(r#ref) = span.r#ref {
+ let finalize_span = match obj {
+ Some(mut obj) => {
+ obj.end_time = fetch_time(TimePeriod::End);
+ if let Some(r#ref) = active_span.r#ref {
obj.refs.push(r#ref);
}
+ FinalizeSpan::new(uid, Some(obj), None)
+ }
+ None => FinalizeSpan::new(uid, None, active_span.r#ref),
+ };
+
+ self.finalized_mut().push(finalize_span);
+ }
- self.finalized_mut().push(obj);
+ /// Close async span, fill the span object.
+ pub(crate) fn finalize_async_span(&self, uid: SpanUid, mut obj: SpanObject) {
+ for finalize_span in &mut *self.finalized_mut() {
+ if finalize_span.uid == uid {
+ obj.end_time = fetch_time(TimePeriod::End);
+ if let Some(r#ref) = take(&mut finalize_span.r#ref) {
+ obj.refs.push(r#ref);
+ }
+ finalize_span.obj = Some(obj);
return;
}
}
- panic!("Finalize span isn't the active span");
+
+ unreachable!()
}
}
@@ -116,28 +172,19 @@ pub struct TracingContext {
next_span_id: i32,
span_stack: Arc<SpanStack>,
primary_endpoint_name: String,
+ span_uid_generator: AtomicUsize,
+ wg: WaitGroup,
tracer: WeakTracer,
}
impl std::fmt::Debug for TracingContext {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- let span_objects: Vec<SpanObject>;
f.debug_struct("TracingContext")
.field("trace_id", &self.trace_id)
.field("trace_segment_id", &self.trace_segment_id)
.field("service", &self.service)
.field("service_instance", &self.service_instance)
.field("next_span_id", &self.next_span_id)
- .field(
- "finalized_spans",
- match self.span_stack.finalized.try_read() {
- Some(spans) => {
- span_objects = spans.clone();
- &span_objects
- }
- None => &"<locked>",
- },
- )
.finish()
}
}
@@ -157,6 +204,8 @@ impl TracingContext {
next_span_id: Default::default(),
span_stack: Default::default(),
primary_endpoint_name: Default::default(),
+ span_uid_generator: AtomicUsize::new(0),
+ wg: Default::default(),
tracer,
}
}
@@ -196,16 +245,19 @@ impl TracingContext {
span_id
}
+ /// The span uid is to identify the [Span] for crate.
+ fn generate_span_uid(&self) -> SpanUid {
+ self.span_uid_generator.fetch_add(1, Ordering::SeqCst)
+ }
+
/// Clone the last finalized span.
#[doc(hidden)]
pub fn last_span(&self) -> Option<SpanObject> {
- RwLockReadGuard::try_map(self.span_stack.finalized(), |spans| spans.last())
- .ok()
- .as_deref()
- .cloned()
+ let spans = &*self.span_stack.finalized();
+ spans.iter().rev().find_map(|span| span.obj.clone())
}
- fn spans_mut(&mut self) -> RwLockWriteGuard<'_, Vec<SpanObject>> {
+ fn finalize_spans_mut(&mut self) -> RwLockWriteGuard<'_, Vec<FinalizeSpan>> {
self.span_stack.finalized.try_write().expect(LOCK_MSG)
}
@@ -243,7 +295,7 @@ impl TracingContext {
);
let index = self.push_active_span(&span);
- Span::new(index, span, self.span_stack.clone())
+ Span::new(index, span, self.wg.clone(), self.span_stack.clone())
}
/// Create a new entry span, which is an initiator of collection of spans.
@@ -275,28 +327,20 @@ impl TracingContext {
/// Create a new exit span, which will be created when tracing context will
/// generate new span for function invocation.
+ ///
/// Currently, this SDK supports RPC call. So we must set `remote_peer`.
///
/// # Panics
///
/// Panic if entry span not existed.
+ #[inline]
pub fn create_exit_span(&mut self, operation_name: &str, remote_peer: &str) -> Span {
- if self.next_span_id() == 0 {
- panic!("entry span must be existed.");
- }
-
- let span = Span::new_obj(
- self.inc_next_span_id(),
- self.peek_active_span_id().unwrap_or(-1),
- operation_name.to_string(),
- remote_peer.to_string(),
+ self.create_common_span(
+ operation_name,
+ remote_peer,
SpanType::Exit,
- SpanLayer::Http,
- false,
- );
-
- let index = self.push_active_span(&span);
- Span::new(index, span, self.span_stack.clone())
+ self.peek_active_span_id().unwrap_or(-1),
+ )
}
/// Create a new local span.
@@ -304,23 +348,40 @@ impl TracingContext {
/// # Panics
///
/// Panic if entry span not existed.
+ #[inline]
pub fn create_local_span(&mut self, operation_name: &str) -> Span {
+ self.create_common_span(
+ operation_name,
+ "",
+ SpanType::Local,
+ self.peek_active_span_id().unwrap_or(-1),
+ )
+ }
+
+ /// create exit or local span common logic.
+ fn create_common_span(
+ &mut self,
+ operation_name: &str,
+ remote_peer: &str,
+ span_type: SpanType,
+ parent_span_id: i32,
+ ) -> Span {
if self.next_span_id() == 0 {
panic!("entry span must be existed.");
}
let span = Span::new_obj(
self.inc_next_span_id(),
- self.peek_active_span_id().unwrap_or(-1),
+ parent_span_id,
operation_name.to_string(),
- Default::default(),
- SpanType::Local,
+ remote_peer.to_string(),
+ span_type,
SpanLayer::Unknown,
false,
);
- let index = self.push_active_span(&span);
- Span::new(index, span, self.span_stack.clone())
+ let uid = self.push_active_span(&span);
+ Span::new(uid, span, self.wg.clone(), self.span_stack.clone())
}
/// Capture a snapshot for cross-thread propagation.
@@ -357,6 +418,11 @@ impl TracingContext {
}
}
+ /// Wait all async span dropped which, created by [Span::prepare_for_async].
+ pub fn wait(self) {
+ self.wg.clone().wait();
+ }
+
/// It converts tracing context into segment object.
/// This conversion should be done before sending segments into OAP.
///
@@ -367,7 +433,12 @@ impl TracingContext {
let trace_segment_id = self.trace_segment_id().to_owned();
let service = self.service().to_owned();
let service_instance = self.service_instance().to_owned();
- let spans = take(&mut *self.spans_mut());
+ let spans = take(&mut *self.finalize_spans_mut());
+
+ let spans = spans
+ .into_iter()
+ .map(|span| span.obj.expect("Some async span haven't finished"))
+ .collect();
SegmentObject {
trace_id,
@@ -383,11 +454,14 @@ impl TracingContext {
self.active_span().map(|span| span.span_id)
}
- fn push_active_span(&mut self, span: &SpanObject) -> usize {
+ fn push_active_span(&mut self, span: &SpanObject) -> SpanUid {
+ let uid = self.generate_span_uid();
+
self.primary_endpoint_name = span.operation_name.clone();
let mut stack = self.active_span_stack_mut();
- stack.push(ActiveSpan::new(span.span_id));
- stack.len() - 1
+ stack.push(ActiveSpan::new(uid, span.span_id));
+
+ uid
}
fn upgrade_tracer(&self) -> Tracer {
diff --git a/tests/logging.rs b/tests/logging.rs
index e3f9d36..f2de2fe 100644
--- a/tests/logging.rs
+++ b/tests/logging.rs
@@ -24,7 +24,7 @@ use skywalking::{
TextLog, TraceContext,
},
reporter::{CollectItem, Report},
- trace::tracer::Tracer,
+ trace::{span::AbstractSpan, tracer::Tracer},
};
use std::{
collections::LinkedList,
diff --git a/tests/trace_context.rs b/tests/trace_context.rs
index 6501922..458c06e 100644
--- a/tests/trace_context.rs
+++ b/tests/trace_context.rs
@@ -23,6 +23,7 @@ use skywalking::{
reporter::{print::PrintReporter, CollectItem, Report},
trace::{
propagation::{decoder::decode_propagation, encoder::encode_propagation},
+ span::AbstractSpan,
tracer::Tracer,
},
};
@@ -107,7 +108,7 @@ async fn create_span() {
operation_name: "op3".to_string(),
peer: "example.com/test".to_string(),
span_type: SpanType::Exit as i32,
- span_layer: SpanLayer::Http as i32,
+ span_layer: SpanLayer::Unknown as i32,
component_id: 11000,
is_error: false,
tags: Vec::<KeyStringValuePair>::new(),
@@ -133,7 +134,7 @@ async fn create_span() {
operation_name: "op5".to_string(),
peer: "example.com/test".to_string(),
span_type: SpanType::Exit as i32,
- span_layer: SpanLayer::Http as i32,
+ span_layer: SpanLayer::Unknown as i32,
component_id: 11000,
is_error: false,
tags: Vec::<KeyStringValuePair>::new(),
@@ -144,6 +145,54 @@ async fn create_span() {
}
}
+ {
+ let span6 = context.create_local_span("op6");
+
+ {
+ let span7 = context.create_local_span("op7");
+ let span7 = span7.prepare_for_async();
+ assert_eq!(context.last_span().unwrap().operation_name, "op4");
+ assert_eq!(
+ span7.span_object().parent_span_id,
+ span6.span_object().span_id,
+ );
+
+ let span8 = context.create_exit_span("op7", "example.com/test");
+ let mut span8 = span8.prepare_for_async();
+ assert_eq!(context.last_span().unwrap().operation_name, "op4");
+ assert_eq!(
+ span8.span_object().parent_span_id,
+ span6.span_object().span_id,
+ );
+
+ {
+ let _ = span7;
+ span8.add_tag("foo", "bar");
+ }
+ }
+
+ let span8_expected = SpanObject {
+ span_id: 7,
+ parent_span_id: 5,
+ start_time: 1,
+ end_time: 100,
+ refs: Vec::<SegmentReference>::new(),
+ operation_name: "op7".to_string(),
+ peer: "example.com/test".to_string(),
+ span_type: SpanType::Exit as i32,
+ span_layer: SpanLayer::Unknown as i32,
+ component_id: 11000,
+ is_error: false,
+ tags: vec![KeyStringValuePair {
+ key: "foo".to_string(),
+ value: "bar".to_string(),
+ }],
+ logs: Vec::<Log>::new(),
+ skip_analysis: false,
+ };
+ assert_eq!(context.last_span(), Some(span8_expected));
+ }
+
drop(span1);
let span1_expected = SpanObject {
@@ -165,6 +214,8 @@ async fn create_span() {
assert_eq!(context.last_span(), Some(span1_expected));
}
+ context.wait();
+
tracer
},
|segment| {