This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-rust.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d344de  Add management support. (#48)
7d344de is described below

commit 7d344dea0fdc6b29c03ea62915226b35a4161c82
Author: jmjoy <[email protected]>
AuthorDate: Thu Sep 15 12:29:11 2022 +0800

    Add management support. (#48)
---
 .github/workflows/ci.yaml            |   5 +-
 Cargo.toml                           |  19 +++++
 README.md                            |  25 ++++++
 build.rs                             |   4 +-
 examples/simple_management_report.rs |  54 +++++++++++++
 rust-toolchain.toml                  |   2 +-
 src/lib.rs                           |   4 +
 src/logging/logger.rs                |  10 +--
 src/logging/record.rs                |  22 +++---
 src/management/instance.rs           | 147 +++++++++++++++++++++++++++++++++++
 src/management/manager.rs            |  98 +++++++++++++++++++++++
 src/{lib.rs => management/mod.rs}    |  14 +---
 src/metrics/meter.rs                 |  44 +++++------
 src/metrics/metricer.rs              |  10 +--
 src/reporter/grpc.rs                 |  43 +++++++++-
 src/reporter/mod.rs                  |  14 +++-
 src/reporter/print.rs                |  16 ++++
 src/skywalking_proto/v3/mod.rs       |  14 ++--
 src/trace/span.rs                    |   6 +-
 src/trace/trace_context.rs           |   8 +-
 src/trace/tracer.rs                  |  10 +--
 tests/logging.rs                     |   2 +-
 tests/management.rs                  | 103 ++++++++++++++++++++++++
 tests/metrics.rs                     |   2 +-
 tests/trace_context.rs               |   2 +-
 25 files changed, 589 insertions(+), 89 deletions(-)

diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 27f4355..1d608ce 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -75,6 +75,7 @@ jobs:
       matrix:
         features:
           - ""
+          - "--features management"
           - "--features vendored"
           - "--all-features"
     runs-on: ubuntu-20.04
@@ -103,6 +104,6 @@ jobs:
       - name: Install protoc
         run: sudo apt-get install -y protobuf-compiler=3.6.1.3-2ubuntu5
       - name: Install Rust toolchain
-        run: rustup toolchain install stable
+        run: rustup toolchain install nightly-2022-07-30
       - name: Run docs
-        run: cargo rustdoc --release -- -D warnings
+        run: cargo +nightly-2022-07-30 rustdoc --release --all-features -- --cfg docsrs
diff --git a/Cargo.toml b/Cargo.toml
index 4468738..d1e5db2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -31,9 +31,12 @@ description = "Apache SkyWalking Rust Agent"
 license = "Apache-2.0"
 homepage = "https://skywalking.apache.org/"
 repository = "https://github.com/apache/skywalking-rust"
+rust-version = "1.59"
 
 [features]
+management = ["hostname", "systemstat"]
 vendored = ["protobuf-src"]
+
 mock = []  # For internal integration testing only, do not use.
 
 [dependencies]
@@ -43,10 +46,13 @@ bytes = "1.2.1"
 cfg-if = "1.0.0"
 futures-core = "0.3.21"
 futures-util = "0.3.21"
+hostname = { version = "0.3.1", optional = true }
+once_cell = "1.14.0"
 portable-atomic = { version = "0.3.13", features = ["float"] }
 prost = "0.11.0"
 prost-derive = "0.11.0"
 serde = { version = "1.0.143", features = ["derive"] }
+systemstat = { version = "0.2.0", optional = true }
 thiserror = "1.0.32"
 tokio = { version = "1.20.1", features = ["parking_lot"] }
 tonic = { version = "0.8.0", features = ["codegen"] }
@@ -73,6 +79,19 @@ required-features = ["mock"]
 name = "metrics"
 required-features = ["mock"]
 
+[[test]]
+name = "management"
+required-features = ["management"]
+
 [[example]]
 name = "simple_trace_report"
 path = "examples/simple_trace_report.rs"
+
+[[example]]
+name = "simple_management_report"
+path = "examples/simple_management_report.rs"
+required-features = ["management"]
+
+[package.metadata.docs.rs]
+rustdoc-args = ["--cfg", "docsrs"]
+all-features = true
diff --git a/README.md b/README.md
index 97211d4..af00725 100644
--- a/README.md
+++ b/README.md
@@ -55,6 +55,31 @@ LogRecord is the simple builder for the LogData, which is the Log format of Skyw
 - **Gauge** API represents a single numerical value.
 - **Histogram** API represents a summary sample observations with customized buckets.
 
+## Management
+
+Reporting the extra information of the instance.
+
+### Report instance properties
+
+The method `insert_os_info` of `skywalking::management::instance::Properties` will insert the predefined os info.
+In addition, you can use `insert`, `update`, and `remove` to customize your instance information.
+
+The predefined os info:
+
+| Key                      | Value                          |
+| ------------------------ | ------------------------------ |
+| hostname                 | The hostname of os.            |
+| ipv4 (probably multiple) | The ipv4 addresses of network. |
+| language                 | rust                           |
+| OS Name                  | Linux / Windows / Mac OS X     |
+| Process No.              | The ID of Process.             |
+
+### Keep alive
+
+Keep the instance alive in the backend analysis.
+Only recommend to do separate keepAlive report when no trace and metrics needs to be reported.
+Otherwise, it is duplicated.
+
 # Example
 
 ```rust, no_run
diff --git a/build.rs b/build.rs
index 7ef9a20..e83a7e5 100644
--- a/build.rs
+++ b/build.rs
@@ -23,11 +23,13 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
         .type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]")
         .compile(
             &[
-                "./skywalking-data-collect-protocol/language-agent/Tracing.proto",
                 "./skywalking-data-collect-protocol/language-agent/Meter.proto",
+                "./skywalking-data-collect-protocol/language-agent/Tracing.proto",
                 "./skywalking-data-collect-protocol/logging/Logging.proto",
+                "./skywalking-data-collect-protocol/management/Management.proto",
             ],
             &["./skywalking-data-collect-protocol"],
         )?;
+
     Ok(())
 }
diff --git a/examples/simple_management_report.rs b/examples/simple_management_report.rs
new file mode 100644
index 0000000..107aa95
--- /dev/null
+++ b/examples/simple_management_report.rs
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+use skywalking::{
+    management::{instance::Properties, manager::Manager},
+    reporter::grpc::GrpcReporter,
+};
+use std::{error::Error, time::Duration};
+use tokio::signal;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+    // Connect to skywalking oap server.
+    let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?;
+
+    // Spawn the reporting in background, with listening the graceful shutdown
+    // signal.
+    let handle = reporter
+        .reporting()
+        .await
+        .with_graceful_shutdown(async move {
+            signal::ctrl_c().await.expect("failed to listen for event");
+        })
+        .spawn();
+
+    let manager = Manager::new("service", "instance", reporter);
+
+    // Report instance properties.
+    let mut props = Properties::default();
+    props.insert_os_info();
+    manager.report_properties(props);
+
+    // Keep alive
+    manager.keep_alive(Duration::from_secs(10));
+
+    handle.await?;
+
+    Ok(())
+}
diff --git a/rust-toolchain.toml b/rust-toolchain.toml
index 09cf6d7..32766df 100644
--- a/rust-toolchain.toml
+++ b/rust-toolchain.toml
@@ -16,5 +16,5 @@
 # under the License.
 #
 [toolchain]
-channel = "1.57.0"
+channel = "1.59.0"
 components = ["rustfmt", "clippy"]
diff --git a/src/lib.rs b/src/lib.rs
index 7d4cc0c..65999d9 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -16,10 +16,14 @@
 #![warn(rust_2018_idioms)]
 #![warn(clippy::dbg_macro, clippy::print_stdout)]
 #![doc = include_str!("../README.md")]
+#![cfg_attr(docsrs, feature(doc_cfg))]
 
 pub mod common;
 pub(crate) mod error;
 pub mod logging;
+#[cfg(feature = "management")]
+#[cfg_attr(docsrs, doc(cfg(feature = "management")))]
+pub mod management;
 pub mod metrics;
 pub mod reporter;
 pub mod skywalking_proto;
diff --git a/src/logging/logger.rs b/src/logging/logger.rs
index f1bb2dc..a831058 100644
--- a/src/logging/logger.rs
+++ b/src/logging/logger.rs
@@ -52,14 +52,14 @@ pub struct Logger {
 impl Logger {
     /// New with service info and reporter.
     pub fn new(
-        service_name: impl ToString,
-        instance_name: impl ToString,
+        service_name: impl Into<String>,
+        instance_name: impl Into<String>,
         reporter: impl Report + Send + Sync + 'static,
     ) -> Self {
         Self {
             inner: Arc::new(Inner {
-                service_name: service_name.to_string(),
-                instance_name: instance_name.to_string(),
+                service_name: service_name.into(),
+                instance_name: instance_name.into(),
                 reporter: Box::new(reporter),
             }),
         }
@@ -78,6 +78,6 @@ impl Logger {
             self.service_name().to_owned(),
             self.instance_name().to_owned(),
         );
-        self.inner.reporter.report(CollectItem::Log(data));
+        self.inner.reporter.report(CollectItem::Log(Box::new(data)));
     }
 }
diff --git a/src/logging/record.rs b/src/logging/record.rs
index 02f5a83..02b5e81 100644
--- a/src/logging/record.rs
+++ b/src/logging/record.rs
@@ -68,26 +68,24 @@ impl LogRecord {
     }
 
     #[inline]
-    pub fn endpoint(mut self, endpoint: impl ToString) -> Self {
-        self.endpoint = endpoint.to_string();
+    pub fn endpoint(mut self, endpoint: impl Into<String>) -> Self {
+        self.endpoint = endpoint.into();
         self
     }
 
-    pub fn add_tag(mut self, key: impl ToString, value: impl ToString) -> Self 
{
-        self.tags.push((key.to_string(), value.to_string()));
+    pub fn add_tag(mut self, key: impl Into<String>, value: impl Into<String>) 
-> Self {
+        self.tags.push((key.into(), value.into()));
         self
     }
 
     pub fn add_tags<K, V, I>(mut self, tags: I) -> Self
     where
-        K: ToString,
-        V: ToString,
+        K: Into<String>,
+        V: Into<String>,
         I: IntoIterator<Item = (K, V)>,
     {
-        self.tags.extend(
-            tags.into_iter()
-                .map(|(k, v)| (k.to_string(), v.to_string())),
-        );
+        self.tags
+            .extend(tags.into_iter().map(|(k, v)| (k.into(), v.into())));
         self
     }
 
@@ -107,8 +105,8 @@ impl LogRecord {
         self
     }
 
-    pub fn content(mut self, content: impl ToString) -> Self {
-        self.content = content.to_string();
+    pub fn content(mut self, content: impl Into<String>) -> Self {
+        self.content = content.into();
         self
     }
 
diff --git a/src/management/instance.rs b/src/management/instance.rs
new file mode 100644
index 0000000..6b72f02
--- /dev/null
+++ b/src/management/instance.rs
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+use crate::skywalking_proto::v3::{InstanceProperties, KeyStringValuePair};
+use once_cell::sync::Lazy;
+use std::{collections::HashMap, process};
+use systemstat::{IpAddr, Platform, System};
+
+static IPS: Lazy<Vec<String>> = Lazy::new(|| {
+    System::new()
+        .networks()
+        .ok()
+        .map(|networks| {
+            networks
+                .values()
+                .filter(|network| {
+                    network.name != "lo"
+                        && !network.name.starts_with("docker")
+                        && !network.name.starts_with("br-")
+                })
+                .flat_map(|network| {
+                    network.addrs.iter().filter_map(|addr| match addr.addr {
+                        IpAddr::V4(addr) => Some(addr.to_string()),
+                        _ => None,
+                    })
+                })
+                .collect()
+        })
+        .unwrap_or_default()
+});
+
+static HOST_NAME: Lazy<Option<String>> = Lazy::new(|| {
+    hostname::get()
+        .ok()
+        .and_then(|hostname| hostname.into_string().ok())
+});
+
+const OS_NAME: Option<&str> = if cfg!(target_os = "linux") {
+    Some("Linux")
+} else if cfg!(target_os = "windows") {
+    Some("Windows")
+} else if cfg!(target_os = "macos") {
+    Some("Mac OS X")
+} else {
+    None
+};
+
+#[derive(Debug, Default)]
+pub struct Properties {
+    inner: HashMap<String, Vec<String>>,
+}
+
+impl Properties {
+    pub const KEY_HOST_NAME: &'static str = "hostname";
+    pub const KEY_IPV4: &'static str = "ipv4";
+    pub const KEY_LANGUAGE: &'static str = "language";
+    pub const KEY_OS_NAME: &'static str = "OS Name";
+    pub const KEY_PROCESS_NO: &'static str = "Process No.";
+}
+
+impl Properties {
+    #[inline]
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    pub fn insert(&mut self, key: impl Into<String>, value: impl Into<String>) 
{
+        self.inner.entry(key.into()).or_default().push(value.into());
+    }
+
+    pub fn update(&mut self, key: &str, value: impl Into<String>) {
+        if let Some(values) = self.inner.get_mut(key) {
+            *values = vec![value.into()];
+        }
+    }
+
+    pub fn remove(&mut self, key: &str) {
+        self.inner.remove(key);
+    }
+
+    pub fn insert_os_info(&mut self) {
+        for (key, value) in build_os_info() {
+            self.insert(key, value);
+        }
+    }
+
+    pub(crate) fn convert_to_instance_properties(
+        self,
+        service_name: String,
+        instance_name: String,
+    ) -> InstanceProperties {
+        let mut properties = Vec::new();
+        for (key, values) in self.inner {
+            for value in values {
+                properties.push(KeyStringValuePair {
+                    key: key.clone(),
+                    value,
+                });
+            }
+        }
+
+        InstanceProperties {
+            service: service_name,
+            service_instance: instance_name,
+            properties,
+            layer: Default::default(),
+        }
+    }
+}
+
+fn build_os_info() -> Vec<(String, String)> {
+    let mut items = Vec::new();
+
+    if let Some(os_name) = OS_NAME.as_ref() {
+        items.push((Properties::KEY_OS_NAME.to_string(), os_name.to_string()));
+    }
+
+    if let Some(host_name) = HOST_NAME.as_ref() {
+        items.push((Properties::KEY_HOST_NAME.to_string(), host_name.clone()));
+    }
+
+    for ip in IPS.iter() {
+        items.push((Properties::KEY_IPV4.to_string(), ip.to_string()));
+    }
+
+    items.push((
+        Properties::KEY_PROCESS_NO.to_string(),
+        process::id().to_string(),
+    ));
+
+    items.push((Properties::KEY_LANGUAGE.to_string(), "rust".to_string()));
+
+    items
+}
diff --git a/src/management/manager.rs b/src/management/manager.rs
new file mode 100644
index 0000000..fe45f79
--- /dev/null
+++ b/src/management/manager.rs
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+use super::instance::Properties;
+use crate::reporter::{CollectItem, DynReport, Report};
+use std::{
+    future::Future,
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll},
+    time::Duration,
+};
+use tokio::{
+    spawn,
+    task::{JoinError, JoinHandle},
+    time,
+};
+
+pub struct Manager {
+    service_name: String,
+    instance_name: String,
+    reporter: Arc<DynReport>,
+}
+
+impl Manager {
+    /// New with service info and reporter.
+    pub fn new(
+        service_name: impl Into<String>,
+        instance_name: impl Into<String>,
+        reporter: impl Report + Send + Sync + 'static,
+    ) -> Self {
+        Self {
+            service_name: service_name.into(),
+            instance_name: instance_name.into(),
+            reporter: Arc::new(reporter),
+        }
+    }
+
+    pub fn service_name(&self) -> &str {
+        &self.service_name
+    }
+
+    pub fn instance_name(&self) -> &str {
+        &self.instance_name
+    }
+
+    pub fn report_properties(&self, properties: Properties) {
+        let props = properties
+            .convert_to_instance_properties(self.service_name.clone(), self.instance_name.clone());
+        self.reporter.report(CollectItem::Instance(Box::new(props)));
+    }
+
+    pub fn keep_alive(&self, interval: Duration) -> KeepAlive {
+        let service_name = self.service_name.clone();
+        let instance_name = self.instance_name.clone();
+        let reporter = self.reporter.clone();
+        let handle = spawn(async move {
+            let mut ticker = time::interval(interval);
+            loop {
+                ticker.tick().await;
+
+                reporter.report(CollectItem::Ping(Box::new(
+                    crate::skywalking_proto::v3::InstancePingPkg {
+                        service: service_name.clone(),
+                        service_instance: instance_name.clone(),
+                        layer: Default::default(),
+                    },
+                )));
+            }
+        });
+        KeepAlive { handle }
+    }
+}
+
+pub struct KeepAlive {
+    handle: JoinHandle<()>,
+}
+
+impl Future for KeepAlive {
+    type Output = Result<(), JoinError>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        Pin::new(&mut self.handle).poll(cx)
+    }
+}
diff --git a/src/lib.rs b/src/management/mod.rs
similarity index 74%
copy from src/lib.rs
copy to src/management/mod.rs
index 7d4cc0c..831bad1 100644
--- a/src/lib.rs
+++ b/src/management/mod.rs
@@ -13,16 +13,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 //
-#![warn(rust_2018_idioms)]
-#![warn(clippy::dbg_macro, clippy::print_stdout)]
-#![doc = include_str!("../README.md")]
 
-pub mod common;
-pub(crate) mod error;
-pub mod logging;
-pub mod metrics;
-pub mod reporter;
-pub mod skywalking_proto;
-pub mod trace;
-
-pub use error::{Error, Result};
+pub mod instance;
+pub mod manager;
diff --git a/src/metrics/meter.rs b/src/metrics/meter.rs
index ea97852..2ccf401 100644
--- a/src/metrics/meter.rs
+++ b/src/metrics/meter.rs
@@ -48,21 +48,19 @@ pub struct MeterId {
 }
 
 impl MeterId {
-    fn add_label(mut self, key: impl ToString, value: impl ToString) -> Self {
-        self.labels.push((key.to_string(), value.to_string()));
+    fn add_label(mut self, key: impl Into<String>, value: impl Into<String>) 
-> Self {
+        self.labels.push((key.into(), value.into()));
         self
     }
 
     fn add_labels<K, V, I>(mut self, tags: I) -> Self
     where
-        K: ToString,
-        V: ToString,
+        K: Into<String>,
+        V: Into<String>,
         I: IntoIterator<Item = (K, V)>,
     {
-        self.labels.extend(
-            tags.into_iter()
-                .map(|(k, v)| (k.to_string(), v.to_string())),
-        );
+        self.labels
+            .extend(tags.into_iter().map(|(k, v)| (k.into(), v.into())));
         self
     }
 }
@@ -86,10 +84,10 @@ pub struct Counter {
 
 impl Counter {
     #[inline]
-    pub fn new(name: impl ToString) -> Self {
+    pub fn new(name: impl Into<String>) -> Self {
         Self {
             id: MeterId {
-                name: name.to_string(),
+                name: name.into(),
                 typ: MeterType::Counter,
                 labels: vec![],
             },
@@ -100,7 +98,7 @@ impl Counter {
     }
 
     #[inline]
-    pub fn add_label(mut self, key: impl ToString, value: impl ToString) -> 
Self {
+    pub fn add_label(mut self, key: impl Into<String>, value: impl 
Into<String>) -> Self {
         self.id = self.id.add_label(key, value);
         self
     }
@@ -108,8 +106,8 @@ impl Counter {
     #[inline]
     pub fn add_labels<K, V, I>(mut self, tags: I) -> Self
     where
-        K: ToString,
-        V: ToString,
+        K: Into<String>,
+        V: Into<String>,
         I: IntoIterator<Item = (K, V)>,
     {
         self.id = self.id.add_labels(tags);
@@ -173,10 +171,10 @@ pub struct Gauge<G> {
 
 impl<G: Fn() -> f64> Gauge<G> {
     #[inline]
-    pub fn new(name: impl ToString, getter: G) -> Self {
+    pub fn new(name: impl Into<String>, getter: G) -> Self {
         Self {
             id: MeterId {
-                name: name.to_string(),
+                name: name.into(),
                 typ: MeterType::Gauge,
                 labels: vec![],
             },
@@ -185,7 +183,7 @@ impl<G: Fn() -> f64> Gauge<G> {
     }
 
     #[inline]
-    pub fn add_label(mut self, key: impl ToString, value: impl ToString) -> 
Self {
+    pub fn add_label(mut self, key: impl Into<String>, value: impl 
Into<String>) -> Self {
         self.id = self.id.add_label(key, value);
         self
     }
@@ -193,8 +191,8 @@ impl<G: Fn() -> f64> Gauge<G> {
     #[inline]
     pub fn add_labels<K, V, I>(mut self, tags: I) -> Self
     where
-        K: ToString,
-        V: ToString,
+        K: Into<String>,
+        V: Into<String>,
         I: IntoIterator<Item = (K, V)>,
     {
         self.id = self.id.add_labels(tags);
@@ -253,10 +251,10 @@ pub struct Histogram {
 }
 
 impl Histogram {
-    pub fn new(name: impl ToString, mut steps: Vec<f64>) -> Self {
+    pub fn new(name: impl Into<String>, mut steps: Vec<f64>) -> Self {
         Self {
             id: MeterId {
-                name: name.to_string(),
+                name: name.into(),
                 typ: MeterType::Histogram,
                 labels: vec![],
             },
@@ -269,7 +267,7 @@ impl Histogram {
     }
 
     #[inline]
-    pub fn add_label(mut self, key: impl ToString, value: impl ToString) -> 
Self {
+    pub fn add_label(mut self, key: impl Into<String>, value: impl 
Into<String>) -> Self {
         self.id = self.id.add_label(key, value);
         self
     }
@@ -277,8 +275,8 @@ impl Histogram {
     #[inline]
     pub fn add_labels<K, V, I>(mut self, tags: I) -> Self
     where
-        K: ToString,
-        V: ToString,
+        K: Into<String>,
+        V: Into<String>,
         I: IntoIterator<Item = (K, V)>,
     {
         self.id = self.id.add_labels(tags);
diff --git a/src/metrics/metricer.rs b/src/metrics/metricer.rs
index 81b8351..ad8f3de 100644
--- a/src/metrics/metricer.rs
+++ b/src/metrics/metricer.rs
@@ -42,13 +42,13 @@ pub struct Metricer {
 impl Metricer {
     /// New with service info and reporter.
     pub fn new(
-        service_name: impl ToString,
-        instance_name: impl ToString,
+        service_name: impl Into<String>,
+        instance_name: impl Into<String>,
         reporter: impl Report + Send + Sync + 'static,
     ) -> Self {
         Self {
-            service_name: service_name.to_string(),
-            instance_name: instance_name.to_string(),
+            service_name: service_name.into(),
+            instance_name: instance_name.into(),
             reporter: Box::new(reporter),
             meter_map: Default::default(),
             report_interval: Duration::from_secs(20),
@@ -86,7 +86,7 @@ impl Metricer {
                     for trans in metricer_.meter_map.values() {
                         metricer_
                             .reporter
-                            
.report(CollectItem::Meter(trans.transform(&metricer_)));
+                            
.report(CollectItem::Meter(Box::new(trans.transform(&metricer_))));
                     }
                 })
                 .await;
diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs
index fcea63d..5b0843a 100644
--- a/src/reporter/grpc.rs
+++ b/src/reporter/grpc.rs
@@ -14,6 +14,8 @@
 // limitations under the License.
 //
 
+#[cfg(feature = "management")]
+use 
crate::skywalking_proto::v3::management_service_client::ManagementServiceClient;
 use crate::{
     reporter::{CollectItem, Report},
     skywalking_proto::v3::{
@@ -104,6 +106,9 @@ struct Inner<P, C> {
     trace_client: Mutex<TraceSegmentReportServiceClient<Channel>>,
     log_client: Mutex<LogReportServiceClient<Channel>>,
     meter_client: Mutex<MeterReportServiceClient<Channel>>,
+    #[cfg(feature = "management")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "management")))]
+    management_client: Mutex<ManagementServiceClient<Channel>>,
     producer: P,
     consumer: Mutex<Option<C>>,
     is_reporting: AtomicBool,
@@ -138,6 +143,8 @@ impl<P: CollectItemProduce, C: ColletcItemConsume> 
GrpcReporter<P, C> {
             inner: Arc::new(Inner {
                 trace_client: 
Mutex::new(TraceSegmentReportServiceClient::new(channel.clone())),
                 log_client: 
Mutex::new(LogReportServiceClient::new(channel.clone())),
+                #[cfg(feature = "management")]
+                management_client: 
Mutex::new(ManagementServiceClient::new(channel.clone())),
                 meter_client: 
Mutex::new(MeterReportServiceClient::new(channel)),
                 producer,
                 consumer: Mutex::new(Some(consumer)),
@@ -216,13 +223,43 @@ impl<P: CollectItemProduce, C: ColletcItemConsume> 
ReporterAndBuffer<P, C> {
         // TODO Implement batch collect in future.
         match item {
             CollectItem::Trace(item) => {
-                self.trace_buffer.push_back(item);
+                self.trace_buffer.push_back(*item);
             }
             CollectItem::Log(item) => {
-                self.log_buffer.push_back(item);
+                self.log_buffer.push_back(*item);
             }
             CollectItem::Meter(item) => {
-                self.meter_buffer.push_back(item);
+                self.meter_buffer.push_back(*item);
+            }
+            #[cfg(feature = "management")]
+            CollectItem::Instance(item) => {
+                if let Err(e) = self
+                    .inner
+                    .management_client
+                    .lock()
+                    .await
+                    .report_instance_properties(*item)
+                    .await
+                {
+                    if let Some(status_handle) = &self.status_handle {
+                        status_handle(e);
+                    }
+                }
+            }
+            #[cfg(feature = "management")]
+            CollectItem::Ping(item) => {
+                if let Err(e) = self
+                    .inner
+                    .management_client
+                    .lock()
+                    .await
+                    .keep_alive(*item)
+                    .await
+                {
+                    if let Some(status_handle) = &self.status_handle {
+                        status_handle(e);
+                    }
+                }
             }
         }
 
diff --git a/src/reporter/mod.rs b/src/reporter/mod.rs
index 79735bf..921cb02 100644
--- a/src/reporter/mod.rs
+++ b/src/reporter/mod.rs
@@ -17,6 +17,8 @@
 pub mod grpc;
 pub mod print;
 
+#[cfg(feature = "management")]
+use crate::skywalking_proto::v3::{InstancePingPkg, InstanceProperties};
 use crate::skywalking_proto::v3::{LogData, MeterData, SegmentObject};
 use serde::{Deserialize, Serialize};
 use std::{ops::Deref, sync::Arc};
@@ -25,9 +27,15 @@ use tokio::sync::OnceCell;
 #[derive(Debug, Serialize, Deserialize)]
 #[non_exhaustive]
 pub enum CollectItem {
-    Trace(SegmentObject),
-    Log(LogData),
-    Meter(MeterData),
+    Trace(Box<SegmentObject>),
+    Log(Box<LogData>),
+    Meter(Box<MeterData>),
+    #[cfg(feature = "management")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "management")))]
+    Instance(Box<InstanceProperties>),
+    #[cfg(feature = "management")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "management")))]
+    Ping(Box<InstancePingPkg>),
 }
 
 pub(crate) type DynReport = dyn Report + Send + Sync + 'static;
diff --git a/src/reporter/print.rs b/src/reporter/print.rs
index e5b640f..6124c46 100644
--- a/src/reporter/print.rs
+++ b/src/reporter/print.rs
@@ -57,6 +57,22 @@ impl Report for PrintReporter {
                     println!("meter data={:?}", data);
                 }
             }
+            #[cfg(feature = "management")]
+            CollectItem::Instance(data) => {
+                if self.use_stderr {
+                    eprintln!("instance properties data={:?}", data);
+                } else {
+                    println!("instance properties data={:?}", data);
+                }
+            }
+            #[cfg(feature = "management")]
+            CollectItem::Ping(data) => {
+                if self.use_stderr {
+                    eprintln!("ping data={:?}", data);
+                } else {
+                    println!("ping data={:?}", data);
+                }
+            }
         }
     }
 }
diff --git a/src/skywalking_proto/v3/mod.rs b/src/skywalking_proto/v3/mod.rs
index 8623eaa..dc74965 100644
--- a/src/skywalking_proto/v3/mod.rs
+++ b/src/skywalking_proto/v3/mod.rs
@@ -21,8 +21,8 @@ impl SpanObject {
     /// Add logs to the span.
     pub fn add_log<K, V, I>(&mut self, message: I)
     where
-        K: ToString,
-        V: ToString,
+        K: Into<String>,
+        V: Into<String>,
         I: IntoIterator<Item = (K, V)>,
     {
         let log = Log {
@@ -32,8 +32,8 @@ impl SpanObject {
                 .map(|v| {
                     let (key, value) = v;
                     KeyStringValuePair {
-                        key: key.to_string(),
-                        value: value.to_string(),
+                        key: key.into(),
+                        value: value.into(),
                     }
                 })
                 .collect(),
@@ -42,10 +42,10 @@ impl SpanObject {
     }
 
     /// Add tag to the span.
-    pub fn add_tag(&mut self, key: impl ToString, value: impl ToString) {
+    pub fn add_tag(&mut self, key: impl Into<String>, value: impl Into<String>) {
         self.tags.push(KeyStringValuePair {
-            key: key.to_string(),
-            value: value.to_string(),
+            key: key.into(),
+            value: value.into(),
         });
     }
 }
diff --git a/src/trace/span.rs b/src/trace/span.rs
index ce49c6b..84d97fa 100644
--- a/src/trace/span.rs
+++ b/src/trace/span.rs
@@ -141,15 +141,15 @@ impl Span {
     /// Add logs to the span.
     pub fn add_log<K, V, I>(&mut self, message: I)
     where
-        K: ToString,
-        V: ToString,
+        K: Into<String>,
+        V: Into<String>,
         I: IntoIterator<Item = (K, V)>,
     {
         self.with_span_object_mut(|span| span.add_log(message))
     }
 
     /// Add tag to the span.
-    pub fn add_tag(&mut self, key: impl ToString, value: impl ToString) {
+    pub fn add_tag(&mut self, key: impl Into<String>, value: impl Into<String>) {
         self.with_span_object_mut(|span| span.add_tag(key, value))
     }
 }
diff --git a/src/trace/trace_context.rs b/src/trace/trace_context.rs
index 659b3de..d0daa94 100644
--- a/src/trace/trace_context.rs
+++ b/src/trace/trace_context.rs
@@ -119,15 +119,15 @@ impl std::fmt::Debug for TracingContext {
 impl TracingContext {
     /// Generate a new trace context.
     pub(crate) fn new(
-        service_name: impl ToString,
-        instance_name: impl ToString,
+        service_name: impl Into<String>,
+        instance_name: impl Into<String>,
         tracer: WeakTracer,
     ) -> Self {
         TracingContext {
             trace_id: RandomGenerator::generate(),
             trace_segment_id: RandomGenerator::generate(),
-            service: service_name.to_string(),
-            service_instance: instance_name.to_string(),
+            service: service_name.into(),
+            service_instance: instance_name.into(),
             next_span_id: Default::default(),
             span_stack: Default::default(),
             primary_endpoint_name: Default::default(),
diff --git a/src/trace/tracer.rs b/src/trace/tracer.rs
index 963d270..599ec95 100644
--- a/src/trace/tracer.rs
+++ b/src/trace/tracer.rs
@@ -55,14 +55,14 @@ pub struct Tracer {
 impl Tracer {
     /// New with service info and reporter.
     pub fn new(
-        service_name: impl ToString,
-        instance_name: impl ToString,
+        service_name: impl Into<String>,
+        instance_name: impl Into<String>,
         reporter: impl Report + Send + Sync + 'static,
     ) -> Self {
         Self {
             inner: Arc::new(Inner {
-                service_name: service_name.to_string(),
-                instance_name: instance_name.to_string(),
+                service_name: service_name.into(),
+                instance_name: instance_name.into(),
                 reporter: Box::new(reporter),
             }),
         }
@@ -90,7 +90,7 @@ impl Tracer {
         let segment_object = context.convert_to_segment_object();
         self.inner
             .reporter
-            .report(CollectItem::Trace(segment_object));
+            .report(CollectItem::Trace(Box::new(segment_object)));
     }
 
     fn downgrade(&self) -> WeakTracer {
diff --git a/tests/logging.rs b/tests/logging.rs
index e909bf6..53483d2 100644
--- a/tests/logging.rs
+++ b/tests/logging.rs
@@ -161,7 +161,7 @@ impl Report for MockReporter {
     fn report(&self, item: CollectItem) {
         match item {
             CollectItem::Log(data) => {
-                self.items.try_lock().unwrap().push_back(data);
+                self.items.try_lock().unwrap().push_back(*data);
             }
             _ => {}
         }
diff --git a/tests/management.rs b/tests/management.rs
new file mode 100644
index 0000000..5b10419
--- /dev/null
+++ b/tests/management.rs
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+use skywalking::{
+    management::{instance::Properties, manager::Manager},
+    reporter::{CollectItem, Report},
+    skywalking_proto::v3::{InstancePingPkg, InstanceProperties, KeyStringValuePair},
+};
+use std::{
+    collections::LinkedList,
+    process,
+    sync::{Arc, Mutex},
+    time::Duration,
+};
+use tokio::time::sleep;
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+async fn management() {
+    let reporter = Arc::new(MockReporter::default());
+    let manager = Manager::new("service_name", "instance_name", reporter.clone());
+    manager.keep_alive(Duration::from_secs(60));
+
+    {
+        let mut props = Properties::new();
+        props.insert_os_info();
+        manager.report_properties(props);
+
+        let actual_props = reporter.pop_ins_props();
+        assert_eq!(actual_props.service, "service_name".to_owned());
+        assert_eq!(actual_props.service_instance, "instance_name".to_owned());
+        assert_eq!(
+            kvs_get_value(&actual_props.properties, Properties::KEY_LANGUAGE),
+            "rust"
+        );
+        assert_eq!(
+            kvs_get_value(&actual_props.properties, Properties::KEY_HOST_NAME),
+            hostname::get().unwrap()
+        );
+        assert_eq!(
+            kvs_get_value(&actual_props.properties, Properties::KEY_PROCESS_NO),
+            process::id().to_string()
+        );
+    }
+
+    {
+        sleep(Duration::from_secs(1)).await;
+        assert_eq!(
+            reporter.pop_ping(),
+            InstancePingPkg {
+                service: "service_name".to_owned(),
+                service_instance: "instance_name".to_owned(),
+                ..Default::default()
+            }
+        );
+    }
+}
+
+fn kvs_get_value<'a>(kvs: &'a [KeyStringValuePair], key: &str) -> &'a str {
+    &kvs.iter().find(|kv| kv.key == key).unwrap().value
+}
+
+#[derive(Default, Clone)]
+struct MockReporter {
+    props_items: Arc<Mutex<LinkedList<InstanceProperties>>>,
+    ping_items: Arc<Mutex<LinkedList<InstancePingPkg>>>,
+}
+
+impl MockReporter {
+    fn pop_ins_props(&self) -> InstanceProperties {
+        self.props_items.try_lock().unwrap().pop_back().unwrap()
+    }
+
+    fn pop_ping(&self) -> InstancePingPkg {
+        self.ping_items.try_lock().unwrap().pop_back().unwrap()
+    }
+}
+
+impl Report for MockReporter {
+    fn report(&self, item: CollectItem) {
+        match item {
+            CollectItem::Instance(data) => {
+                self.props_items.try_lock().unwrap().push_back(*data);
+            }
+            CollectItem::Ping(data) => {
+                self.ping_items.try_lock().unwrap().push_back(*data);
+            }
+            _ => {}
+        }
+    }
+}
diff --git a/tests/metrics.rs b/tests/metrics.rs
index f1d95d0..43246ec 100644
--- a/tests/metrics.rs
+++ b/tests/metrics.rs
@@ -166,7 +166,7 @@ impl Report for MockReporter {
     fn report(&self, item: CollectItem) {
         match item {
             CollectItem::Meter(data) => {
-                self.items.try_lock().unwrap().push_back(data);
+                self.items.try_lock().unwrap().push_back(*data);
             }
             _ => {}
         }
diff --git a/tests/trace_context.rs b/tests/trace_context.rs
index b967e53..4603ad9 100644
--- a/tests/trace_context.rs
+++ b/tests/trace_context.rs
@@ -387,6 +387,6 @@ impl Report for MockReporter {
             CollectItem::Trace(segment) => segment,
             _ => unreachable!(),
         };
-        self.segments.try_lock().unwrap().push_back(segment);
+        self.segments.try_lock().unwrap().push_back(*segment);
     }
 }

Reply via email to