This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch ref in repository https://gitbox.apache.org/repos/asf/skywalking-rust.git
commit aac5949a8bb658aed67f8580448fcb3cc1de9a1b Author: Wu Sheng <[email protected]> AuthorDate: Sun Mar 1 17:08:22 2020 +0800 Support context extract. --- .github/workflows/ci.yaml | 2 +- Cargo.lock | 7 ++ tracing-core/Cargo.toml | 3 +- tracing-core/src/context.rs | 55 +++++++++---- tracing-core/src/context_carrier.rs | 10 ++- tracing-core/src/id.rs | 17 ++-- tracing-core/src/lib.rs | 3 + tracing-core/src/segment_ref.rs | 150 ++++++++++++++++++++++++++++++++++++ tracing-core/src/span.rs | 22 ++++-- 9 files changed, 236 insertions(+), 33 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index a198b6c..2cfeb06 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -1,4 +1,4 @@ -name: CI AND IT +name: CI on: pull_request: diff --git a/Cargo.lock b/Cargo.lock index afa77c2..dee751b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,12 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. [[package]] +name = "base64" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7" + +[[package]] name = "c2-chacha" version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -83,6 +89,7 @@ dependencies = [ name = "skywalking-core" version = "0.1.0" dependencies = [ + "base64", "rand", ] diff --git a/tracing-core/Cargo.toml b/tracing-core/Cargo.toml index 6751303..821e4ef 100644 --- a/tracing-core/Cargo.toml +++ b/tracing-core/Cargo.toml @@ -7,4 +7,5 @@ description = "SkyWalking tracing core APIs. Provide the way to build SkyWalking license = "Apache 2.0" [dependencies] -rand = "0.7.3" \ No newline at end of file +rand = "0.7.3" +base64 = "0.11.0" \ No newline at end of file diff --git a/tracing-core/src/context.rs b/tracing-core/src/context.rs index 5817110..6525c04 100644 --- a/tracing-core/src/context.rs +++ b/tracing-core/src/context.rs @@ -13,17 +13,21 @@ // See the License for the specific language governing permissions and // limitations under the License. +use base64::{decode, encode}; + use crate::{ContextListener, ID, Span}; +use crate::context_carrier::{Extractable, Injectable}; use crate::id::IDGenerator; +use crate::segment_ref::SegmentRef; use crate::span::TracingSpan; /// Context represents the context of a tracing process. /// All new span belonging to this tracing context should be created through this context. pub trait Context { /// Create an entry span belonging this context - fn create_entry_span(&mut self, operation_name: String, parent: Option<&Box<dyn Span>>) -> Box<dyn Span>; + fn create_entry_span(&mut self, operation_name: String, parent: Option<&Box<dyn Span>>, extractor: &dyn Extractable) -> Box<dyn Span>; /// Create an exit span belonging this context - fn create_exit_span(&mut self, operation_name: String, parent: Option<&Box<dyn Span>>) -> Box<dyn Span>; + fn create_exit_span(&mut self, operation_name: String, parent: Option<&Box<dyn Span>>, peer: String, injector: &dyn Injectable) -> Box<dyn Span>; /// Create an local span belonging this context fn create_local_span(&mut self, operation_name: String, parent: Option<&Box<dyn Span>>) -> Box<dyn Span>; /// Finish the given span. The span is only being accept if it belongs to this context. @@ -67,25 +71,39 @@ impl TracingContext { /// Default implementation of Context impl Context for TracingContext { - fn create_entry_span(&mut self, operation_name: String, parent: Option<&Box<dyn Span>>) -> Box<dyn Span> { - TracingSpan::new_entry_span(operation_name, self.next_span_id(), match parent { + fn create_entry_span(&mut self, operation_name: String, parent: Option<&Box<dyn Span>>, extractor: &dyn Extractable) -> Box<dyn Span> { + let mut entry_span = TracingSpan::new_entry_span(operation_name, self.next_span_id(), match parent { None => { -1 } Some(s) => { s.span_id() } - }) + }); + match SegmentRef::from_text(extractor.extract("sw6".to_string())) { + Some(reference) => { + if self.self_generated_id { + self.self_generated_id = false; + self.primary_trace_id = reference.get_trace_id(); + } + entry_span._add_ref(reference); + } + _ => {} + } + Box::new(entry_span) } - fn create_exit_span(&mut self, operation_name: String, parent: Option<&Box<dyn Span>>) -> Box<dyn Span> { - TracingSpan::new_exit_span(operation_name, self.next_span_id(), match parent { + fn create_exit_span(&mut self, operation_name: String, parent: Option<&Box<dyn Span>>, peer: String, injector: &dyn Injectable) -> Box<dyn Span> { + let exit_span = TracingSpan::new_exit_span(operation_name, self.next_span_id(), match parent { None => { -1 } Some(s) => { s.span_id() } - }) + }, peer); + + + Box::new(exit_span) } fn create_local_span(&mut self, operation_name: String, parent: Option<&Box<dyn Span>>) -> Box<dyn Span> { - TracingSpan::new_local_span(operation_name, self.next_span_id(), match parent { + Box::new(TracingSpan::new_local_span(operation_name, self.next_span_id(), match parent { None => { -1 } Some(s) => { s.span_id() } - }) + })) } fn finish_span(&mut self, mut span: Box<dyn Span>) { @@ -101,20 +119,20 @@ mod context_tests { use std::sync::mpsc; use std::sync::mpsc::{Receiver, Sender}; - use crate::{Context, ContextListener, Tag, TracingContext}; + use crate::{Context, ContextListener, Extractable, ID, Tag, TracingContext}; #[test] fn test_context_stack() { let reporter = MockReporter::new(); let mut context = TracingContext::new(&reporter).unwrap(); - let span1 = context.create_entry_span(String::from("op1"), None); + let span1 = context.create_entry_span(String::from("op1"), None, &MockerHeader {}); { assert_eq!(span1.span_id(), 0); - let mut span2 = context.create_entry_span(String::from("op2"), Some(&span1)); + let mut span2 = context.create_local_span(String::from("op2"), Some(&span1)); span2.tag(Tag::new(String::from("tag1"), String::from("value1"))); { assert_eq!(span2.span_id(), 1); - let mut span3 = context.create_entry_span(String::from("op3"), Some(&span2)); + let mut span3 = context.create_local_span(String::from("op3"), Some(&span2)); assert_eq!(span3.span_id(), 2); context.finish_span(span3); @@ -127,6 +145,7 @@ mod context_tests { // context has moved into reporter. Can't be used again. let received_context = reporter.recv.recv().unwrap(); + assert_eq!(received_context.primary_trace_id == ID::new(3, 4, 5), true); assert_eq!(received_context.finished_spans.len(), 3); } @@ -161,6 +180,14 @@ mod context_tests { } } + struct MockerHeader {} + + impl Extractable for MockerHeader { + fn extract(&self, key: String) -> &str { + "1-My40LjU=-MS4yLjM=-4-1-1-IzEyNy4wLjAuMTo4MDgw-Iy9wb3J0YWw=-MTIz" + } + } + struct MockRegisterPending {} impl ContextListener for MockRegisterPending { diff --git a/tracing-core/src/context_carrier.rs b/tracing-core/src/context_carrier.rs index b56a2c3..3ab607d 100644 --- a/tracing-core/src/context_carrier.rs +++ b/tracing-core/src/context_carrier.rs @@ -14,10 +14,16 @@ // limitations under the License. /// The Injectable implementation supports inject the give key/value for further propagation, -/// especially in across process propagation. /// Such as putting a key/value into the HTTP header. pub trait Injectable { /// Inject the given key/value into the implementation. /// The way of injection is determined by the implementation, no panic! should happens even injection fails. - fn inject(key: String, value: String); + fn inject(&self, key: String, value: String); +} + +/// The Extractable implementations extract propagated context out the implementation. +/// Such as fetching the key/value from the HTTP header. +pub trait Extractable { + /// Fetch the value by the given key. + fn extract(&self, key: String) -> &str; } \ No newline at end of file diff --git a/tracing-core/src/id.rs b/tracing-core/src/id.rs index 7f31724..b276986 100644 --- a/tracing-core/src/id.rs +++ b/tracing-core/src/id.rs @@ -15,6 +15,7 @@ use std::hash::Hash; use std::time::SystemTime; + use rand::RngCore; pub struct IDGenerator {} @@ -53,18 +54,18 @@ impl ID { /// Convert the literal string text back to ID object. /// Return Option::None if the text is not combined by 3 dot split i64 parts - pub fn from(id_text: String) -> Option<Self> { + pub fn from(id_text: String) -> Result<Self, String> { let strings: Vec<&str> = id_text.split(".").collect(); if strings.len() == 3 { let part1 = strings[0].parse::<i64>(); - if part1.is_err() { return None; } + if part1.is_err() { return Err("part 1 is not a i64".to_string()); } let part2 = strings[1].parse::<i64>(); - if part2.is_err() { return None; } + if part2.is_err() { return Err("part 2 is not a i64".to_string()); } let part3 = strings[2].parse::<i64>(); - if part3.is_err() { return None; } - Some(ID::new(part1.unwrap(), part2.unwrap(), part3.unwrap())) + if part3.is_err() { return Err("part 3 is not a i64".to_string()); } + Ok(ID::new(part1.unwrap(), part2.unwrap(), part3.unwrap())) } else { - None + Err("The ID is not combined by 3 parts.".to_string()) } } } @@ -107,9 +108,9 @@ mod id_tests { assert_eq!(id4.eq(&id1), true); let id5_none = ID::from(String::from("1.2")); - assert_eq!(id5_none == None, true); + assert_ne!(id5_none.err().unwrap().len(), 0); let id6_illegal = ID::from(String::from("1.2.a")); - assert_eq!(id6_illegal == None, true); + assert_ne!(id6_illegal.err().unwrap().len(), 0); } } \ No newline at end of file diff --git a/tracing-core/src/lib.rs b/tracing-core/src/lib.rs index 457c326..7d9a2d1 100644 --- a/tracing-core/src/lib.rs +++ b/tracing-core/src/lib.rs @@ -15,6 +15,8 @@ pub use context::Context; pub use context::TracingContext; +pub use context_carrier::Extractable; +pub use context_carrier::Injectable; pub use context_listener::ContextListener; pub use id::ID; pub use log::EventField; @@ -29,4 +31,5 @@ pub mod id; pub mod context_listener; pub mod log; pub mod context_carrier; +pub mod segment_ref; diff --git a/tracing-core/src/segment_ref.rs b/tracing-core/src/segment_ref.rs new file mode 100644 index 0000000..104060c --- /dev/null +++ b/tracing-core/src/segment_ref.rs @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::ID; + +pub struct SegmentRef { + trace_id: ID, + segment_id: ID, + span_id: i32, + parent_service_instance_id: i32, + entry_service_instance_id: i32, + network_address: Option<String>, + network_address_id: i32, + entry_endpoint: Option<String>, + entry_endpoint_id: i32, + parent_endpoint: Option<String>, + parent_endpoint_id: i32, +} + +impl SegmentRef { + pub fn from_text(value: &str) -> Option<Self> { + let strings: Vec<&str> = value.split("-").collect(); + if strings.len() == 9 { + // Ignore string[0]. + let trace_id = match SegmentRef::string_to_id(strings[1]) { + Some(id) => { id } + _ => { return None; } + }; + let segment_id = match SegmentRef::string_to_id(strings[2]) { + Some(id) => { id } + _ => { return None; } + }; + let span_id = match strings[3].parse::<i32>() { + Ok(id) => { id } + _ => { return None; } + }; + let parent_service_instance_id = match strings[4].parse::<i32>() { + Ok(id) => { id } + _ => { return None; } + }; + let entry_service_instance_id = match strings[5].parse::<i32>() { + Ok(id) => { id } + _ => { return None; } + }; + + let (network_address, network_address_id) = match SegmentRef::decode_base64_to_string_or_id(strings[6]) { + Some(decoded) => { decoded } + _ => { return None; } + }; + let (entry_endpoint, entry_endpoint_id) = match SegmentRef::decode_base64_to_string_or_id(strings[7]) { + Some(decoded) => { decoded } + _ => { return None; } + }; + let (parent_endpoint, parent_endpoint_id) = match SegmentRef::decode_base64_to_string_or_id(strings[8]) { + Some(decoded) => { decoded } + _ => { return None; } + }; + + Some(SegmentRef { + trace_id, + segment_id, + span_id, + parent_service_instance_id, + entry_service_instance_id, + network_address, + network_address_id, + entry_endpoint, + entry_endpoint_id, + parent_endpoint, + parent_endpoint_id, + }) + } else { + None + } + } + + pub fn get_trace_id(&self) -> ID { + self.trace_id.clone() + } + + fn string_to_id(text: &str) -> Option<ID> { + match base64::decode(text) { + Ok(value) => { + match String::from_utf8(value) { + Ok(str) => { + match ID::from(str) { + Ok(id) => { Some(id) } + _ => None + } + } + _ => { None } + } + } + _ => { None } + } + } + + fn decode_base64_to_string_or_id(text: &str) -> Option<(Option<String>, i32)> { + match base64::decode(text) { + Ok(value) => { + match String::from_utf8(value) { + Ok(str) => { + if str.starts_with("#") { + let network: Vec<&str> = str.split("#").collect(); + (Some((Some(network[1].to_string()), 0))) + } else { + match str.parse::<i32>() { + Ok(id) => { Some((None, id)) } + _ => { None } + } + } + } + _ => { None } + } + } + _ => { None } + } + } +} + +#[cfg(test)] +mod segment_ref_tests { + use crate::ID; + use crate::segment_ref::SegmentRef; + + #[test] + fn test_deserialize_context_carrier() { + let carrier = SegmentRef::from_text("1-My40LjU=-MS4yLjM=-4-1-1-IzEyNy4wLjAuMTo4MDgw-Iy9wb3J0YWw=-MTIz").unwrap(); + assert_eq!(carrier.trace_id == ID::new(3, 4, 5), true); + assert_eq!(carrier.segment_id == ID::new(1, 2, 3), true); + assert_eq!(carrier.span_id, 4); + assert_eq!(carrier.entry_service_instance_id, 1); + assert_eq!(carrier.parent_service_instance_id, 1); + assert_eq!(carrier.network_address, Some("127.0.0.1:8080".to_string())); + assert_eq!(carrier.entry_endpoint, Some("/portal".to_string())); + assert_eq!(carrier.parent_endpoint_id, 123); + } +} diff --git a/tracing-core/src/span.rs b/tracing-core/src/span.rs index 9829d7d..623cdc6 100644 --- a/tracing-core/src/span.rs +++ b/tracing-core/src/span.rs @@ -16,6 +16,7 @@ use std::time::SystemTime; use crate::log::LogEvent; +use crate::segment_ref::SegmentRef; use crate::Tag; /// Span is one of the tracing concept, representing a time duration. @@ -79,31 +80,33 @@ pub struct TracingSpan { component_id: Option<i32>, tags: Vec<Tag>, logs: Vec<LogEvent>, + refs: Vec<SegmentRef>, } /// Tracing Span is only created inside TracingContext. impl TracingSpan { /// Create a new entry span - pub fn new_entry_span(operation_name: String, span_id: i32, parent_span_id: i32) -> Box<dyn Span> { + pub fn new_entry_span(operation_name: String, span_id: i32, parent_span_id: i32) -> TracingSpan { let mut span = TracingSpan::_new(operation_name, span_id, parent_span_id); span.is_entry = true; - Box::new(span) + span } /// Create a new exit span - pub fn new_exit_span(operation_name: String, span_id: i32, parent_span_id: i32) -> Box<dyn Span> { + pub fn new_exit_span(operation_name: String, span_id: i32, parent_span_id: i32, peer: String) -> TracingSpan { let mut span = TracingSpan::_new(operation_name, span_id, parent_span_id); span.is_exit = true; - Box::new(span) + span.peer = Some(peer); + span } /// Create a new local span - pub fn new_local_span(operation_name: String, span_id: i32, parent_span_id: i32) -> Box<dyn Span> { + pub fn new_local_span(operation_name: String, span_id: i32, parent_span_id: i32) -> TracingSpan { let span = TracingSpan::_new(operation_name, span_id, parent_span_id); - Box::new(span) + span } - /// Create a span and set the limited internal values + /// Create a span fn _new(operation_name: String, span_id: i32, parent_span_id: i32) -> Self { TracingSpan { operation_name, @@ -118,8 +121,13 @@ impl TracingSpan { component_id: None, tags: Vec::new(), logs: Vec::new(), + refs: Vec::new(), } } + + pub fn _add_ref(&mut self, reference: SegmentRef) { + self.refs.push(reference); + } } impl Span for TracingSpan {
