This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-rust.git
The following commit(s) were added to refs/heads/master by this push:
new 7d344de Add management support. (#48)
7d344de is described below
commit 7d344dea0fdc6b29c03ea62915226b35a4161c82
Author: jmjoy <[email protected]>
AuthorDate: Thu Sep 15 12:29:11 2022 +0800
Add management support. (#48)
---
.github/workflows/ci.yaml | 5 +-
Cargo.toml | 19 +++++
README.md | 25 ++++++
build.rs | 4 +-
examples/simple_management_report.rs | 54 +++++++++++++
rust-toolchain.toml | 2 +-
src/lib.rs | 4 +
src/logging/logger.rs | 10 +--
src/logging/record.rs | 22 +++---
src/management/instance.rs | 147 +++++++++++++++++++++++++++++++++++
src/management/manager.rs | 98 +++++++++++++++++++++++
src/{lib.rs => management/mod.rs} | 14 +---
src/metrics/meter.rs | 44 +++++------
src/metrics/metricer.rs | 10 +--
src/reporter/grpc.rs | 43 +++++++++-
src/reporter/mod.rs | 14 +++-
src/reporter/print.rs | 16 ++++
src/skywalking_proto/v3/mod.rs | 14 ++--
src/trace/span.rs | 6 +-
src/trace/trace_context.rs | 8 +-
src/trace/tracer.rs | 10 +--
tests/logging.rs | 2 +-
tests/management.rs | 103 ++++++++++++++++++++++++
tests/metrics.rs | 2 +-
tests/trace_context.rs | 2 +-
25 files changed, 589 insertions(+), 89 deletions(-)
diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 27f4355..1d608ce 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -75,6 +75,7 @@ jobs:
matrix:
features:
- ""
+ - "--features management"
- "--features vendored"
- "--all-features"
runs-on: ubuntu-20.04
@@ -103,6 +104,6 @@ jobs:
- name: Install protoc
run: sudo apt-get install -y protobuf-compiler=3.6.1.3-2ubuntu5
- name: Install Rust toolchain
- run: rustup toolchain install stable
+ run: rustup toolchain install nightly-2022-07-30
- name: Run docs
- run: cargo rustdoc --release -- -D warnings
+ run: cargo +nightly-2022-07-30 rustdoc --release --all-features --
--cfg docsrs
diff --git a/Cargo.toml b/Cargo.toml
index 4468738..d1e5db2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -31,9 +31,12 @@ description = "Apache SkyWalking Rust Agent"
license = "Apache-2.0"
homepage = "https://skywalking.apache.org/"
repository = "https://github.com/apache/skywalking-rust"
+rust-version = "1.59"
[features]
+management = ["hostname", "systemstat"]
vendored = ["protobuf-src"]
+
mock = [] # For internal integration testing only, do not use.
[dependencies]
@@ -43,10 +46,13 @@ bytes = "1.2.1"
cfg-if = "1.0.0"
futures-core = "0.3.21"
futures-util = "0.3.21"
+hostname = { version = "0.3.1", optional = true }
+once_cell = "1.14.0"
portable-atomic = { version = "0.3.13", features = ["float"] }
prost = "0.11.0"
prost-derive = "0.11.0"
serde = { version = "1.0.143", features = ["derive"] }
+systemstat = { version = "0.2.0", optional = true }
thiserror = "1.0.32"
tokio = { version = "1.20.1", features = ["parking_lot"] }
tonic = { version = "0.8.0", features = ["codegen"] }
@@ -73,6 +79,19 @@ required-features = ["mock"]
name = "metrics"
required-features = ["mock"]
+[[test]]
+name = "management"
+required-features = ["management"]
+
[[example]]
name = "simple_trace_report"
path = "examples/simple_trace_report.rs"
+
+[[example]]
+name = "simple_management_report"
+path = "examples/simple_management_report.rs"
+required-features = ["management"]
+
+[package.metadata.docs.rs]
+rustdoc-args = ["--cfg", "docsrs"]
+all-features = true
diff --git a/README.md b/README.md
index 97211d4..af00725 100644
--- a/README.md
+++ b/README.md
@@ -55,6 +55,31 @@ LogRecord is the simple builder for the LogData, which is
the Log format of Skyw
- **Gauge** API represents a single numerical value.
- **Histogram** API represents a summary sample observations with customized
buckets.
+## Management
+
+Reporting the extra information of the instance.
+
+### Report instance properties
+
+The method `insert_os_info` of `skywalking::management::instance::Properties`
will insert the predefined os info.
+In addition, you can use `insert`, `update`, and `remove` to customize your
instance information.
+
+The predefined os info:
+
+| Key | Value |
+| ------------------------ | ------------------------------ |
+| hostname | The hostname of os. |
+| ipv4 (probably multiple) | The ipv4 addresses of network. |
+| language | rust |
+| OS Name | Linux / Windows / Mac OS X |
+| Process No. | The ID of Process. |
+
+### Keep alive
+
+Keep the instance alive in the backend analysis.
+Only recommend to do separate keepAlive report when no trace and metrics needs
to be reported.
+Otherwise, it is duplicated.
+
# Example
```rust, no_run
diff --git a/build.rs b/build.rs
index 7ef9a20..e83a7e5 100644
--- a/build.rs
+++ b/build.rs
@@ -23,11 +23,13 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]")
.compile(
&[
-
"./skywalking-data-collect-protocol/language-agent/Tracing.proto",
"./skywalking-data-collect-protocol/language-agent/Meter.proto",
+
"./skywalking-data-collect-protocol/language-agent/Tracing.proto",
"./skywalking-data-collect-protocol/logging/Logging.proto",
+
"./skywalking-data-collect-protocol/management/Management.proto",
],
&["./skywalking-data-collect-protocol"],
)?;
+
Ok(())
}
diff --git a/examples/simple_management_report.rs
b/examples/simple_management_report.rs
new file mode 100644
index 0000000..107aa95
--- /dev/null
+++ b/examples/simple_management_report.rs
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+use skywalking::{
+ management::{instance::Properties, manager::Manager},
+ reporter::grpc::GrpcReporter,
+};
+use std::{error::Error, time::Duration};
+use tokio::signal;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+ // Connect to skywalking oap server.
+ let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?;
+
+ // Spawn the reporting in background, with listening the graceful shutdown
+ // signal.
+ let handle = reporter
+ .reporting()
+ .await
+ .with_graceful_shutdown(async move {
+ signal::ctrl_c().await.expect("failed to listen for event");
+ })
+ .spawn();
+
+ let manager = Manager::new("service", "instance", reporter);
+
+ // Report instance properties.
+ let mut props = Properties::default();
+ props.insert_os_info();
+ manager.report_properties(props);
+
+ // Keep alive
+ manager.keep_alive(Duration::from_secs(10));
+
+ handle.await?;
+
+ Ok(())
+}
diff --git a/rust-toolchain.toml b/rust-toolchain.toml
index 09cf6d7..32766df 100644
--- a/rust-toolchain.toml
+++ b/rust-toolchain.toml
@@ -16,5 +16,5 @@
# under the License.
#
[toolchain]
-channel = "1.57.0"
+channel = "1.59.0"
components = ["rustfmt", "clippy"]
diff --git a/src/lib.rs b/src/lib.rs
index 7d4cc0c..65999d9 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -16,10 +16,14 @@
#![warn(rust_2018_idioms)]
#![warn(clippy::dbg_macro, clippy::print_stdout)]
#![doc = include_str!("../README.md")]
+#![cfg_attr(docsrs, feature(doc_cfg))]
pub mod common;
pub(crate) mod error;
pub mod logging;
+#[cfg(feature = "management")]
+#[cfg_attr(docsrs, doc(cfg(feature = "management")))]
+pub mod management;
pub mod metrics;
pub mod reporter;
pub mod skywalking_proto;
diff --git a/src/logging/logger.rs b/src/logging/logger.rs
index f1bb2dc..a831058 100644
--- a/src/logging/logger.rs
+++ b/src/logging/logger.rs
@@ -52,14 +52,14 @@ pub struct Logger {
impl Logger {
/// New with service info and reporter.
pub fn new(
- service_name: impl ToString,
- instance_name: impl ToString,
+ service_name: impl Into<String>,
+ instance_name: impl Into<String>,
reporter: impl Report + Send + Sync + 'static,
) -> Self {
Self {
inner: Arc::new(Inner {
- service_name: service_name.to_string(),
- instance_name: instance_name.to_string(),
+ service_name: service_name.into(),
+ instance_name: instance_name.into(),
reporter: Box::new(reporter),
}),
}
@@ -78,6 +78,6 @@ impl Logger {
self.service_name().to_owned(),
self.instance_name().to_owned(),
);
- self.inner.reporter.report(CollectItem::Log(data));
+ self.inner.reporter.report(CollectItem::Log(Box::new(data)));
}
}
diff --git a/src/logging/record.rs b/src/logging/record.rs
index 02f5a83..02b5e81 100644
--- a/src/logging/record.rs
+++ b/src/logging/record.rs
@@ -68,26 +68,24 @@ impl LogRecord {
}
#[inline]
- pub fn endpoint(mut self, endpoint: impl ToString) -> Self {
- self.endpoint = endpoint.to_string();
+ pub fn endpoint(mut self, endpoint: impl Into<String>) -> Self {
+ self.endpoint = endpoint.into();
self
}
- pub fn add_tag(mut self, key: impl ToString, value: impl ToString) -> Self
{
- self.tags.push((key.to_string(), value.to_string()));
+ pub fn add_tag(mut self, key: impl Into<String>, value: impl Into<String>)
-> Self {
+ self.tags.push((key.into(), value.into()));
self
}
pub fn add_tags<K, V, I>(mut self, tags: I) -> Self
where
- K: ToString,
- V: ToString,
+ K: Into<String>,
+ V: Into<String>,
I: IntoIterator<Item = (K, V)>,
{
- self.tags.extend(
- tags.into_iter()
- .map(|(k, v)| (k.to_string(), v.to_string())),
- );
+ self.tags
+ .extend(tags.into_iter().map(|(k, v)| (k.into(), v.into())));
self
}
@@ -107,8 +105,8 @@ impl LogRecord {
self
}
- pub fn content(mut self, content: impl ToString) -> Self {
- self.content = content.to_string();
+ pub fn content(mut self, content: impl Into<String>) -> Self {
+ self.content = content.into();
self
}
diff --git a/src/management/instance.rs b/src/management/instance.rs
new file mode 100644
index 0000000..6b72f02
--- /dev/null
+++ b/src/management/instance.rs
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+use crate::skywalking_proto::v3::{InstanceProperties, KeyStringValuePair};
+use once_cell::sync::Lazy;
+use std::{collections::HashMap, process};
+use systemstat::{IpAddr, Platform, System};
+
+static IPS: Lazy<Vec<String>> = Lazy::new(|| {
+ System::new()
+ .networks()
+ .ok()
+ .map(|networks| {
+ networks
+ .values()
+ .filter(|network| {
+ network.name != "lo"
+ && !network.name.starts_with("docker")
+ && !network.name.starts_with("br-")
+ })
+ .flat_map(|network| {
+ network.addrs.iter().filter_map(|addr| match addr.addr {
+ IpAddr::V4(addr) => Some(addr.to_string()),
+ _ => None,
+ })
+ })
+ .collect()
+ })
+ .unwrap_or_default()
+});
+
+static HOST_NAME: Lazy<Option<String>> = Lazy::new(|| {
+ hostname::get()
+ .ok()
+ .and_then(|hostname| hostname.into_string().ok())
+});
+
+const OS_NAME: Option<&str> = if cfg!(target_os = "linux") {
+ Some("Linux")
+} else if cfg!(target_os = "windows") {
+ Some("Windows")
+} else if cfg!(target_os = "macos") {
+ Some("Mac OS X")
+} else {
+ None
+};
+
+#[derive(Debug, Default)]
+pub struct Properties {
+ inner: HashMap<String, Vec<String>>,
+}
+
+impl Properties {
+ pub const KEY_HOST_NAME: &'static str = "hostname";
+ pub const KEY_IPV4: &'static str = "ipv4";
+ pub const KEY_LANGUAGE: &'static str = "language";
+ pub const KEY_OS_NAME: &'static str = "OS Name";
+ pub const KEY_PROCESS_NO: &'static str = "Process No.";
+}
+
+impl Properties {
+ #[inline]
+ pub fn new() -> Self {
+ Default::default()
+ }
+
+ pub fn insert(&mut self, key: impl Into<String>, value: impl Into<String>)
{
+ self.inner.entry(key.into()).or_default().push(value.into());
+ }
+
+ pub fn update(&mut self, key: &str, value: impl Into<String>) {
+ if let Some(values) = self.inner.get_mut(key) {
+ *values = vec![value.into()];
+ }
+ }
+
+ pub fn remove(&mut self, key: &str) {
+ self.inner.remove(key);
+ }
+
+ pub fn insert_os_info(&mut self) {
+ for (key, value) in build_os_info() {
+ self.insert(key, value);
+ }
+ }
+
+ pub(crate) fn convert_to_instance_properties(
+ self,
+ service_name: String,
+ instance_name: String,
+ ) -> InstanceProperties {
+ let mut properties = Vec::new();
+ for (key, values) in self.inner {
+ for value in values {
+ properties.push(KeyStringValuePair {
+ key: key.clone(),
+ value,
+ });
+ }
+ }
+
+ InstanceProperties {
+ service: service_name,
+ service_instance: instance_name,
+ properties,
+ layer: Default::default(),
+ }
+ }
+}
+
+fn build_os_info() -> Vec<(String, String)> {
+ let mut items = Vec::new();
+
+ if let Some(os_name) = OS_NAME.as_ref() {
+ items.push((Properties::KEY_OS_NAME.to_string(), os_name.to_string()));
+ }
+
+ if let Some(host_name) = HOST_NAME.as_ref() {
+ items.push((Properties::KEY_HOST_NAME.to_string(), host_name.clone()));
+ }
+
+ for ip in IPS.iter() {
+ items.push((Properties::KEY_IPV4.to_string(), ip.to_string()));
+ }
+
+ items.push((
+ Properties::KEY_PROCESS_NO.to_string(),
+ process::id().to_string(),
+ ));
+
+ items.push((Properties::KEY_LANGUAGE.to_string(), "rust".to_string()));
+
+ items
+}
diff --git a/src/management/manager.rs b/src/management/manager.rs
new file mode 100644
index 0000000..fe45f79
--- /dev/null
+++ b/src/management/manager.rs
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+use super::instance::Properties;
+use crate::reporter::{CollectItem, DynReport, Report};
+use std::{
+ future::Future,
+ pin::Pin,
+ sync::Arc,
+ task::{Context, Poll},
+ time::Duration,
+};
+use tokio::{
+ spawn,
+ task::{JoinError, JoinHandle},
+ time,
+};
+
+pub struct Manager {
+ service_name: String,
+ instance_name: String,
+ reporter: Arc<DynReport>,
+}
+
+impl Manager {
+ /// New with service info and reporter.
+ pub fn new(
+ service_name: impl Into<String>,
+ instance_name: impl Into<String>,
+ reporter: impl Report + Send + Sync + 'static,
+ ) -> Self {
+ Self {
+ service_name: service_name.into(),
+ instance_name: instance_name.into(),
+ reporter: Arc::new(reporter),
+ }
+ }
+
+ pub fn service_name(&self) -> &str {
+ &self.service_name
+ }
+
+ pub fn instance_name(&self) -> &str {
+ &self.instance_name
+ }
+
+ pub fn report_properties(&self, properties: Properties) {
+ let props = properties
+ .convert_to_instance_properties(self.service_name.clone(),
self.instance_name.clone());
+ self.reporter.report(CollectItem::Instance(Box::new(props)));
+ }
+
+ pub fn keep_alive(&self, interval: Duration) -> KeepAlive {
+ let service_name = self.service_name.clone();
+ let instance_name = self.instance_name.clone();
+ let reporter = self.reporter.clone();
+ let handle = spawn(async move {
+ let mut ticker = time::interval(interval);
+ loop {
+ ticker.tick().await;
+
+ reporter.report(CollectItem::Ping(Box::new(
+ crate::skywalking_proto::v3::InstancePingPkg {
+ service: service_name.clone(),
+ service_instance: instance_name.clone(),
+ layer: Default::default(),
+ },
+ )));
+ }
+ });
+ KeepAlive { handle }
+ }
+}
+
+pub struct KeepAlive {
+ handle: JoinHandle<()>,
+}
+
+impl Future for KeepAlive {
+ type Output = Result<(), JoinError>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Self::Output> {
+ Pin::new(&mut self.handle).poll(cx)
+ }
+}
diff --git a/src/lib.rs b/src/management/mod.rs
similarity index 74%
copy from src/lib.rs
copy to src/management/mod.rs
index 7d4cc0c..831bad1 100644
--- a/src/lib.rs
+++ b/src/management/mod.rs
@@ -13,16 +13,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
-#![warn(rust_2018_idioms)]
-#![warn(clippy::dbg_macro, clippy::print_stdout)]
-#![doc = include_str!("../README.md")]
-pub mod common;
-pub(crate) mod error;
-pub mod logging;
-pub mod metrics;
-pub mod reporter;
-pub mod skywalking_proto;
-pub mod trace;
-
-pub use error::{Error, Result};
+pub mod instance;
+pub mod manager;
diff --git a/src/metrics/meter.rs b/src/metrics/meter.rs
index ea97852..2ccf401 100644
--- a/src/metrics/meter.rs
+++ b/src/metrics/meter.rs
@@ -48,21 +48,19 @@ pub struct MeterId {
}
impl MeterId {
- fn add_label(mut self, key: impl ToString, value: impl ToString) -> Self {
- self.labels.push((key.to_string(), value.to_string()));
+ fn add_label(mut self, key: impl Into<String>, value: impl Into<String>)
-> Self {
+ self.labels.push((key.into(), value.into()));
self
}
fn add_labels<K, V, I>(mut self, tags: I) -> Self
where
- K: ToString,
- V: ToString,
+ K: Into<String>,
+ V: Into<String>,
I: IntoIterator<Item = (K, V)>,
{
- self.labels.extend(
- tags.into_iter()
- .map(|(k, v)| (k.to_string(), v.to_string())),
- );
+ self.labels
+ .extend(tags.into_iter().map(|(k, v)| (k.into(), v.into())));
self
}
}
@@ -86,10 +84,10 @@ pub struct Counter {
impl Counter {
#[inline]
- pub fn new(name: impl ToString) -> Self {
+ pub fn new(name: impl Into<String>) -> Self {
Self {
id: MeterId {
- name: name.to_string(),
+ name: name.into(),
typ: MeterType::Counter,
labels: vec![],
},
@@ -100,7 +98,7 @@ impl Counter {
}
#[inline]
- pub fn add_label(mut self, key: impl ToString, value: impl ToString) ->
Self {
+ pub fn add_label(mut self, key: impl Into<String>, value: impl
Into<String>) -> Self {
self.id = self.id.add_label(key, value);
self
}
@@ -108,8 +106,8 @@ impl Counter {
#[inline]
pub fn add_labels<K, V, I>(mut self, tags: I) -> Self
where
- K: ToString,
- V: ToString,
+ K: Into<String>,
+ V: Into<String>,
I: IntoIterator<Item = (K, V)>,
{
self.id = self.id.add_labels(tags);
@@ -173,10 +171,10 @@ pub struct Gauge<G> {
impl<G: Fn() -> f64> Gauge<G> {
#[inline]
- pub fn new(name: impl ToString, getter: G) -> Self {
+ pub fn new(name: impl Into<String>, getter: G) -> Self {
Self {
id: MeterId {
- name: name.to_string(),
+ name: name.into(),
typ: MeterType::Gauge,
labels: vec![],
},
@@ -185,7 +183,7 @@ impl<G: Fn() -> f64> Gauge<G> {
}
#[inline]
- pub fn add_label(mut self, key: impl ToString, value: impl ToString) ->
Self {
+ pub fn add_label(mut self, key: impl Into<String>, value: impl
Into<String>) -> Self {
self.id = self.id.add_label(key, value);
self
}
@@ -193,8 +191,8 @@ impl<G: Fn() -> f64> Gauge<G> {
#[inline]
pub fn add_labels<K, V, I>(mut self, tags: I) -> Self
where
- K: ToString,
- V: ToString,
+ K: Into<String>,
+ V: Into<String>,
I: IntoIterator<Item = (K, V)>,
{
self.id = self.id.add_labels(tags);
@@ -253,10 +251,10 @@ pub struct Histogram {
}
impl Histogram {
- pub fn new(name: impl ToString, mut steps: Vec<f64>) -> Self {
+ pub fn new(name: impl Into<String>, mut steps: Vec<f64>) -> Self {
Self {
id: MeterId {
- name: name.to_string(),
+ name: name.into(),
typ: MeterType::Histogram,
labels: vec![],
},
@@ -269,7 +267,7 @@ impl Histogram {
}
#[inline]
- pub fn add_label(mut self, key: impl ToString, value: impl ToString) ->
Self {
+ pub fn add_label(mut self, key: impl Into<String>, value: impl
Into<String>) -> Self {
self.id = self.id.add_label(key, value);
self
}
@@ -277,8 +275,8 @@ impl Histogram {
#[inline]
pub fn add_labels<K, V, I>(mut self, tags: I) -> Self
where
- K: ToString,
- V: ToString,
+ K: Into<String>,
+ V: Into<String>,
I: IntoIterator<Item = (K, V)>,
{
self.id = self.id.add_labels(tags);
diff --git a/src/metrics/metricer.rs b/src/metrics/metricer.rs
index 81b8351..ad8f3de 100644
--- a/src/metrics/metricer.rs
+++ b/src/metrics/metricer.rs
@@ -42,13 +42,13 @@ pub struct Metricer {
impl Metricer {
/// New with service info and reporter.
pub fn new(
- service_name: impl ToString,
- instance_name: impl ToString,
+ service_name: impl Into<String>,
+ instance_name: impl Into<String>,
reporter: impl Report + Send + Sync + 'static,
) -> Self {
Self {
- service_name: service_name.to_string(),
- instance_name: instance_name.to_string(),
+ service_name: service_name.into(),
+ instance_name: instance_name.into(),
reporter: Box::new(reporter),
meter_map: Default::default(),
report_interval: Duration::from_secs(20),
@@ -86,7 +86,7 @@ impl Metricer {
for trans in metricer_.meter_map.values() {
metricer_
.reporter
-
.report(CollectItem::Meter(trans.transform(&metricer_)));
+
.report(CollectItem::Meter(Box::new(trans.transform(&metricer_))));
}
})
.await;
diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs
index fcea63d..5b0843a 100644
--- a/src/reporter/grpc.rs
+++ b/src/reporter/grpc.rs
@@ -14,6 +14,8 @@
// limitations under the License.
//
+#[cfg(feature = "management")]
+use
crate::skywalking_proto::v3::management_service_client::ManagementServiceClient;
use crate::{
reporter::{CollectItem, Report},
skywalking_proto::v3::{
@@ -104,6 +106,9 @@ struct Inner<P, C> {
trace_client: Mutex<TraceSegmentReportServiceClient<Channel>>,
log_client: Mutex<LogReportServiceClient<Channel>>,
meter_client: Mutex<MeterReportServiceClient<Channel>>,
+ #[cfg(feature = "management")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "management")))]
+ management_client: Mutex<ManagementServiceClient<Channel>>,
producer: P,
consumer: Mutex<Option<C>>,
is_reporting: AtomicBool,
@@ -138,6 +143,8 @@ impl<P: CollectItemProduce, C: ColletcItemConsume>
GrpcReporter<P, C> {
inner: Arc::new(Inner {
trace_client:
Mutex::new(TraceSegmentReportServiceClient::new(channel.clone())),
log_client:
Mutex::new(LogReportServiceClient::new(channel.clone())),
+ #[cfg(feature = "management")]
+ management_client:
Mutex::new(ManagementServiceClient::new(channel.clone())),
meter_client:
Mutex::new(MeterReportServiceClient::new(channel)),
producer,
consumer: Mutex::new(Some(consumer)),
@@ -216,13 +223,43 @@ impl<P: CollectItemProduce, C: ColletcItemConsume>
ReporterAndBuffer<P, C> {
// TODO Implement batch collect in future.
match item {
CollectItem::Trace(item) => {
- self.trace_buffer.push_back(item);
+ self.trace_buffer.push_back(*item);
}
CollectItem::Log(item) => {
- self.log_buffer.push_back(item);
+ self.log_buffer.push_back(*item);
}
CollectItem::Meter(item) => {
- self.meter_buffer.push_back(item);
+ self.meter_buffer.push_back(*item);
+ }
+ #[cfg(feature = "management")]
+ CollectItem::Instance(item) => {
+ if let Err(e) = self
+ .inner
+ .management_client
+ .lock()
+ .await
+ .report_instance_properties(*item)
+ .await
+ {
+ if let Some(status_handle) = &self.status_handle {
+ status_handle(e);
+ }
+ }
+ }
+ #[cfg(feature = "management")]
+ CollectItem::Ping(item) => {
+ if let Err(e) = self
+ .inner
+ .management_client
+ .lock()
+ .await
+ .keep_alive(*item)
+ .await
+ {
+ if let Some(status_handle) = &self.status_handle {
+ status_handle(e);
+ }
+ }
}
}
diff --git a/src/reporter/mod.rs b/src/reporter/mod.rs
index 79735bf..921cb02 100644
--- a/src/reporter/mod.rs
+++ b/src/reporter/mod.rs
@@ -17,6 +17,8 @@
pub mod grpc;
pub mod print;
+#[cfg(feature = "management")]
+use crate::skywalking_proto::v3::{InstancePingPkg, InstanceProperties};
use crate::skywalking_proto::v3::{LogData, MeterData, SegmentObject};
use serde::{Deserialize, Serialize};
use std::{ops::Deref, sync::Arc};
@@ -25,9 +27,15 @@ use tokio::sync::OnceCell;
#[derive(Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub enum CollectItem {
- Trace(SegmentObject),
- Log(LogData),
- Meter(MeterData),
+ Trace(Box<SegmentObject>),
+ Log(Box<LogData>),
+ Meter(Box<MeterData>),
+ #[cfg(feature = "management")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "management")))]
+ Instance(Box<InstanceProperties>),
+ #[cfg(feature = "management")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "management")))]
+ Ping(Box<InstancePingPkg>),
}
pub(crate) type DynReport = dyn Report + Send + Sync + 'static;
diff --git a/src/reporter/print.rs b/src/reporter/print.rs
index e5b640f..6124c46 100644
--- a/src/reporter/print.rs
+++ b/src/reporter/print.rs
@@ -57,6 +57,22 @@ impl Report for PrintReporter {
println!("meter data={:?}", data);
}
}
+ #[cfg(feature = "management")]
+ CollectItem::Instance(data) => {
+ if self.use_stderr {
+ eprintln!("instance properties data={:?}", data);
+ } else {
+ println!("instance properties data={:?}", data);
+ }
+ }
+ #[cfg(feature = "management")]
+ CollectItem::Ping(data) => {
+ if self.use_stderr {
+ eprintln!("ping data={:?}", data);
+ } else {
+ println!("ping data={:?}", data);
+ }
+ }
}
}
}
diff --git a/src/skywalking_proto/v3/mod.rs b/src/skywalking_proto/v3/mod.rs
index 8623eaa..dc74965 100644
--- a/src/skywalking_proto/v3/mod.rs
+++ b/src/skywalking_proto/v3/mod.rs
@@ -21,8 +21,8 @@ impl SpanObject {
/// Add logs to the span.
pub fn add_log<K, V, I>(&mut self, message: I)
where
- K: ToString,
- V: ToString,
+ K: Into<String>,
+ V: Into<String>,
I: IntoIterator<Item = (K, V)>,
{
let log = Log {
@@ -32,8 +32,8 @@ impl SpanObject {
.map(|v| {
let (key, value) = v;
KeyStringValuePair {
- key: key.to_string(),
- value: value.to_string(),
+ key: key.into(),
+ value: value.into(),
}
})
.collect(),
@@ -42,10 +42,10 @@ impl SpanObject {
}
/// Add tag to the span.
- pub fn add_tag(&mut self, key: impl ToString, value: impl ToString) {
+ pub fn add_tag(&mut self, key: impl Into<String>, value: impl
Into<String>) {
self.tags.push(KeyStringValuePair {
- key: key.to_string(),
- value: value.to_string(),
+ key: key.into(),
+ value: value.into(),
});
}
}
diff --git a/src/trace/span.rs b/src/trace/span.rs
index ce49c6b..84d97fa 100644
--- a/src/trace/span.rs
+++ b/src/trace/span.rs
@@ -141,15 +141,15 @@ impl Span {
/// Add logs to the span.
pub fn add_log<K, V, I>(&mut self, message: I)
where
- K: ToString,
- V: ToString,
+ K: Into<String>,
+ V: Into<String>,
I: IntoIterator<Item = (K, V)>,
{
self.with_span_object_mut(|span| span.add_log(message))
}
/// Add tag to the span.
- pub fn add_tag(&mut self, key: impl ToString, value: impl ToString) {
+ pub fn add_tag(&mut self, key: impl Into<String>, value: impl
Into<String>) {
self.with_span_object_mut(|span| span.add_tag(key, value))
}
}
diff --git a/src/trace/trace_context.rs b/src/trace/trace_context.rs
index 659b3de..d0daa94 100644
--- a/src/trace/trace_context.rs
+++ b/src/trace/trace_context.rs
@@ -119,15 +119,15 @@ impl std::fmt::Debug for TracingContext {
impl TracingContext {
/// Generate a new trace context.
pub(crate) fn new(
- service_name: impl ToString,
- instance_name: impl ToString,
+ service_name: impl Into<String>,
+ instance_name: impl Into<String>,
tracer: WeakTracer,
) -> Self {
TracingContext {
trace_id: RandomGenerator::generate(),
trace_segment_id: RandomGenerator::generate(),
- service: service_name.to_string(),
- service_instance: instance_name.to_string(),
+ service: service_name.into(),
+ service_instance: instance_name.into(),
next_span_id: Default::default(),
span_stack: Default::default(),
primary_endpoint_name: Default::default(),
diff --git a/src/trace/tracer.rs b/src/trace/tracer.rs
index 963d270..599ec95 100644
--- a/src/trace/tracer.rs
+++ b/src/trace/tracer.rs
@@ -55,14 +55,14 @@ pub struct Tracer {
impl Tracer {
/// New with service info and reporter.
pub fn new(
- service_name: impl ToString,
- instance_name: impl ToString,
+ service_name: impl Into<String>,
+ instance_name: impl Into<String>,
reporter: impl Report + Send + Sync + 'static,
) -> Self {
Self {
inner: Arc::new(Inner {
- service_name: service_name.to_string(),
- instance_name: instance_name.to_string(),
+ service_name: service_name.into(),
+ instance_name: instance_name.into(),
reporter: Box::new(reporter),
}),
}
@@ -90,7 +90,7 @@ impl Tracer {
let segment_object = context.convert_to_segment_object();
self.inner
.reporter
- .report(CollectItem::Trace(segment_object));
+ .report(CollectItem::Trace(Box::new(segment_object)));
}
fn downgrade(&self) -> WeakTracer {
diff --git a/tests/logging.rs b/tests/logging.rs
index e909bf6..53483d2 100644
--- a/tests/logging.rs
+++ b/tests/logging.rs
@@ -161,7 +161,7 @@ impl Report for MockReporter {
fn report(&self, item: CollectItem) {
match item {
CollectItem::Log(data) => {
- self.items.try_lock().unwrap().push_back(data);
+ self.items.try_lock().unwrap().push_back(*data);
}
_ => {}
}
diff --git a/tests/management.rs b/tests/management.rs
new file mode 100644
index 0000000..5b10419
--- /dev/null
+++ b/tests/management.rs
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+use skywalking::{
+ management::{instance::Properties, manager::Manager},
+ reporter::{CollectItem, Report},
+ skywalking_proto::v3::{InstancePingPkg, InstanceProperties,
KeyStringValuePair},
+};
+use std::{
+ collections::LinkedList,
+ process,
+ sync::{Arc, Mutex},
+ time::Duration,
+};
+use tokio::time::sleep;
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+async fn management() {
+ let reporter = Arc::new(MockReporter::default());
+ let manager = Manager::new("service_name", "instance_name",
reporter.clone());
+ manager.keep_alive(Duration::from_secs(60));
+
+ {
+ let mut props = Properties::new();
+ props.insert_os_info();
+ manager.report_properties(props);
+
+ let actual_props = reporter.pop_ins_props();
+ assert_eq!(actual_props.service, "service_name".to_owned());
+ assert_eq!(actual_props.service_instance, "instance_name".to_owned());
+ assert_eq!(
+ kvs_get_value(&actual_props.properties, Properties::KEY_LANGUAGE),
+ "rust"
+ );
+ assert_eq!(
+ kvs_get_value(&actual_props.properties, Properties::KEY_HOST_NAME),
+ hostname::get().unwrap()
+ );
+ assert_eq!(
+ kvs_get_value(&actual_props.properties,
Properties::KEY_PROCESS_NO),
+ process::id().to_string()
+ );
+ }
+
+ {
+ sleep(Duration::from_secs(1)).await;
+ assert_eq!(
+ reporter.pop_ping(),
+ InstancePingPkg {
+ service: "service_name".to_owned(),
+ service_instance: "instance_name".to_owned(),
+ ..Default::default()
+ }
+ );
+ }
+}
+
+fn kvs_get_value<'a>(kvs: &'a [KeyStringValuePair], key: &str) -> &'a str {
+ &kvs.iter().find(|kv| kv.key == key).unwrap().value
+}
+
+#[derive(Default, Clone)]
+struct MockReporter {
+ props_items: Arc<Mutex<LinkedList<InstanceProperties>>>,
+ ping_items: Arc<Mutex<LinkedList<InstancePingPkg>>>,
+}
+
+impl MockReporter {
+ fn pop_ins_props(&self) -> InstanceProperties {
+ self.props_items.try_lock().unwrap().pop_back().unwrap()
+ }
+
+ fn pop_ping(&self) -> InstancePingPkg {
+ self.ping_items.try_lock().unwrap().pop_back().unwrap()
+ }
+}
+
+impl Report for MockReporter {
+ fn report(&self, item: CollectItem) {
+ match item {
+ CollectItem::Instance(data) => {
+ self.props_items.try_lock().unwrap().push_back(*data);
+ }
+ CollectItem::Ping(data) => {
+ self.ping_items.try_lock().unwrap().push_back(*data);
+ }
+ _ => {}
+ }
+ }
+}
diff --git a/tests/metrics.rs b/tests/metrics.rs
index f1d95d0..43246ec 100644
--- a/tests/metrics.rs
+++ b/tests/metrics.rs
@@ -166,7 +166,7 @@ impl Report for MockReporter {
fn report(&self, item: CollectItem) {
match item {
CollectItem::Meter(data) => {
- self.items.try_lock().unwrap().push_back(data);
+ self.items.try_lock().unwrap().push_back(*data);
}
_ => {}
}
diff --git a/tests/trace_context.rs b/tests/trace_context.rs
index b967e53..4603ad9 100644
--- a/tests/trace_context.rs
+++ b/tests/trace_context.rs
@@ -387,6 +387,6 @@ impl Report for MockReporter {
CollectItem::Trace(segment) => segment,
_ => unreachable!(),
};
- self.segments.try_lock().unwrap().push_back(segment);
+ self.segments.try_lock().unwrap().push_back(*segment);
}
}