This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-rust.git
The following commit(s) were added to refs/heads/master by this push:
new f544829 Refactor span object api to make it more friendly. (#52)
f544829 is described below
commit f544829e6010a6a0f6b115b296878168a95a53da
Author: jmjoy <[email protected]>
AuthorDate: Wed Jan 11 16:47:04 2023 +0800
Refactor span object api to make it more friendly. (#52)
---
Cargo.toml | 1 +
src/logging/record.rs | 2 +-
src/trace/span.rs | 89 +++++++++++++++++++++-------------
src/trace/trace_context.rs | 118 +++++++++++++++++++++------------------------
tests/trace_context.rs | 6 +--
5 files changed, 115 insertions(+), 101 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 4604fce..5b66f22 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -48,6 +48,7 @@ futures-core = "0.3.21"
futures-util = "0.3.21"
hostname = { version = "0.3.1", optional = true }
once_cell = "1.14.0"
+parking_lot = "0.12.1"
portable-atomic = { version = "0.3.13", features = ["float"] }
prost = "0.11.0"
prost-derive = "0.11.0"
diff --git a/src/logging/record.rs b/src/logging/record.rs
index 3e1ce91..3a34872 100644
--- a/src/logging/record.rs
+++ b/src/logging/record.rs
@@ -114,7 +114,7 @@ impl LogRecord {
/// The span should be unique in the whole segment.
pub fn with_span(mut self, span: &Span) -> Self {
- self.span_id = Some(span.with_span_object(|span| span.span_id));
+ self.span_id = Some(span.span_id());
self
}
diff --git a/src/trace/span.rs b/src/trace/span.rs
index 16ecf79..60371f0 100644
--- a/src/trace/span.rs
+++ b/src/trace/span.rs
@@ -19,15 +19,46 @@
use crate::{
common::system_time::{fetch_time, TimePeriod},
- error::LOCK_MSG,
skywalking_proto::v3::{SpanLayer, SpanObject, SpanType},
trace::trace_context::SpanStack,
};
+use parking_lot::{
+ MappedRwLockReadGuard, MappedRwLockWriteGuard, RwLockReadGuard,
RwLockWriteGuard,
+};
use std::{
fmt::Formatter,
- sync::{Arc, Weak},
+ ops::{Deref, DerefMut},
+ sync::Arc,
};
+/// Wrapper of [SpanObject] immutable reference.
+pub struct SpanObjectRef<'a>(pub(crate) MappedRwLockReadGuard<'a, SpanObject>);
+
+impl Deref for SpanObjectRef<'_> {
+ type Target = SpanObject;
+
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+/// Wrapper of [SpanObject] mutable reference.
+pub struct SpanObjectMut<'a>(MappedRwLockWriteGuard<'a, SpanObject>);
+
+impl Deref for SpanObjectMut<'_> {
+ type Target = SpanObject;
+
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+impl DerefMut for SpanObjectMut<'_> {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.0
+ }
+}
+
/// Span is a concept that represents trace information for a single RPC.
/// The Rust SDK supports Entry Span to represent inbound to a service
/// and Exit Span to represent outbound from a service.
@@ -66,7 +97,7 @@ use std::{
#[must_use = "assign a variable name to guard the span not be dropped
immediately."]
pub struct Span {
index: usize,
- stack: Weak<SpanStack>,
+ stack: Arc<SpanStack>,
}
impl std::fmt::Debug for Span {
@@ -75,18 +106,15 @@ impl std::fmt::Debug for Span {
f.debug_struct("Span")
.field(
"data",
- match self.stack.upgrade() {
- Some(stack) => match stack.active.try_read() {
- Ok(spans) => match spans.get(self.index) {
- Some(span) => {
- span_object = span.clone();
- &span_object
- }
- None => &"<hanged>",
- },
- Err(_) => &"<locked>",
+ match self.stack.active.try_read() {
+ Some(spans) => match spans.get(self.index) {
+ Some(span) => {
+ span_object = span.clone();
+ &span_object
+ }
+ None => &"<hanged>",
},
- None => &"<dropped>",
+ None => &"<locked>",
},
)
.finish()
@@ -96,7 +124,7 @@ impl std::fmt::Debug for Span {
const SKYWALKING_RUST_COMPONENT_ID: i32 = 11000;
impl Span {
- pub(crate) fn new(index: usize, stack: Weak<SpanStack>) -> Self {
+ pub(crate) fn new(index: usize, stack: Arc<SpanStack>) -> Self {
Self { index, stack }
}
@@ -124,24 +152,23 @@ impl Span {
}
}
- fn upgrade_stack(&self) -> Arc<SpanStack> {
- self.stack.upgrade().expect("Context has dropped")
- }
-
- /// Immutable with inner span object.
- pub fn with_span_object<T>(&self, f: impl FnOnce(&SpanObject) -> T) -> T {
- self.upgrade_stack()
- .with_active(|stack| f(&stack[self.index]))
+ /// Get immutable span object reference.
+ pub fn span_object(&self) -> SpanObjectRef<'_> {
+ SpanObjectRef(RwLockReadGuard::map(self.stack.active(), |stack| {
+ &stack[self.index]
+ }))
}
/// Mutable with inner span object.
- pub fn with_span_object_mut<T>(&mut self, f: impl FnOnce(&mut SpanObject)
-> T) -> T {
- f(&mut
(self.upgrade_stack().active.try_write().expect(LOCK_MSG))[self.index])
+ pub fn span_object_mut(&mut self) -> SpanObjectMut<'_> {
+ SpanObjectMut(RwLockWriteGuard::map(self.stack.active_mut(), |stack| {
+ &mut stack[self.index]
+ }))
}
/// Get span id.
pub fn span_id(&self) -> i32 {
- self.with_span_object(|span| span.span_id)
+ self.span_object().span_id
}
/// Add logs to the span.
@@ -151,24 +178,20 @@ impl Span {
V: Into<String>,
I: IntoIterator<Item = (K, V)>,
{
- self.with_span_object_mut(|span| span.add_log(message))
+ self.span_object_mut().add_log(message)
}
/// Add tag to the span.
pub fn add_tag(&mut self, key: impl Into<String>, value: impl
Into<String>) {
- self.with_span_object_mut(|span| span.add_tag(key, value))
+ self.span_object_mut().add_tag(key, value)
}
}
impl Drop for Span {
/// Set the end time as current time, pop from context active span stack,
/// and push to context spans.
- ///
- /// # Panics
- ///
- /// Panic if context is dropped or this span isn't the active span.
fn drop(&mut self) {
- self.upgrade_stack().finalize_span(self.index);
+ self.stack.finalize_span(self.index);
}
}
diff --git a/src/trace/trace_context.rs b/src/trace/trace_context.rs
index e9612c2..95641ea 100644
--- a/src/trace/trace_context.rs
+++ b/src/trace/trace_context.rs
@@ -29,15 +29,14 @@ use crate::{
},
trace::{
propagation::context::PropagationContext,
- span::Span,
+ span::{Span, SpanObjectRef},
tracer::{Tracer, WeakTracer},
},
};
-use std::{
- fmt::Formatter,
- mem::take,
- sync::{Arc, RwLock},
+use parking_lot::{
+ MappedRwLockReadGuard, MappedRwLockWriteGuard, RwLock, RwLockReadGuard,
RwLockWriteGuard,
};
+use std::{fmt::Formatter, mem::take, sync::Arc};
#[derive(Default)]
pub(crate) struct SpanStack {
@@ -47,30 +46,29 @@ pub(crate) struct SpanStack {
}
impl SpanStack {
- pub(crate) fn with_finalized<T>(&self, f: impl FnOnce(&Vec<SpanObject>) ->
T) -> T {
- f(&self.finalized.try_read().expect(LOCK_MSG))
+ pub(crate) fn finalized(&self) -> RwLockReadGuard<'_, Vec<SpanObject>> {
+ self.finalized.try_read().expect(LOCK_MSG)
}
- pub(crate) fn with_finalized_mut<T>(&self, f: impl FnOnce(&mut
Vec<SpanObject>) -> T) -> T {
- f(&mut self.finalized.try_write().expect(LOCK_MSG))
+ pub(crate) fn finalized_mut(&self) -> RwLockWriteGuard<'_,
Vec<SpanObject>> {
+ self.finalized.try_write().expect(LOCK_MSG)
}
- pub(crate) fn with_active<T>(&self, f: impl FnOnce(&Vec<SpanObject>) -> T)
-> T {
- f(&self.active.try_read().expect(LOCK_MSG))
+ pub(crate) fn active(&self) -> RwLockReadGuard<'_, Vec<SpanObject>> {
+ self.active.try_read().expect(LOCK_MSG)
}
- pub(crate) fn with_active_mut<T>(&self, f: impl FnOnce(&mut
Vec<SpanObject>) -> T) -> T {
- f(&mut self.active.try_write().expect(LOCK_MSG))
+ pub(crate) fn active_mut(&self) -> RwLockWriteGuard<'_, Vec<SpanObject>> {
+ self.active.try_write().expect(LOCK_MSG)
}
fn pop_active(&self, index: usize) -> Option<SpanObject> {
- self.with_active_mut(|stack| {
- if stack.len() > index + 1 {
- None
- } else {
- stack.pop()
- }
- })
+ let mut stack = self.active_mut();
+ if stack.len() > index + 1 {
+ None
+ } else {
+ stack.pop()
+ }
}
/// Close span. We can't use closed span after finalize called.
@@ -78,7 +76,7 @@ impl SpanStack {
let span = self.pop_active(index);
if let Some(mut span) = span {
span.end_time = fetch_time(TimePeriod::End);
- self.with_finalized_mut(|spans| spans.push(span));
+ self.finalized_mut().push(span);
} else {
panic!("Finalize span isn't the active span");
}
@@ -112,11 +110,11 @@ impl std::fmt::Debug for TracingContext {
.field(
"finalized_spans",
match self.span_stack.finalized.try_read() {
- Ok(spans) => {
+ Some(spans) => {
span_objects = spans.clone();
&span_objects
}
- Err(_) => &"<locked>",
+ None => &"<locked>",
},
)
.finish()
@@ -178,35 +176,30 @@ impl TracingContext {
}
/// Get the last finalized span.
- pub fn last_span(&self) -> Option<SpanObject> {
- self.span_stack
- .with_finalized(|spans| spans.last().cloned())
+ pub fn last_span(&self) -> Option<SpanObjectRef<'_>> {
+ RwLockReadGuard::try_map(self.span_stack.finalized(), |spans|
spans.last())
+ .ok()
+ .map(SpanObjectRef)
}
- fn with_spans_mut<T>(&mut self, f: impl FnOnce(&mut Vec<SpanObject>) -> T)
-> T {
- f(&mut self.span_stack.finalized.try_write().expect(LOCK_MSG))
+ fn spans_mut(&mut self) -> RwLockWriteGuard<'_, Vec<SpanObject>> {
+ self.span_stack.finalized.try_write().expect(LOCK_MSG)
}
- pub(crate) fn with_active_span_stack<T>(&self, f: impl
FnOnce(&Vec<SpanObject>) -> T) -> T {
- self.span_stack.with_active(f)
+ pub(crate) fn active_span_stack(&self) -> RwLockReadGuard<'_,
Vec<SpanObject>> {
+ self.span_stack.active()
}
- pub(crate) fn with_active_span_stack_mut<T>(
- &mut self,
- f: impl FnOnce(&mut Vec<SpanObject>) -> T,
- ) -> T {
- self.span_stack.with_active_mut(f)
+ pub(crate) fn active_span_stack_mut(&mut self) -> RwLockWriteGuard<'_,
Vec<SpanObject>> {
+ self.span_stack.active_mut()
}
- pub(crate) fn with_active_span<T>(&self, f: impl FnOnce(&SpanObject) -> T)
-> Option<T> {
- self.with_active_span_stack(|stack| stack.last().map(f))
+ pub(crate) fn active_span(&self) -> Option<MappedRwLockReadGuard<'_,
SpanObject>> {
+ RwLockReadGuard::try_map(self.active_span_stack(), |stack|
stack.last()).ok()
}
- pub(crate) fn with_active_span_mut<T>(
- &mut self,
- f: impl FnOnce(&mut SpanObject) -> T,
- ) -> Option<T> {
- self.with_active_span_stack_mut(|stack| stack.last_mut().map(f))
+ pub(crate) fn active_span_mut(&mut self) ->
Option<MappedRwLockWriteGuard<'_, SpanObject>> {
+ RwLockWriteGuard::try_map(self.active_span_stack_mut(), |stack|
stack.last_mut()).ok()
}
/// Create a new entry span, which is an initiator of collection of spans.
@@ -227,7 +220,7 @@ impl TracingContext {
);
let index = self.push_active_span(span);
- Span::new(index, Arc::downgrade(&self.span_stack))
+ Span::new(index, self.span_stack.clone())
}
/// Create a new entry span, which is an initiator of collection of spans.
@@ -244,17 +237,15 @@ impl TracingContext {
) -> Span {
let mut span = self.create_entry_span(operation_name);
self.trace_id = propagation.parent_trace_id.clone();
- span.with_span_object_mut(|span| {
- span.refs.push(SegmentReference {
- ref_type: RefType::CrossProcess as i32,
- trace_id: self.trace_id().to_owned(),
- parent_trace_segment_id:
propagation.parent_trace_segment_id.clone(),
- parent_span_id: propagation.parent_span_id,
- parent_service: propagation.parent_service.clone(),
- parent_service_instance:
propagation.parent_service_instance.clone(),
- parent_endpoint: propagation.destination_endpoint.clone(),
- network_address_used_at_peer:
propagation.destination_address.clone(),
- });
+ span.span_object_mut().refs.push(SegmentReference {
+ ref_type: RefType::CrossProcess as i32,
+ trace_id: self.trace_id().to_owned(),
+ parent_trace_segment_id:
propagation.parent_trace_segment_id.clone(),
+ parent_span_id: propagation.parent_span_id,
+ parent_service: propagation.parent_service.clone(),
+ parent_service_instance:
propagation.parent_service_instance.clone(),
+ parent_endpoint: propagation.destination_endpoint.clone(),
+ network_address_used_at_peer:
propagation.destination_address.clone(),
});
span
}
@@ -282,7 +273,7 @@ impl TracingContext {
);
let index = self.push_active_span(span);
- Span::new(index, Arc::downgrade(&self.span_stack))
+ Span::new(index, self.span_stack.clone())
}
/// Create a new local span.
@@ -306,7 +297,7 @@ impl TracingContext {
);
let index = self.push_active_span(span);
- Span::new(index, Arc::downgrade(&self.span_stack))
+ Span::new(index, self.span_stack.clone())
}
/// Capture a snapshot for cross-thread propagation.
@@ -337,9 +328,9 @@ impl TracingContext {
network_address_used_at_peer: Default::default(),
};
- self.with_active_span_mut(|span| {
+ if let Some(mut span) = self.active_span_mut() {
span.refs.push(segment_ref);
- });
+ }
}
}
@@ -353,7 +344,7 @@ impl TracingContext {
let trace_segment_id = self.trace_segment_id().to_owned();
let service = self.service().to_owned();
let service_instance = self.service_instance().to_owned();
- let spans = self.with_spans_mut(|spans| take(spans));
+ let spans = take(&mut *self.spans_mut());
SegmentObject {
trace_id,
@@ -366,15 +357,14 @@ impl TracingContext {
}
pub(crate) fn peek_active_span_id(&self) -> Option<i32> {
- self.with_active_span(|span| span.span_id)
+ self.active_span().map(|span| span.span_id)
}
fn push_active_span(&mut self, span: SpanObject) -> usize {
self.primary_endpoint_name = span.operation_name.clone();
- self.with_active_span_stack_mut(|stack| {
- stack.push(span);
- stack.len() - 1
- })
+ let mut stack = self.active_span_stack_mut();
+ stack.push(span);
+ stack.len() - 1
}
fn upgrade_tracer(&self) -> Tracer {
diff --git a/tests/trace_context.rs b/tests/trace_context.rs
index 4603ad9..c67bdf6 100644
--- a/tests/trace_context.rs
+++ b/tests/trace_context.rs
@@ -114,7 +114,7 @@ async fn create_span() {
logs: Vec::<Log>::new(),
skip_analysis: false,
};
- assert_eq!(context.last_span(), Some(span3_expected));
+ assert_eq!(context.last_span().as_deref(),
Some(&span3_expected));
}
{
@@ -140,7 +140,7 @@ async fn create_span() {
logs: Vec::<Log>::new(),
skip_analysis: false,
};
- assert_eq!(context.last_span(), Some(span5_expected));
+ assert_eq!(context.last_span().as_deref(),
Some(&span5_expected));
}
}
@@ -162,7 +162,7 @@ async fn create_span() {
logs: expected_log,
skip_analysis: false,
};
- assert_eq!(context.last_span(), Some(span1_expected));
+ assert_eq!(context.last_span().as_deref(),
Some(&span1_expected));
}
tracer