This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-rust.git
The following commit(s) were added to refs/heads/master by this push:
new 4c61a5b Add `missing_docs` lint and supply documents. (#49)
4c61a5b is described below
commit 4c61a5b03ea6560022c83ba96a5353e99ffa0fcb
Author: jmjoy <[email protected]>
AuthorDate: Thu Sep 22 13:32:09 2022 +0800
Add `missing_docs` lint and supply documents. (#49)
---
e2e/src/main.rs | 4 ++--
src/common/mod.rs | 2 ++
src/common/random_generator.rs | 4 ++++
src/common/system_time.rs | 2 ++
src/error/mod.rs | 8 +++++++
src/lib.rs | 2 +-
src/logging/logger.rs | 12 ++++++++---
src/logging/mod.rs | 2 ++
src/logging/record.rs | 23 +++++++++++++++++++-
src/management/instance.rs | 14 +++++++++++++
src/management/manager.rs | 8 +++++++
src/management/mod.rs | 2 ++
src/metrics/meter.rs | 24 +++++++++++++++++++++
src/metrics/metricer.rs | 12 +++++++++++
src/metrics/mod.rs | 2 ++
src/reporter/grpc.rs | 45 +++++++++++++++++++++++++++++-----------
src/reporter/mod.rs | 9 ++++++++
src/reporter/print.rs | 6 ++++++
src/skywalking_proto/mod.rs | 3 +++
src/skywalking_proto/v3/mod.rs | 5 +++++
src/trace/mod.rs | 2 ++
src/trace/propagation/context.rs | 5 +++++
src/trace/propagation/decoder.rs | 2 ++
src/trace/propagation/encoder.rs | 2 ++
src/trace/propagation/mod.rs | 2 ++
src/trace/span.rs | 6 ++++++
src/trace/trace_context.rs | 40 ++++++++++++++++++++++++-----------
src/trace/tracer.rs | 12 +++++++----
28 files changed, 225 insertions(+), 35 deletions(-)
diff --git a/e2e/src/main.rs b/e2e/src/main.rs
index dc88364..545c1c9 100644
--- a/e2e/src/main.rs
+++ b/e2e/src/main.rs
@@ -91,7 +91,7 @@ async fn handle_ping(
.await
.unwrap();
}
- Ok(Response::new(Body::from("hoge")))
+ Ok(Response::new(Body::from("ok")))
}
async fn producer_response(
@@ -147,7 +147,7 @@ async fn handle_pong(_req: Request<Body>) -> Result<Response<Body>, Infallible>
.unwrap();
let mut context = tracer::create_trace_context();
let _span = context.create_entry_span_with_propagation("/pong", &ctx);
- Ok(Response::new(Body::from("hoge")))
+ Ok(Response::new(Body::from("ok")))
}
async fn consumer_response(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
diff --git a/src/common/mod.rs b/src/common/mod.rs
index 7c59317..e602fce 100644
--- a/src/common/mod.rs
+++ b/src/common/mod.rs
@@ -14,5 +14,7 @@
// limitations under the License.
//
+//! Common utils.
+
pub mod random_generator;
pub(crate) mod system_time;
diff --git a/src/common/random_generator.rs b/src/common/random_generator.rs
index 6c64f5b..6025fac 100644
--- a/src/common/random_generator.rs
+++ b/src/common/random_generator.rs
@@ -14,11 +14,15 @@
// limitations under the License.
//
+//! Random id generator.
+
use uuid::Uuid;
+/// Random id generator.
pub struct RandomGenerator;
impl RandomGenerator {
+ /// Generate unique id as string.
pub fn generate() -> String {
Uuid::new_v4().as_u128().to_string()
}
diff --git a/src/common/system_time.rs b/src/common/system_time.rs
index 6513e81..8dd48d4 100644
--- a/src/common/system_time.rs
+++ b/src/common/system_time.rs
@@ -14,6 +14,8 @@
// limitations under the License.
//
+//! Get system time, as mills seconds.
+
use cfg_if::cfg_if;
pub(crate) enum TimePeriod {
diff --git a/src/error/mod.rs b/src/error/mod.rs
index 406a240..2a26da6 100644
--- a/src/error/mod.rs
+++ b/src/error/mod.rs
@@ -14,6 +14,8 @@
// limitations under the License.
//
+//! Crate errors.
+
pub(crate) const LOCK_MSG: &str = "should not cross threads/coroutines (locked)";
/// Skywalking Result.
@@ -22,21 +24,27 @@ pub type Result<T> = std::result::Result<T, Error>;
/// Skywalking Error.
#[derive(Debug, thiserror::Error)]
pub enum Error {
+ /// Decode propagation failed.
#[error("decode propagation failed: {0}")]
DecodePropagation(&'static str),
+ /// Reporter shutdown failed.
#[error("reporter shutdown failed: {0}")]
ReporterShutdown(String),
+ /// Tonic transport failed.
#[error("tonic transport failed: {0}")]
TonicTransport(#[from] tonic::transport::Error),
+ /// Tonic status error.
#[error("tonic status: {0}")]
TonicStatus(#[from] tonic::Status),
+ /// Tokio join failed.
#[error("tokio join failed: {0}")]
TokioJoin(#[from] tokio::task::JoinError),
+ /// Other uncovered errors.
#[error(transparent)]
Other(#[from] Box<dyn std::error::Error + Send + 'static>),
}
diff --git a/src/lib.rs b/src/lib.rs
index 65999d9..75e8a49 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -13,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
-#![warn(rust_2018_idioms)]
+#![warn(rust_2018_idioms, missing_docs)]
#![warn(clippy::dbg_macro, clippy::print_stdout)]
#![doc = include_str!("../README.md")]
#![cfg_attr(docsrs, feature(doc_cfg))]
diff --git a/src/logging/logger.rs b/src/logging/logger.rs
index a831058..2ee9045 100644
--- a/src/logging/logger.rs
+++ b/src/logging/logger.rs
@@ -14,6 +14,8 @@
// limitations under the License.
//
+//! Logger and global methods.
+
use super::record::LogRecord;
use crate::reporter::{CollectItem, DynReport, Report};
use std::sync::Arc;
@@ -24,13 +26,13 @@ static GLOBAL_LOGGER: OnceCell<Logger> = OnceCell::const_new();
/// Set the global logger.
pub fn set_global_logger(logger: Logger) {
if GLOBAL_LOGGER.set(logger).is_err() {
- panic!("global logger has setted")
+ panic!("global logger has set")
}
}
/// Get the global logger.
pub fn global_logger() -> &'static Logger {
- GLOBAL_LOGGER.get().expect("global logger haven't setted")
+ GLOBAL_LOGGER.get().expect("global logger haven't set")
}
/// Log by global logger.
@@ -38,12 +40,13 @@ pub fn log(record: LogRecord) {
global_logger().log(record);
}
-pub struct Inner {
+struct Inner {
service_name: String,
instance_name: String,
reporter: Box<DynReport>,
}
+/// Logger handles skywalking logging operations, integrate with reporter.
#[derive(Clone)]
pub struct Logger {
inner: Arc<Inner>,
@@ -65,14 +68,17 @@ impl Logger {
}
}
+ /// Get service name.
pub fn service_name(&self) -> &str {
&self.inner.service_name
}
+ /// Get instance name.
pub fn instance_name(&self) -> &str {
&self.inner.instance_name
}
+ /// Do logging via reporter.
pub fn log(&self, record: LogRecord) {
let data = record.convert_to_log_data(
self.service_name().to_owned(),
diff --git a/src/logging/mod.rs b/src/logging/mod.rs
index 020363b..3df18bd 100644
--- a/src/logging/mod.rs
+++ b/src/logging/mod.rs
@@ -14,5 +14,7 @@
// limitations under the License.
//
+//! Skywalking logging api.
+
pub mod logger;
pub mod record;
diff --git a/src/logging/record.rs b/src/logging/record.rs
index 02b5e81..3e1ce91 100644
--- a/src/logging/record.rs
+++ b/src/logging/record.rs
@@ -14,6 +14,8 @@
// limitations under the License.
//
+//! Log record items.
+
use crate::{
common::system_time::{fetch_time, TimePeriod},
skywalking_proto::v3::{
@@ -24,9 +26,13 @@ use crate::{
};
use std::time::{SystemTime, UNIX_EPOCH};
+/// Log record type of [LogRecord];
pub enum RecordType {
+ /// Text type.
Text,
+ /// Json type.
Json,
+ /// Yaml type.
Yaml,
}
@@ -36,6 +42,7 @@ impl Default for RecordType {
}
}
+/// The builder of [LogData];
#[derive(Default)]
pub struct LogRecord {
time: Option<SystemTime>,
@@ -50,34 +57,43 @@ pub struct LogRecord {
}
impl LogRecord {
+ /// New default [LogRecord];
#[inline]
pub fn new() -> Self {
Default::default()
}
+ /// Use custom time rather than now time.
#[inline]
- pub fn custome_time(mut self, time: SystemTime) -> Self {
+ pub fn custom_time(mut self, time: SystemTime) -> Self {
self.time = Some(time);
self
}
+ /// Not set the log time, OAP server would use the received timestamp as
+ /// log's timestamp, or relies on the OAP server analyzer.
#[inline]
pub fn ignore_time(mut self) -> Self {
self.is_ignore_time = true;
self
}
+ /// The logic name represents the endpoint, which logs belong.
#[inline]
pub fn endpoint(mut self, endpoint: impl Into<String>) -> Self {
self.endpoint = endpoint.into();
self
}
+ /// The available tags. OAP server could provide search/analysis
+ /// capabilities based on these.
pub fn add_tag(mut self, key: impl Into<String>, value: impl Into<String>)
-> Self {
self.tags.push((key.into(), value.into()));
self
}
+ /// The available tags. OAP server could provide search/analysis
+ /// capabilities based on these.
pub fn add_tags<K, V, I>(mut self, tags: I) -> Self
where
K: Into<String>,
@@ -89,22 +105,27 @@ impl LogRecord {
self
}
+ /// Logs with trace context.
pub fn with_tracing_context(mut self, tracing_context: &TracingContext) ->
Self {
self.trace_id = Some(tracing_context.trace_id().to_owned());
self.trace_segment_id =
Some(tracing_context.trace_segment_id().to_owned());
self
}
+ /// The span should be unique in the whole segment.
pub fn with_span(mut self, span: &Span) -> Self {
self.span_id = Some(span.with_span_object(|span| span.span_id));
self
}
+ /// A type to match analyzer(s) at the OAP server.
+ /// The data could be analyzed at the client side, but could be partial.
pub fn record_type(mut self, record_type: RecordType) -> Self {
self.record_type = record_type;
self
}
+ /// Log content.
pub fn content(mut self, content: impl Into<String>) -> Self {
self.content = content.into();
self
diff --git a/src/management/instance.rs b/src/management/instance.rs
index 6b72f02..7950a19 100644
--- a/src/management/instance.rs
+++ b/src/management/instance.rs
@@ -14,6 +14,8 @@
// limitations under the License.
//
+//! Skywalking report instance properties items.
+
use crate::skywalking_proto::v3::{InstanceProperties, KeyStringValuePair};
use once_cell::sync::Lazy;
use std::{collections::HashMap, process};
@@ -58,39 +60,51 @@ const OS_NAME: Option<&str> = if cfg!(target_os = "linux") {
None
};
+/// Builder of [InstanceProperties].
#[derive(Debug, Default)]
pub struct Properties {
inner: HashMap<String, Vec<String>>,
}
impl Properties {
+ /// Instance properties key of host name.
pub const KEY_HOST_NAME: &'static str = "hostname";
+ /// Instance properties key of ipv4.
pub const KEY_IPV4: &'static str = "ipv4";
+ /// Instance properties key of programming language.
pub const KEY_LANGUAGE: &'static str = "language";
+ /// Instance properties key of os name.
pub const KEY_OS_NAME: &'static str = "OS Name";
+ /// Instance properties key of process number.
pub const KEY_PROCESS_NO: &'static str = "Process No.";
}
impl Properties {
+ /// New empty properties.
#[inline]
pub fn new() -> Self {
Default::default()
}
+ /// Insert key value pair, will not overwrite the original, because multiple
+ /// values of the same key can exist.
pub fn insert(&mut self, key: impl Into<String>, value: impl Into<String>)
{
self.inner.entry(key.into()).or_default().push(value.into());
}
+ /// Overwrite the values, whether there are multiple.
pub fn update(&mut self, key: &str, value: impl Into<String>) {
if let Some(values) = self.inner.get_mut(key) {
*values = vec![value.into()];
}
}
+ /// Remove all values associated the key.
pub fn remove(&mut self, key: &str) {
self.inner.remove(key);
}
+ /// Insert the OS system info, such as os name, host name, etc.
pub fn insert_os_info(&mut self) {
for (key, value) in build_os_info() {
self.insert(key, value);
diff --git a/src/management/manager.rs b/src/management/manager.rs
index fe45f79..9641c35 100644
--- a/src/management/manager.rs
+++ b/src/management/manager.rs
@@ -14,6 +14,8 @@
// limitations under the License.
//
+//! Manager methods.
+
use super::instance::Properties;
use crate::reporter::{CollectItem, DynReport, Report};
use std::{
@@ -29,6 +31,7 @@ use tokio::{
time,
};
+/// Manager handles skywalking management operations, integrate with reporter.
pub struct Manager {
service_name: String,
instance_name: String,
@@ -49,20 +52,24 @@ impl Manager {
}
}
+ /// Get service name.
pub fn service_name(&self) -> &str {
&self.service_name
}
+ /// Get instance name.
pub fn instance_name(&self) -> &str {
&self.instance_name
}
+ /// Report instance properties.
pub fn report_properties(&self, properties: Properties) {
let props = properties
.convert_to_instance_properties(self.service_name.clone(),
self.instance_name.clone());
self.reporter.report(CollectItem::Instance(Box::new(props)));
}
+ /// Do keep alive (heartbeat), with the interval, will be run in background.
pub fn keep_alive(&self, interval: Duration) -> KeepAlive {
let service_name = self.service_name.clone();
let instance_name = self.instance_name.clone();
@@ -85,6 +92,7 @@ impl Manager {
}
}
+/// Handle of [Manager::keep_alive].
pub struct KeepAlive {
handle: JoinHandle<()>,
}
diff --git a/src/management/mod.rs b/src/management/mod.rs
index 831bad1..93efee1 100644
--- a/src/management/mod.rs
+++ b/src/management/mod.rs
@@ -14,5 +14,7 @@
// limitations under the License.
//
+//! Skywalking management api.
+
pub mod instance;
pub mod manager;
diff --git a/src/metrics/meter.rs b/src/metrics/meter.rs
index 2ccf401..230735c 100644
--- a/src/metrics/meter.rs
+++ b/src/metrics/meter.rs
@@ -14,6 +14,8 @@
// limitations under the License.
//
+//! Meter with multiple types.
+
use crate::{
common::system_time::{fetch_time, TimePeriod},
metrics::metricer::Metricer,
@@ -27,12 +29,16 @@ use std::{
sync::atomic::{AtomicI64, Ordering},
};
+/// Transform to [MeterData].
pub trait Transform: Send + Sync {
+ /// Get the meter id.
fn meter_id(&self) -> MeterId;
+ /// Transform to [MeterData].
fn transform(&self, metricer: &Metricer) -> MeterData;
}
+/// Predefine meter type.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) enum MeterType {
Counter,
@@ -40,6 +46,7 @@ pub(crate) enum MeterType {
Histogram,
}
+/// Unique meter id for metrics.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct MeterId {
name: String,
@@ -75,6 +82,7 @@ pub enum CounterMode {
RATE,
}
+/// Meter with type `Counter`.
pub struct Counter {
id: MeterId,
mode: CounterMode,
@@ -83,6 +91,7 @@ pub struct Counter {
}
impl Counter {
+ /// New meter with type `Counter`.
#[inline]
pub fn new(name: impl Into<String>) -> Self {
Self {
@@ -97,12 +106,14 @@ impl Counter {
}
}
+ /// Add label.
#[inline]
pub fn add_label(mut self, key: impl Into<String>, value: impl
Into<String>) -> Self {
self.id = self.id.add_label(key, value);
self
}
+ /// Add labels.
#[inline]
pub fn add_labels<K, V, I>(mut self, tags: I) -> Self
where
@@ -114,16 +125,19 @@ impl Counter {
self
}
+ /// Set counter mode.
#[inline]
pub fn mode(mut self, mode: CounterMode) -> Self {
self.mode = mode;
self
}
+ /// Increment meter value by count.
pub fn increment(&self, count: f64) {
self.count.fetch_add(count, Ordering::Acquire);
}
+ /// Get meter value.
pub fn get(&self) -> f64 {
self.count.load(Ordering::Acquire)
}
@@ -164,12 +178,14 @@ impl Transform for Counter {
}
}
+/// Meter with type `Gauge`.
pub struct Gauge<G> {
id: MeterId,
getter: G,
}
impl<G: Fn() -> f64> Gauge<G> {
+ /// New meter with type `Gauge` and getter.
#[inline]
pub fn new(name: impl Into<String>, getter: G) -> Self {
Self {
@@ -182,12 +198,14 @@ impl<G: Fn() -> f64> Gauge<G> {
}
}
+ /// Add label.
#[inline]
pub fn add_label(mut self, key: impl Into<String>, value: impl
Into<String>) -> Self {
self.id = self.id.add_label(key, value);
self
}
+ /// Add labels.
#[inline]
pub fn add_labels<K, V, I>(mut self, tags: I) -> Self
where
@@ -199,6 +217,7 @@ impl<G: Fn() -> f64> Gauge<G> {
self
}
+ /// Get the meter value by calling getter.
pub fn get(&self) -> f64 {
(self.getter)()
}
@@ -245,12 +264,14 @@ impl Bucket {
}
}
+/// Meter with type `Histogram`.
pub struct Histogram {
id: MeterId,
buckets: Vec<Bucket>,
}
impl Histogram {
+ /// New meter with type `Histogram`.
pub fn new(name: impl Into<String>, mut steps: Vec<f64>) -> Self {
Self {
id: MeterId {
@@ -266,12 +287,14 @@ impl Histogram {
}
}
+ /// Add label.
#[inline]
pub fn add_label(mut self, key: impl Into<String>, value: impl
Into<String>) -> Self {
self.id = self.id.add_label(key, value);
self
}
+ /// Add labels.
#[inline]
pub fn add_labels<K, V, I>(mut self, tags: I) -> Self
where
@@ -283,6 +306,7 @@ impl Histogram {
self
}
+ /// Increment meter value by the bucket owned the value.
pub fn add_value(&self, value: f64) {
if let Some(index) = self.find_bucket(value) {
self.buckets[index].count.fetch_add(1, Ordering::Acquire);
diff --git a/src/metrics/metricer.rs b/src/metrics/metricer.rs
index ad8f3de..76c5e6a 100644
--- a/src/metrics/metricer.rs
+++ b/src/metrics/metricer.rs
@@ -14,6 +14,8 @@
// limitations under the License.
//
+//! Metricer methods.
+
use super::meter::{MeterId, Transform};
use crate::reporter::{CollectItem, DynReport, Report};
use std::{
@@ -31,6 +33,8 @@ use tokio::{
time::interval,
};
+/// Metricer handles skywalking metrics operations, integrate with reporter, can
+/// be register with multiple [Transform].
pub struct Metricer {
service_name: String,
instance_name: String,
@@ -55,18 +59,22 @@ impl Metricer {
}
}
+ /// Get service name.
pub fn service_name(&self) -> &str {
&self.service_name
}
+ /// Get instance name.
pub fn instance_name(&self) -> &str {
&self.instance_name
}
+ /// Set report interval, default is 20s.
pub fn set_report_interval(&mut self, report_interval: Duration) {
self.report_interval = report_interval;
}
+ /// Register new [Transform], and return it with [Arc] wrap.
pub fn register<T: Transform + 'static>(&mut self, transform: T) -> Arc<T>
{
let transform = Arc::new(transform);
self.meter_map
@@ -74,6 +82,8 @@ impl Metricer {
transform
}
+ /// Boot the reporting with the report interval previous set, will be run in
+ /// background.
pub fn boot(self) -> Booting {
let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
@@ -104,12 +114,14 @@ impl Metricer {
}
}
+/// handle of [Metricer::boot].
pub struct Booting {
handle: JoinHandle<()>,
shutdown_tx: mpsc::Sender<()>,
}
impl Booting {
+ /// Shutdown the metrics reporting.
pub async fn shutdown(self) -> crate::Result<()> {
self.shutdown_tx.send(()).await.unwrap();
Ok(self.await?)
diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs
index 3c67e08..3883d7d 100644
--- a/src/metrics/mod.rs
+++ b/src/metrics/mod.rs
@@ -14,5 +14,7 @@
// limitations under the License.
//
+//! Skywalking metrics api.
+
pub mod meter;
pub mod metricer;
diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs
index 5b0843a..2ac1f00 100644
--- a/src/reporter/grpc.rs
+++ b/src/reporter/grpc.rs
@@ -14,6 +14,8 @@
// limitations under the License.
//
+//! Grpc implementation of [Report].
+
#[cfg(feature = "management")]
use
crate::skywalking_proto::v3::management_service_client::ManagementServiceClient;
use crate::{
@@ -49,7 +51,10 @@ use tonic::{
transport::{self, Channel, Endpoint},
};
+/// Special purpose, used for user-defined production operations. Generally, it
+/// does not need to be handled.
pub trait CollectItemProduce: Send + Sync + 'static {
+ /// Produce the collect item non-blocking.
fn produce(&self, item: CollectItem) -> Result<(), Box<dyn Error>>;
}
@@ -65,15 +70,19 @@ impl CollectItemProduce for
mpsc::UnboundedSender<CollectItem> {
}
}
+/// Special purpose, used for user-defined consume operations. Generally, it
+/// does not need to be handled.
#[async_trait]
-pub trait ColletcItemConsume: Send + Sync + 'static {
+pub trait CollectItemConsume: Send + Sync + 'static {
+ /// Consume the collect item blocking.
async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error +
Send>>;
+ /// Try to consume the collect item non-blocking.
async fn try_consume(&mut self) -> Result<Option<CollectItem>, Box<dyn
Error + Send>>;
}
#[async_trait]
-impl ColletcItemConsume for () {
+impl CollectItemConsume for () {
async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error +
Send>> {
Ok(None)
}
@@ -84,7 +93,7 @@ impl ColletcItemConsume for () {
}
#[async_trait]
-impl ColletcItemConsume for mpsc::UnboundedReceiver<CollectItem> {
+impl CollectItemConsume for mpsc::UnboundedReceiver<CollectItem> {
async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error +
Send>> {
Ok(self.recv().await)
}
@@ -115,19 +124,23 @@ struct Inner<P, C> {
is_closed: AtomicBool,
}
+/// Alias of dyn [Error] callback.
pub type DynErrHandle = dyn Fn(Box<dyn Error>) + Send + Sync + 'static;
+/// Reporter which will report to Skywalking OAP server via grpc protocol.
pub struct GrpcReporter<P, C> {
inner: Arc<Inner<P, C>>,
err_handle: Arc<Option<Box<DynErrHandle>>>,
}
impl GrpcReporter<mpsc::UnboundedSender<CollectItem>,
mpsc::UnboundedReceiver<CollectItem>> {
+ /// New with exists [Channel], so you can clone the [Channel] for multiplex.
pub fn new(channel: Channel) -> Self {
let (p, c) = mpsc::unbounded_channel();
Self::new_with_pc(channel, p, c)
}
+ /// Connect to the Skywalking OAP server.
pub async fn connect(
address: impl TryInto<Endpoint, Error = transport::Error>,
) -> crate::Result<Self> {
@@ -137,7 +150,9 @@ impl GrpcReporter<mpsc::UnboundedSender<CollectItem>,
mpsc::UnboundedReceiver<Co
}
}
-impl<P: CollectItemProduce, C: ColletcItemConsume> GrpcReporter<P, C> {
+impl<P: CollectItemProduce, C: CollectItemConsume> GrpcReporter<P, C> {
+ /// Special purpose, used for user-defined produce and consume operations,
+ /// usually you can use [GrpcReporter::connect] and [GrpcReporter::new].
pub fn new_with_pc(channel: Channel, producer: P, consumer: C) -> Self {
Self {
inner: Arc::new(Inner {
@@ -155,6 +170,7 @@ impl<P: CollectItemProduce, C: ColletcItemConsume>
GrpcReporter<P, C> {
}
}
+ /// Set error handle. By default, the error will not be handle.
pub fn with_err_handle(
mut self,
handle: impl Fn(Box<dyn Error>) + Send + Sync + 'static,
@@ -163,9 +179,7 @@ impl<P: CollectItemProduce, C: ColletcItemConsume>
GrpcReporter<P, C> {
self
}
- /// Start to reporting, quit when shutdown_signal received.
- ///
- /// Accept a `shutdown_signal` argument as a graceful shutdown signal.
+ /// Start to reporting.
///
/// # Panics
///
@@ -198,7 +212,7 @@ impl<P, C> Clone for GrpcReporter<P, C> {
}
}
-impl<P: CollectItemProduce, C: ColletcItemConsume> Report for GrpcReporter<P,
C> {
+impl<P: CollectItemProduce, C: CollectItemConsume> Report for GrpcReporter<P,
C> {
fn report(&self, item: CollectItem) {
if !self.inner.is_closed.load(Ordering::Relaxed) {
if let Err(e) = self.inner.producer.produce(item) {
@@ -218,7 +232,7 @@ struct ReporterAndBuffer<P, C> {
meter_buffer: LinkedList<MeterData>,
}
-impl<P: CollectItemProduce, C: ColletcItemConsume> ReporterAndBuffer<P, C> {
+impl<P: CollectItemProduce, C: CollectItemConsume> ReporterAndBuffer<P, C> {
async fn report(&mut self, item: CollectItem) {
// TODO Implement batch collect in future.
match item {
@@ -312,14 +326,17 @@ impl<P: CollectItemProduce, C: ColletcItemConsume>
ReporterAndBuffer<P, C> {
}
}
-/// Created by [GrpcReporter::reporting].
+/// Handle of [GrpcReporter::reporting].
pub struct Reporting<P, C> {
rb: ReporterAndBuffer<P, C>,
consumer: C,
shutdown_signal: Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>,
}
-impl<P: CollectItemProduce, C: ColletcItemConsume> Reporting<P, C> {
+impl<P: CollectItemProduce, C: CollectItemConsume> Reporting<P, C> {
+ /// Quit when shutdown_signal received.
+ ///
+ /// Accept a `shutdown_signal` argument as a graceful shutdown signal.
pub fn with_graceful_shutdown(
mut self,
shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static,
@@ -328,17 +345,20 @@ impl<P: CollectItemProduce, C: ColletcItemConsume>
Reporting<P, C> {
self
}
- pub fn with_staus_handle(mut self, handle: impl Fn(tonic::Status) + Send + 'static) -> Self {
+ /// Set the failed status handle. By default, the status will not be handle.
+ pub fn with_status_handle(mut self, handle: impl Fn(tonic::Status) + Send + 'static) -> Self {
self.rb.status_handle = Some(Box::new(handle));
self
}
+ /// Spawn the reporting in background.
pub fn spawn(self) -> ReportingJoinHandle {
ReportingJoinHandle {
handle: tokio::spawn(self.start()),
}
}
+ /// Start the consume and report task.
pub async fn start(self) -> crate::Result<()> {
let (shutdown_tx, mut shutdown_rx) = mpsc::unbounded_channel();
let Reporting {
@@ -393,6 +413,7 @@ impl<P: CollectItemProduce, C: ColletcItemConsume>
Reporting<P, C> {
}
}
+/// Handle of [Reporting::spawn].
pub struct ReportingJoinHandle {
handle: JoinHandle<crate::Result<()>>,
}
diff --git a/src/reporter/mod.rs b/src/reporter/mod.rs
index 921cb02..6d54f88 100644
--- a/src/reporter/mod.rs
+++ b/src/reporter/mod.rs
@@ -14,6 +14,8 @@
// limitations under the License.
//
+//! Reporter contains common `Report` trait and the implementations.
+
pub mod grpc;
pub mod print;
@@ -24,15 +26,21 @@ use serde::{Deserialize, Serialize};
use std::{ops::Deref, sync::Arc};
use tokio::sync::OnceCell;
+/// Collect item of protobuf object.
#[derive(Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub enum CollectItem {
+ /// Tracing object.
Trace(Box<SegmentObject>),
+ /// Log object.
Log(Box<LogData>),
+ /// Metric object.
Meter(Box<MeterData>),
+ /// Instance properties object.
#[cfg(feature = "management")]
#[cfg_attr(docsrs, doc(cfg(feature = "management")))]
Instance(Box<InstanceProperties>),
+ /// Keep alive object.
#[cfg(feature = "management")]
#[cfg_attr(docsrs, doc(cfg(feature = "management")))]
Ping(Box<InstancePingPkg>),
@@ -42,6 +50,7 @@ pub(crate) type DynReport = dyn Report + Send + Sync +
'static;
/// Report provide non-blocking report method for trace, log and metric object.
pub trait Report {
+ /// The non-blocking report method.
fn report(&self, item: CollectItem);
}
diff --git a/src/reporter/print.rs b/src/reporter/print.rs
index 6124c46..5b3fde0 100644
--- a/src/reporter/print.rs
+++ b/src/reporter/print.rs
@@ -14,19 +14,25 @@
// limitations under the License.
//
+//! Print implementation of [Report].
+
use crate::reporter::{CollectItem, Report};
+/// Reporter which just print the collect items, not actual report to server,
+/// for debug usage.
#[derive(Default, Clone)]
pub struct PrintReporter {
use_stderr: bool,
}
impl PrintReporter {
+ /// New with default.
#[inline]
pub fn new() -> Self {
Default::default()
}
+ /// If true, use ``eprint!` to print, otherwise use `print!`.
pub fn use_stderr(mut self, use_stderr: bool) -> Self {
self.use_stderr = use_stderr;
self
diff --git a/src/skywalking_proto/mod.rs b/src/skywalking_proto/mod.rs
index c3e1141..777d2eb 100644
--- a/src/skywalking_proto/mod.rs
+++ b/src/skywalking_proto/mod.rs
@@ -13,4 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
+
+//! Generated code of `skywalking-data-collect-protocol`, by `tonic`.
+
pub mod v3;
diff --git a/src/skywalking_proto/v3/mod.rs b/src/skywalking_proto/v3/mod.rs
index dc74965..32d517d 100644
--- a/src/skywalking_proto/v3/mod.rs
+++ b/src/skywalking_proto/v3/mod.rs
@@ -13,6 +13,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
+
+//! Generated code of `skywalking.v3`, by `tonic`.
+
+#![allow(missing_docs)]
+
use crate::common::system_time::{fetch_time, TimePeriod};
tonic::include_proto!("skywalking.v3");
diff --git a/src/trace/mod.rs b/src/trace/mod.rs
index c194875..a7e1f27 100644
--- a/src/trace/mod.rs
+++ b/src/trace/mod.rs
@@ -14,6 +14,8 @@
// limitations under the License.
//
+//! Skywalking trace api.
+
pub mod propagation;
pub mod span;
pub mod trace_context;
diff --git a/src/trace/propagation/context.rs b/src/trace/propagation/context.rs
index a4aab66..0faa109 100644
--- a/src/trace/propagation/context.rs
+++ b/src/trace/propagation/context.rs
@@ -14,8 +14,12 @@
// limitations under the License.
//
+//! Propagation context.
+
+/// The key of propagation header.
pub static SKYWALKING_HTTP_CONTEXT_HEADER_KEY: &str = "sw8";
+/// Propagation context.
#[derive(Debug)]
pub struct PropagationContext {
/// It defines whether next span should be trace or not.
@@ -52,6 +56,7 @@ pub struct PropagationContext {
/// In general, this context will be used if you create new TraceContext after
/// received decoded context that should be packed in `sw8` header.
impl PropagationContext {
+ /// New [PropagationContext].
#[allow(clippy::too_many_arguments)]
pub fn new(
do_sample: bool,
diff --git a/src/trace/propagation/decoder.rs b/src/trace/propagation/decoder.rs
index 6ab9fe6..3b45e67 100644
--- a/src/trace/propagation/decoder.rs
+++ b/src/trace/propagation/decoder.rs
@@ -14,6 +14,8 @@
// limitations under the License.
//
+//! Propagation decoder.
+
use crate::trace::propagation::context::PropagationContext;
use base64::decode;
diff --git a/src/trace/propagation/encoder.rs b/src/trace/propagation/encoder.rs
index a3a6e01..6d7b417 100644
--- a/src/trace/propagation/encoder.rs
+++ b/src/trace/propagation/encoder.rs
@@ -14,6 +14,8 @@
// limitations under the License.
//
+//! Propagation encoder.
+
use crate::trace::trace_context::TracingContext;
use base64::encode;
diff --git a/src/trace/propagation/mod.rs b/src/trace/propagation/mod.rs
index 6c522f3..4da780e 100644
--- a/src/trace/propagation/mod.rs
+++ b/src/trace/propagation/mod.rs
@@ -14,6 +14,8 @@
// limitations under the License.
//
+//! Cross process propagation.
+
pub mod context;
pub mod decoder;
pub mod encoder;
diff --git a/src/trace/span.rs b/src/trace/span.rs
index 84d97fa..16ecf79 100644
--- a/src/trace/span.rs
+++ b/src/trace/span.rs
@@ -14,6 +14,9 @@
// limitations under the License.
//
+//! Span is an important and common concept in distributed tracing system. Learn
+//! Span from Google Dapper Paper.
+
use crate::{
common::system_time::{fetch_time, TimePeriod},
error::LOCK_MSG,
@@ -125,15 +128,18 @@ impl Span {
self.stack.upgrade().expect("Context has dropped")
}
+ /// Immutable with inner span object.
pub fn with_span_object<T>(&self, f: impl FnOnce(&SpanObject) -> T) -> T {
self.upgrade_stack()
.with_active(|stack| f(&stack[self.index]))
}
+ /// Mutable with inner span object.
pub fn with_span_object_mut<T>(&mut self, f: impl FnOnce(&mut SpanObject)
-> T) -> T {
f(&mut
(self.upgrade_stack().active.try_write().expect(LOCK_MSG))[self.index])
}
+ /// Get span id.
pub fn span_id(&self) -> i32 {
self.with_span_object(|span| span.span_id)
}
diff --git a/src/trace/trace_context.rs b/src/trace/trace_context.rs
index d0daa94..b5161f4 100644
--- a/src/trace/trace_context.rs
+++ b/src/trace/trace_context.rs
@@ -14,6 +14,10 @@
// limitations under the License.
//
+//! TracingContext is the context of the tracing process. Span should only be
+//! created through context, and be archived into the context after the span
+//! finished.
+
use crate::{
common::{
random_generator::RandomGenerator,
@@ -37,18 +41,18 @@ use std::{
#[derive(Default)]
pub(crate) struct SpanStack {
- // TODO Swith to use `try_rwlock` instead of `RwLock` for better performance.
- pub(crate) finialized: RwLock<Vec<SpanObject>>,
+ // TODO Switch to use `try_rwlock` instead of `RwLock` for better performance.
+ pub(crate) finalized: RwLock<Vec<SpanObject>>,
pub(crate) active: RwLock<Vec<SpanObject>>,
}
impl SpanStack {
- pub(crate) fn with_finialized<T>(&self, f: impl FnOnce(&Vec<SpanObject>)
-> T) -> T {
- f(&self.finialized.try_read().expect(LOCK_MSG))
+ pub(crate) fn with_finalized<T>(&self, f: impl FnOnce(&Vec<SpanObject>) ->
T) -> T {
+ f(&self.finalized.try_read().expect(LOCK_MSG))
}
- pub(crate) fn with_finialized_mut<T>(&self, f: impl FnOnce(&mut
Vec<SpanObject>) -> T) -> T {
- f(&mut *self.finialized.try_write().expect(LOCK_MSG))
+ pub(crate) fn with_finalized_mut<T>(&self, f: impl FnOnce(&mut
Vec<SpanObject>) -> T) -> T {
+ f(&mut *self.finalized.try_write().expect(LOCK_MSG))
}
pub(crate) fn with_active<T>(&self, f: impl FnOnce(&Vec<SpanObject>) -> T)
-> T {
@@ -74,13 +78,16 @@ impl SpanStack {
let span = self.pop_active(index);
if let Some(mut span) = span {
span.end_time = fetch_time(TimePeriod::End);
- self.with_finialized_mut(|spans| spans.push(span));
+ self.with_finalized_mut(|spans| spans.push(span));
} else {
panic!("Finalize span isn't the active span");
}
}
}
+/// TracingContext is the context of the tracing process. Span should only be
+/// created through context, and be archived into the context after the span
+/// finished.
#[must_use = "call `create_entry_span` after `TracingContext` created."]
pub struct TracingContext {
trace_id: String,
@@ -103,8 +110,8 @@ impl std::fmt::Debug for TracingContext {
.field("service_instance", &self.service_instance)
.field("next_span_id", &self.next_span_id)
.field(
- "finialized_spans",
- match self.span_stack.finialized.try_read() {
+ "finalized_spans",
+ match self.span_stack.finalized.try_read() {
Ok(spans) => {
span_objects = spans.clone();
&span_objects
@@ -135,21 +142,25 @@ impl TracingContext {
}
}
+ /// Get trace id.
#[inline]
pub fn trace_id(&self) -> &str {
&self.trace_id
}
+ /// Get trace segment id.
#[inline]
pub fn trace_segment_id(&self) -> &str {
&self.trace_segment_id
}
+ /// Get service name.
#[inline]
pub fn service(&self) -> &str {
&self.service
}
+ /// Get service instance.
#[inline]
pub fn service_instance(&self) -> &str {
&self.service_instance
@@ -166,13 +177,14 @@ impl TracingContext {
span_id
}
+ /// Get the last finalized span.
pub fn last_span(&self) -> Option<SpanObject> {
self.span_stack
- .with_finialized(|spans| spans.last().cloned())
+ .with_finalized(|spans| spans.last().cloned())
}
fn with_spans_mut<T>(&mut self, f: impl FnOnce(&mut Vec<SpanObject>) -> T)
-> T {
- f(&mut *self.span_stack.finialized.try_write().expect(LOCK_MSG))
+ f(&mut *self.span_stack.finalized.try_write().expect(LOCK_MSG))
}
pub(crate) fn with_active_span_stack<T>(&self, f: impl
FnOnce(&Vec<SpanObject>) -> T) -> T {
@@ -334,7 +346,8 @@ impl TracingContext {
/// It converts tracing context into segment object.
/// This conversion should be done before sending segments into OAP.
///
- /// Notice: The spans will taked, so this method shouldn't be called twice.
+ /// Notice: The spans will be taken, so this method shouldn't be called
+ /// twice.
pub(crate) fn convert_to_segment_object(&mut self) -> SegmentObject {
let trace_id = self.trace_id().to_owned();
let trace_segment_id = self.trace_segment_id().to_owned();
@@ -380,6 +393,7 @@ impl Drop for TracingContext {
}
}
+/// Cross threads context snapshot.
#[derive(Debug)]
pub struct ContextSnapshot {
trace_id: String,
@@ -389,10 +403,12 @@ pub struct ContextSnapshot {
}
impl ContextSnapshot {
+ /// Check if the snapshot is created from current context.
pub fn is_from_current(&self, context: &TracingContext) -> bool {
!self.trace_segment_id.is_empty() && self.trace_segment_id ==
context.trace_segment_id()
}
+ /// Check if the snapshot is valid.
pub fn is_valid(&self) -> bool {
!self.trace_segment_id.is_empty() && self.span_id > -1 &&
!self.trace_id.is_empty()
}
diff --git a/src/trace/tracer.rs b/src/trace/tracer.rs
index 599ec95..7c490c6 100644
--- a/src/trace/tracer.rs
+++ b/src/trace/tracer.rs
@@ -14,6 +14,8 @@
// limitations under the License.
//
+//! Tracer items.
+
use crate::{
reporter::{CollectItem, DynReport, Report},
trace::trace_context::TracingContext,
@@ -26,16 +28,16 @@ static GLOBAL_TRACER: OnceCell<Tracer> =
OnceCell::const_new();
/// Set the global tracer.
pub fn set_global_tracer(tracer: Tracer) {
if GLOBAL_TRACER.set(tracer).is_err() {
- panic!("global tracer has setted")
+ panic!("global tracer has set")
}
}
/// Get the global tracer.
pub fn global_tracer() -> &'static Tracer {
- GLOBAL_TRACER.get().expect("global tracer haven't setted")
+ GLOBAL_TRACER.get().expect("global tracer haven't set")
}
-/// Create trace conetxt by global tracer.
+/// Create trace context by global tracer.
pub fn create_trace_context() -> TracingContext {
global_tracer().create_trace_context()
}
@@ -68,15 +70,17 @@ impl Tracer {
}
}
+ /// Get service name.
pub fn service_name(&self) -> &str {
&self.inner.service_name
}
+ /// Get instance name.
pub fn instance_name(&self) -> &str {
&self.inner.instance_name
}
- /// Create trace conetxt.
+ /// Create trace context.
pub fn create_trace_context(&self) -> TracingContext {
TracingContext::new(
&self.inner.service_name,