This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-rust.git


The following commit(s) were added to refs/heads/master by this push:
     new f544829  Refactor span object api to make it more friendly. (#52)
f544829 is described below

commit f544829e6010a6a0f6b115b296878168a95a53da
Author: jmjoy <[email protected]>
AuthorDate: Wed Jan 11 16:47:04 2023 +0800

    Refactor span object api to make it more friendly. (#52)
---
 Cargo.toml                 |   1 +
 src/logging/record.rs      |   2 +-
 src/trace/span.rs          |  89 +++++++++++++++++++++-------------
 src/trace/trace_context.rs | 118 +++++++++++++++++++++------------------------
 tests/trace_context.rs     |   6 +--
 5 files changed, 115 insertions(+), 101 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 4604fce..5b66f22 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -48,6 +48,7 @@ futures-core = "0.3.21"
 futures-util = "0.3.21"
 hostname = { version = "0.3.1", optional = true }
 once_cell = "1.14.0"
+parking_lot = "0.12.1"
 portable-atomic = { version = "0.3.13", features = ["float"] }
 prost = "0.11.0"
 prost-derive = "0.11.0"
diff --git a/src/logging/record.rs b/src/logging/record.rs
index 3e1ce91..3a34872 100644
--- a/src/logging/record.rs
+++ b/src/logging/record.rs
@@ -114,7 +114,7 @@ impl LogRecord {
 
     /// The span should be unique in the whole segment.
     pub fn with_span(mut self, span: &Span) -> Self {
-        self.span_id = Some(span.with_span_object(|span| span.span_id));
+        self.span_id = Some(span.span_id());
         self
     }
 
diff --git a/src/trace/span.rs b/src/trace/span.rs
index 16ecf79..60371f0 100644
--- a/src/trace/span.rs
+++ b/src/trace/span.rs
@@ -19,15 +19,46 @@
 
 use crate::{
     common::system_time::{fetch_time, TimePeriod},
-    error::LOCK_MSG,
     skywalking_proto::v3::{SpanLayer, SpanObject, SpanType},
     trace::trace_context::SpanStack,
 };
+use parking_lot::{
+    MappedRwLockReadGuard, MappedRwLockWriteGuard, RwLockReadGuard, 
RwLockWriteGuard,
+};
 use std::{
     fmt::Formatter,
-    sync::{Arc, Weak},
+    ops::{Deref, DerefMut},
+    sync::Arc,
 };
 
+/// Wrapper of [SpanObject] immutable reference.
+pub struct SpanObjectRef<'a>(pub(crate) MappedRwLockReadGuard<'a, SpanObject>);
+
+impl Deref for SpanObjectRef<'_> {
+    type Target = SpanObject;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+/// Wrapper of [SpanObject] mutable reference.
+pub struct SpanObjectMut<'a>(MappedRwLockWriteGuard<'a, SpanObject>);
+
+impl Deref for SpanObjectMut<'_> {
+    type Target = SpanObject;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl DerefMut for SpanObjectMut<'_> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.0
+    }
+}
+
 /// Span is a concept that represents trace information for a single RPC.
 /// The Rust SDK supports Entry Span to represent inbound to a service
 /// and Exit Span to represent outbound from a service.
@@ -66,7 +97,7 @@ use std::{
 #[must_use = "assign a variable name to guard the span not be dropped 
immediately."]
 pub struct Span {
     index: usize,
-    stack: Weak<SpanStack>,
+    stack: Arc<SpanStack>,
 }
 
 impl std::fmt::Debug for Span {
@@ -75,18 +106,15 @@ impl std::fmt::Debug for Span {
         f.debug_struct("Span")
             .field(
                 "data",
-                match self.stack.upgrade() {
-                    Some(stack) => match stack.active.try_read() {
-                        Ok(spans) => match spans.get(self.index) {
-                            Some(span) => {
-                                span_object = span.clone();
-                                &span_object
-                            }
-                            None => &"<hanged>",
-                        },
-                        Err(_) => &"<locked>",
+                match self.stack.active.try_read() {
+                    Some(spans) => match spans.get(self.index) {
+                        Some(span) => {
+                            span_object = span.clone();
+                            &span_object
+                        }
+                        None => &"<hanged>",
                     },
-                    None => &"<dropped>",
+                    None => &"<locked>",
                 },
             )
             .finish()
@@ -96,7 +124,7 @@ impl std::fmt::Debug for Span {
 const SKYWALKING_RUST_COMPONENT_ID: i32 = 11000;
 
 impl Span {
-    pub(crate) fn new(index: usize, stack: Weak<SpanStack>) -> Self {
+    pub(crate) fn new(index: usize, stack: Arc<SpanStack>) -> Self {
         Self { index, stack }
     }
 
@@ -124,24 +152,23 @@ impl Span {
         }
     }
 
-    fn upgrade_stack(&self) -> Arc<SpanStack> {
-        self.stack.upgrade().expect("Context has dropped")
-    }
-
-    /// Immutable with inner span object.
-    pub fn with_span_object<T>(&self, f: impl FnOnce(&SpanObject) -> T) -> T {
-        self.upgrade_stack()
-            .with_active(|stack| f(&stack[self.index]))
+    /// Get immutable span object reference.
+    pub fn span_object(&self) -> SpanObjectRef<'_> {
+        SpanObjectRef(RwLockReadGuard::map(self.stack.active(), |stack| {
+            &stack[self.index]
+        }))
     }
 
     /// Mutable with inner span object.
-    pub fn with_span_object_mut<T>(&mut self, f: impl FnOnce(&mut SpanObject) 
-> T) -> T {
-        f(&mut 
(self.upgrade_stack().active.try_write().expect(LOCK_MSG))[self.index])
+    pub fn span_object_mut(&mut self) -> SpanObjectMut<'_> {
+        SpanObjectMut(RwLockWriteGuard::map(self.stack.active_mut(), |stack| {
+            &mut stack[self.index]
+        }))
     }
 
     /// Get span id.
     pub fn span_id(&self) -> i32 {
-        self.with_span_object(|span| span.span_id)
+        self.span_object().span_id
     }
 
     /// Add logs to the span.
@@ -151,24 +178,20 @@ impl Span {
         V: Into<String>,
         I: IntoIterator<Item = (K, V)>,
     {
-        self.with_span_object_mut(|span| span.add_log(message))
+        self.span_object_mut().add_log(message)
     }
 
     /// Add tag to the span.
     pub fn add_tag(&mut self, key: impl Into<String>, value: impl 
Into<String>) {
-        self.with_span_object_mut(|span| span.add_tag(key, value))
+        self.span_object_mut().add_tag(key, value)
     }
 }
 
 impl Drop for Span {
     /// Set the end time as current time, pop from context active span stack,
     /// and push to context spans.
-    ///
-    /// # Panics
-    ///
-    /// Panic if context is dropped or this span isn't the active span.
     fn drop(&mut self) {
-        self.upgrade_stack().finalize_span(self.index);
+        self.stack.finalize_span(self.index);
     }
 }
 
diff --git a/src/trace/trace_context.rs b/src/trace/trace_context.rs
index e9612c2..95641ea 100644
--- a/src/trace/trace_context.rs
+++ b/src/trace/trace_context.rs
@@ -29,15 +29,14 @@ use crate::{
     },
     trace::{
         propagation::context::PropagationContext,
-        span::Span,
+        span::{Span, SpanObjectRef},
         tracer::{Tracer, WeakTracer},
     },
 };
-use std::{
-    fmt::Formatter,
-    mem::take,
-    sync::{Arc, RwLock},
+use parking_lot::{
+    MappedRwLockReadGuard, MappedRwLockWriteGuard, RwLock, RwLockReadGuard, 
RwLockWriteGuard,
 };
+use std::{fmt::Formatter, mem::take, sync::Arc};
 
 #[derive(Default)]
 pub(crate) struct SpanStack {
@@ -47,30 +46,29 @@ pub(crate) struct SpanStack {
 }
 
 impl SpanStack {
-    pub(crate) fn with_finalized<T>(&self, f: impl FnOnce(&Vec<SpanObject>) -> 
T) -> T {
-        f(&self.finalized.try_read().expect(LOCK_MSG))
+    pub(crate) fn finalized(&self) -> RwLockReadGuard<'_, Vec<SpanObject>> {
+        self.finalized.try_read().expect(LOCK_MSG)
     }
 
-    pub(crate) fn with_finalized_mut<T>(&self, f: impl FnOnce(&mut 
Vec<SpanObject>) -> T) -> T {
-        f(&mut self.finalized.try_write().expect(LOCK_MSG))
+    pub(crate) fn finalized_mut(&self) -> RwLockWriteGuard<'_, 
Vec<SpanObject>> {
+        self.finalized.try_write().expect(LOCK_MSG)
     }
 
-    pub(crate) fn with_active<T>(&self, f: impl FnOnce(&Vec<SpanObject>) -> T) 
-> T {
-        f(&self.active.try_read().expect(LOCK_MSG))
+    pub(crate) fn active(&self) -> RwLockReadGuard<'_, Vec<SpanObject>> {
+        self.active.try_read().expect(LOCK_MSG)
     }
 
-    pub(crate) fn with_active_mut<T>(&self, f: impl FnOnce(&mut 
Vec<SpanObject>) -> T) -> T {
-        f(&mut self.active.try_write().expect(LOCK_MSG))
+    pub(crate) fn active_mut(&self) -> RwLockWriteGuard<'_, Vec<SpanObject>> {
+        self.active.try_write().expect(LOCK_MSG)
     }
 
     fn pop_active(&self, index: usize) -> Option<SpanObject> {
-        self.with_active_mut(|stack| {
-            if stack.len() > index + 1 {
-                None
-            } else {
-                stack.pop()
-            }
-        })
+        let mut stack = self.active_mut();
+        if stack.len() > index + 1 {
+            None
+        } else {
+            stack.pop()
+        }
     }
 
     /// Close span. We can't use closed span after finalize called.
@@ -78,7 +76,7 @@ impl SpanStack {
         let span = self.pop_active(index);
         if let Some(mut span) = span {
             span.end_time = fetch_time(TimePeriod::End);
-            self.with_finalized_mut(|spans| spans.push(span));
+            self.finalized_mut().push(span);
         } else {
             panic!("Finalize span isn't the active span");
         }
@@ -112,11 +110,11 @@ impl std::fmt::Debug for TracingContext {
             .field(
                 "finalized_spans",
                 match self.span_stack.finalized.try_read() {
-                    Ok(spans) => {
+                    Some(spans) => {
                         span_objects = spans.clone();
                         &span_objects
                     }
-                    Err(_) => &"<locked>",
+                    None => &"<locked>",
                 },
             )
             .finish()
@@ -178,35 +176,30 @@ impl TracingContext {
     }
 
     /// Get the last finalized span.
-    pub fn last_span(&self) -> Option<SpanObject> {
-        self.span_stack
-            .with_finalized(|spans| spans.last().cloned())
+    pub fn last_span(&self) -> Option<SpanObjectRef<'_>> {
+        RwLockReadGuard::try_map(self.span_stack.finalized(), |spans| 
spans.last())
+            .ok()
+            .map(SpanObjectRef)
     }
 
-    fn with_spans_mut<T>(&mut self, f: impl FnOnce(&mut Vec<SpanObject>) -> T) 
-> T {
-        f(&mut self.span_stack.finalized.try_write().expect(LOCK_MSG))
+    fn spans_mut(&mut self) -> RwLockWriteGuard<'_, Vec<SpanObject>> {
+        self.span_stack.finalized.try_write().expect(LOCK_MSG)
     }
 
-    pub(crate) fn with_active_span_stack<T>(&self, f: impl 
FnOnce(&Vec<SpanObject>) -> T) -> T {
-        self.span_stack.with_active(f)
+    pub(crate) fn active_span_stack(&self) -> RwLockReadGuard<'_, 
Vec<SpanObject>> {
+        self.span_stack.active()
     }
 
-    pub(crate) fn with_active_span_stack_mut<T>(
-        &mut self,
-        f: impl FnOnce(&mut Vec<SpanObject>) -> T,
-    ) -> T {
-        self.span_stack.with_active_mut(f)
+    pub(crate) fn active_span_stack_mut(&mut self) -> RwLockWriteGuard<'_, 
Vec<SpanObject>> {
+        self.span_stack.active_mut()
     }
 
-    pub(crate) fn with_active_span<T>(&self, f: impl FnOnce(&SpanObject) -> T) 
-> Option<T> {
-        self.with_active_span_stack(|stack| stack.last().map(f))
+    pub(crate) fn active_span(&self) -> Option<MappedRwLockReadGuard<'_, 
SpanObject>> {
+        RwLockReadGuard::try_map(self.active_span_stack(), |stack| 
stack.last()).ok()
     }
 
-    pub(crate) fn with_active_span_mut<T>(
-        &mut self,
-        f: impl FnOnce(&mut SpanObject) -> T,
-    ) -> Option<T> {
-        self.with_active_span_stack_mut(|stack| stack.last_mut().map(f))
+    pub(crate) fn active_span_mut(&mut self) -> 
Option<MappedRwLockWriteGuard<'_, SpanObject>> {
+        RwLockWriteGuard::try_map(self.active_span_stack_mut(), |stack| 
stack.last_mut()).ok()
     }
 
     /// Create a new entry span, which is an initiator of collection of spans.
@@ -227,7 +220,7 @@ impl TracingContext {
         );
 
         let index = self.push_active_span(span);
-        Span::new(index, Arc::downgrade(&self.span_stack))
+        Span::new(index, self.span_stack.clone())
     }
 
     /// Create a new entry span, which is an initiator of collection of spans.
@@ -244,17 +237,15 @@ impl TracingContext {
     ) -> Span {
         let mut span = self.create_entry_span(operation_name);
         self.trace_id = propagation.parent_trace_id.clone();
-        span.with_span_object_mut(|span| {
-            span.refs.push(SegmentReference {
-                ref_type: RefType::CrossProcess as i32,
-                trace_id: self.trace_id().to_owned(),
-                parent_trace_segment_id: 
propagation.parent_trace_segment_id.clone(),
-                parent_span_id: propagation.parent_span_id,
-                parent_service: propagation.parent_service.clone(),
-                parent_service_instance: 
propagation.parent_service_instance.clone(),
-                parent_endpoint: propagation.destination_endpoint.clone(),
-                network_address_used_at_peer: 
propagation.destination_address.clone(),
-            });
+        span.span_object_mut().refs.push(SegmentReference {
+            ref_type: RefType::CrossProcess as i32,
+            trace_id: self.trace_id().to_owned(),
+            parent_trace_segment_id: 
propagation.parent_trace_segment_id.clone(),
+            parent_span_id: propagation.parent_span_id,
+            parent_service: propagation.parent_service.clone(),
+            parent_service_instance: 
propagation.parent_service_instance.clone(),
+            parent_endpoint: propagation.destination_endpoint.clone(),
+            network_address_used_at_peer: 
propagation.destination_address.clone(),
         });
         span
     }
@@ -282,7 +273,7 @@ impl TracingContext {
         );
 
         let index = self.push_active_span(span);
-        Span::new(index, Arc::downgrade(&self.span_stack))
+        Span::new(index, self.span_stack.clone())
     }
 
     /// Create a new local span.
@@ -306,7 +297,7 @@ impl TracingContext {
         );
 
         let index = self.push_active_span(span);
-        Span::new(index, Arc::downgrade(&self.span_stack))
+        Span::new(index, self.span_stack.clone())
     }
 
     /// Capture a snapshot for cross-thread propagation.
@@ -337,9 +328,9 @@ impl TracingContext {
                 network_address_used_at_peer: Default::default(),
             };
 
-            self.with_active_span_mut(|span| {
+            if let Some(mut span) = self.active_span_mut() {
                 span.refs.push(segment_ref);
-            });
+            }
         }
     }
 
@@ -353,7 +344,7 @@ impl TracingContext {
         let trace_segment_id = self.trace_segment_id().to_owned();
         let service = self.service().to_owned();
         let service_instance = self.service_instance().to_owned();
-        let spans = self.with_spans_mut(|spans| take(spans));
+        let spans = take(&mut *self.spans_mut());
 
         SegmentObject {
             trace_id,
@@ -366,15 +357,14 @@ impl TracingContext {
     }
 
     pub(crate) fn peek_active_span_id(&self) -> Option<i32> {
-        self.with_active_span(|span| span.span_id)
+        self.active_span().map(|span| span.span_id)
     }
 
     fn push_active_span(&mut self, span: SpanObject) -> usize {
         self.primary_endpoint_name = span.operation_name.clone();
-        self.with_active_span_stack_mut(|stack| {
-            stack.push(span);
-            stack.len() - 1
-        })
+        let mut stack = self.active_span_stack_mut();
+        stack.push(span);
+        stack.len() - 1
     }
 
     fn upgrade_tracer(&self) -> Tracer {
diff --git a/tests/trace_context.rs b/tests/trace_context.rs
index 4603ad9..c67bdf6 100644
--- a/tests/trace_context.rs
+++ b/tests/trace_context.rs
@@ -114,7 +114,7 @@ async fn create_span() {
                         logs: Vec::<Log>::new(),
                         skip_analysis: false,
                     };
-                    assert_eq!(context.last_span(), Some(span3_expected));
+                    assert_eq!(context.last_span().as_deref(), 
Some(&span3_expected));
                 }
 
                 {
@@ -140,7 +140,7 @@ async fn create_span() {
                             logs: Vec::<Log>::new(),
                             skip_analysis: false,
                         };
-                        assert_eq!(context.last_span(), Some(span5_expected));
+                        assert_eq!(context.last_span().as_deref(), 
Some(&span5_expected));
                     }
                 }
 
@@ -162,7 +162,7 @@ async fn create_span() {
                     logs: expected_log,
                     skip_analysis: false,
                 };
-                assert_eq!(context.last_span(), Some(span1_expected));
+                assert_eq!(context.last_span().as_deref(), 
Some(&span1_expected));
             }
 
             tracer

Reply via email to