This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-php.git
The following commit(s) were added to refs/heads/master by this push:
new f433815 Refactor worker to standalone crate (#118)
f433815 is described below
commit f43381509bf84674dc4b9b3ebde4404cb3f9e8ec
Author: jmjoy <[email protected]>
AuthorDate: Thu Aug 15 15:30:54 2024 +0800
Refactor worker to standalone crate (#118)
---
.licenserc.yaml | 1 +
Cargo.lock | 47 ++++--
Cargo.toml | 19 ++-
config.m4 | 1 +
dist-material/LICENSE | 11 +-
scripts/Cargo.toml | 10 +-
src/channel.rs | 24 ---
src/lib.rs | 1 -
src/util.rs | 12 --
src/worker.rs | 222 ++++++-------------------
Cargo.toml => worker/Cargo.toml | 48 ++----
{src => worker/src}/channel.rs | 56 +------
src/worker.rs => worker/src/lib.rs | 126 ++++++--------
{src => worker/src}/reporter/mod.rs | 26 ++-
{src => worker/src}/reporter/reporter_grpc.rs | 51 +++---
{src => worker/src}/reporter/reporter_kafka.rs | 13 +-
16 files changed, 230 insertions(+), 438 deletions(-)
diff --git a/.licenserc.yaml b/.licenserc.yaml
index 222a72e..9d5fdd4 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -32,6 +32,7 @@ header:
- 'config.m4'
- 'vendor'
- 'dist-material'
+ - 'target'
comment: on-failure
diff --git a/Cargo.lock b/Cargo.lock
index 5e4ce47..8a6988f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -82,9 +82,9 @@ dependencies = [
[[package]]
name = "anstyle-wincon"
-version = "1.0.1"
+version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "180abfa45703aebe0093f79badacc01b8fd4ea2e35118747e5811127f926e188"
+checksum = "c677ab05e09154296dd37acecd46420c17b9713e8366facafa8fc0885167cf4c"
dependencies = [
"anstyle",
"windows-sys 0.48.0",
@@ -365,9 +365,9 @@ dependencies = [
[[package]]
name = "clap"
-version = "4.3.19"
+version = "4.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5fd304a20bff958a57f04c4e96a2e7594cc4490a0e809cbd48bb6437edaa452d"
+checksum = "fb690e81c7840c0d7aade59f242ea3b41b9bc27bcd5997890e7702ae4b32e487"
dependencies = [
"clap_builder",
"clap_derive",
@@ -376,9 +376,9 @@ dependencies = [
[[package]]
name = "clap_builder"
-version = "4.3.19"
+version = "4.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "01c6a3f08f1fe5662a35cfe393aec09c4df95f60ee93b7556505260f75eee9e1"
+checksum = "5ed2e96bc16d8d740f6f48d663eddf4b8a0983e79210fd55479b7bcd0a69860e"
dependencies = [
"anstream",
"anstyle",
@@ -983,13 +983,13 @@ checksum = "28b29a3cd74f0f4598934efe3aeba42bae0eb4680554128851ebbecb02af14e6"
[[package]]
name = "is-terminal"
-version = "0.4.9"
+version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b"
+checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b"
dependencies = [
"hermit-abi",
- "rustix",
- "windows-sys 0.48.0",
+ "libc",
+ "windows-sys 0.52.0",
]
[[package]]
@@ -1597,9 +1597,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
-version = "1.0.66"
+version = "1.0.86"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9"
+checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77"
dependencies = [
"unicode-ident",
]
@@ -2173,8 +2173,8 @@ dependencies = [
"prost",
"rdkafka",
"reqwest",
- "serde_json",
"skywalking",
+ "skywalking-php-worker",
"systemstat",
"thiserror",
"tokio",
@@ -2185,6 +2185,27 @@ dependencies = [
"url",
]
+[[package]]
+name = "skywalking-php-worker"
+version = "0.8.0-dev"
+dependencies = [
+ "anyhow",
+ "bincode",
+ "clap",
+ "clap_lex",
+ "libc",
+ "once_cell",
+ "prost",
+ "rdkafka",
+ "serde_json",
+ "skywalking",
+ "tokio",
+ "tokio-stream",
+ "tonic",
+ "tracing",
+ "tracing-subscriber",
+]
+
[[package]]
name = "slab"
version = "0.4.8"
diff --git a/Cargo.toml b/Cargo.toml
index 3af549e..a7cbf86 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,17 +17,26 @@
members = [
".",
"scripts",
+ "worker",
]
-[package]
-name = "skywalking-php"
+[workspace.package]
version = "0.8.0-dev"
authors = ["Apache Software Foundation", "jmjoy <[email protected]>", "Yanlong He <[email protected]>"]
-description = "Apache SkyWalking PHP Agent."
edition = "2021"
rust-version = "1.65"
repository = "https://github.com/apache/skywalking-php"
license = "Apache-2.0"
+
+[package]
+name = "skywalking-php"
+version = { workspace = true }
+authors = { workspace = true }
+description = "Apache SkyWalking PHP Agent."
+edition = { workspace = true }
+rust-version = { workspace = true }
+repository = { workspace = true }
+license = { workspace = true }
readme = "README.md"
publish = false
@@ -36,7 +45,7 @@ name = "skywalking_agent"
crate-type = ["lib", "cdylib"]
[features]
-kafka-reporter = ["skywalking/kafka-reporter", "rdkafka/sasl"]
+kafka-reporter = ["skywalking-php-worker/kafka-reporter"]
[dependencies]
anyhow = { version = "1.0.72", features = ["backtrace"] }
@@ -49,8 +58,8 @@ once_cell = "1.18.0"
phper = "0.12.0"
prost = "0.11.9"
rdkafka = { version = "0.32.2", optional = true }
-serde_json = { version = "1.0.104", features = ["preserve_order"] }
skywalking = { version = "0.8.0", features = ["management"] }
+skywalking-php-worker = { path = "worker" }
systemstat = "0.2.3"
thiserror = "1.0.44"
tokio = { version = "1.29.1", features = ["full"] }
diff --git a/config.m4 b/config.m4
index edd4c0e..e60d10f 100644
--- a/config.m4
+++ b/config.m4
@@ -85,5 +85,6 @@ EOF
scripts:scripts \
src:src \
tests:tests \
+ worker:worker \
])
fi
diff --git a/dist-material/LICENSE b/dist-material/LICENSE
index 4a27d97..c2f79a1 100644
--- a/dist-material/LICENSE
+++ b/dist-material/LICENSE
@@ -224,6 +224,7 @@ The text of each license is the standard Apache 2.0 license.
https://crates.io/crates/sasl2-sys/0.1.22+2.1.28 0.1.22+2.1.28 Apache-2.0
https://crates.io/crates/scripts/0.0.0 0.0.0 Apache-2.0
https://crates.io/crates/skywalking/0.8.0 0.8.0 Apache-2.0
+ https://crates.io/crates/skywalking-php-worker/0.8.0-dev 0.8.0-dev
Apache-2.0
https://crates.io/crates/sync_wrapper/0.1.2 0.1.2 Apache-2.0
========================================================================
@@ -277,7 +278,7 @@ The text of each license is also included in licenses/LICENSE-[project].txt.
https://crates.io/crates/anstyle/1.0.1 1.0.1 Apache-2.0 OR MIT
https://crates.io/crates/anstyle-parse/0.2.1 0.2.1 Apache-2.0 OR MIT
https://crates.io/crates/anstyle-query/1.0.0 1.0.0 Apache-2.0 OR MIT
- https://crates.io/crates/anstyle-wincon/1.0.1 1.0.1 Apache-2.0 OR MIT
+ https://crates.io/crates/anstyle-wincon/1.0.2 1.0.2 Apache-2.0 OR MIT
https://crates.io/crates/anyhow/1.0.72 1.0.72 Apache-2.0 OR MIT
https://crates.io/crates/async-trait/0.1.72 0.1.72 Apache-2.0 OR MIT
https://crates.io/crates/autocfg/1.1.0 1.1.0 Apache-2.0 OR MIT
@@ -295,8 +296,8 @@ The text of each license is also included in licenses/LICENSE-[project].txt.
https://crates.io/crates/chrono/0.4.26 0.4.26 Apache-2.0 OR MIT
https://crates.io/crates/chrono-tz/0.6.1 0.6.1 Apache-2.0 OR MIT
https://crates.io/crates/chrono-tz-build/0.0.2 0.0.2 Apache-2.0 OR MIT
- https://crates.io/crates/clap/4.3.19 4.3.19 Apache-2.0 OR MIT
- https://crates.io/crates/clap_builder/4.3.19 4.3.19 Apache-2.0 OR MIT
+ https://crates.io/crates/clap/4.3.24 4.3.24 Apache-2.0 OR MIT
+ https://crates.io/crates/clap_builder/4.3.24 4.3.24 Apache-2.0 OR MIT
https://crates.io/crates/clap_derive/4.3.12 4.3.12 Apache-2.0 OR MIT
https://crates.io/crates/clap_lex/0.5.0 0.5.0 Apache-2.0 OR MIT
https://crates.io/crates/colorchoice/1.0.0 1.0.0 Apache-2.0 OR MIT
@@ -385,7 +386,7 @@ The text of each license is also included in licenses/LICENSE-[project].txt.
https://crates.io/crates/ppv-lite86/0.2.17 0.2.17 Apache-2.0 OR MIT
https://crates.io/crates/prettyplease/0.1.25 0.1.25 Apache-2.0 OR MIT
https://crates.io/crates/proc-macro-crate/1.3.1 1.3.1 Apache-2.0 OR MIT
- https://crates.io/crates/proc-macro2/1.0.66 1.0.66 Apache-2.0 OR MIT
+ https://crates.io/crates/proc-macro2/1.0.86 1.0.86 Apache-2.0 OR MIT
https://crates.io/crates/quick-error/1.2.3 1.2.3 Apache-2.0 OR MIT
https://crates.io/crates/quote/1.0.32 1.0.32 Apache-2.0 OR MIT
https://crates.io/crates/rand/0.8.5 0.8.5 Apache-2.0 OR MIT
@@ -542,7 +543,7 @@ The text of each license is also included in licenses/LICENSE-[project].txt.
https://crates.io/crates/hostname/0.3.1 0.3.1 MIT
https://crates.io/crates/http-body/0.4.5 0.4.5 MIT
https://crates.io/crates/hyper/0.14.27 0.14.27 MIT
- https://crates.io/crates/is-terminal/0.4.9 0.4.9 MIT
+ https://crates.io/crates/is-terminal/0.4.12 0.4.12 MIT
https://crates.io/crates/matchers/0.1.0 0.1.0 MIT
https://crates.io/crates/matches/0.1.10 0.1.10 MIT
https://crates.io/crates/mio/0.8.8 0.8.8 MIT
diff --git a/scripts/Cargo.toml b/scripts/Cargo.toml
index 8e0aeb7..30dec9a 100644
--- a/scripts/Cargo.toml
+++ b/scripts/Cargo.toml
@@ -16,12 +16,12 @@
[package]
name = "scripts"
version = "0.0.0"
-authors = ["Apache Software Foundation", "jmjoy <[email protected]>", "Yanlong He <[email protected]>"]
+authors = { workspace = true }
description = "The Scripts of Apache SkyWalking PHP Agent."
-edition = "2021"
-rust-version = "1.58"
-repository = "https://github.com/apache/skywalking-php"
-license = "Apache-2.0"
+edition = { workspace = true }
+rust-version = { workspace = true }
+repository = { workspace = true }
+license = { workspace = true }
publish = false
[dependencies]
diff --git a/src/channel.rs b/src/channel.rs
index e29617b..8976f86 100644
--- a/src/channel.rs
+++ b/src/channel.rs
@@ -18,13 +18,11 @@ use once_cell::sync::OnceCell;
use skywalking::reporter::{CollectItem, Report};
use std::{
io::Write,
- mem::size_of,
ops::DerefMut,
os::unix::net::UnixStream,
path::{Path, PathBuf},
sync::Mutex,
};
-use tokio::{io::AsyncReadExt, sync::mpsc};
use tracing::error;
fn channel_send<T>(data: CollectItem, mut sender: T) -> anyhow::Result<()>
@@ -40,18 +38,6 @@ where
Ok(())
}
-pub async fn channel_receive(receiver: &mut tokio::net::UnixStream) -> anyhow::Result<CollectItem> {
- let mut size_buf = [0u8; size_of::<usize>()];
- receiver.read_exact(&mut size_buf).await?;
- let size = usize::from_le_bytes(size_buf);
-
- let mut content = vec![0u8; size];
- receiver.read_exact(&mut content).await?;
-
- let item = bincode::deserialize(&content)?;
- Ok(item)
-}
-
pub struct Reporter {
worker_addr: PathBuf,
stream: OnceCell<Mutex<UnixStream>>,
@@ -83,13 +69,3 @@ impl Report for Reporter {
}
}
}
-
-pub struct TxReporter(pub mpsc::Sender<CollectItem>);
-
-impl Report for TxReporter {
- fn report(&self, item: CollectItem) {
- if let Err(err) = self.0.try_send(item) {
- error!(?err, "Send collect item failed");
- }
- }
-}
diff --git a/src/lib.rs b/src/lib.rs
index a65f4ef..5a54bb9 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -24,7 +24,6 @@ mod errors;
mod execute;
mod module;
mod plugin;
-mod reporter;
mod request;
mod tag;
mod util;
diff --git a/src/util.rs b/src/util.rs
index e9fb1ca..6ad8bd8 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -18,9 +18,7 @@ use once_cell::sync::Lazy;
use phper::{ini::ini_get, sys, values::ZVal};
use std::{
ffi::CStr,
- os::unix::prelude::OsStrExt,
panic::{catch_unwind, UnwindSafe},
- path::Path,
};
use systemstat::{IpAddr, Platform, System};
@@ -87,16 +85,6 @@ pub fn get_sapi_module_name() -> &'static CStr {
unsafe { CStr::from_ptr(sys::sapi_module.name) }
}
-pub fn change_permission(f: impl AsRef<Path>, mode: libc::mode_t) {
- let f = f.as_ref().as_os_str().as_bytes();
- let mut path = Vec::with_capacity(f.len() + 1);
- path.extend_from_slice(f);
- path.push(b'\0');
- unsafe {
- libc::chmod(path.as_ptr().cast(), mode);
- }
-}
-
pub fn get_str_ini_with_default(name: &str) -> String {
ini_get::<Option<&CStr>>(name)
.and_then(|s| s.to_str().ok())
diff --git a/src/worker.rs b/src/worker.rs
index 80c74a7..dbeaa60 100644
--- a/src/worker.rs
+++ b/src/worker.rs
@@ -13,39 +13,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use crate::{
- channel::{self, TxReporter},
- module::{
- HEARTBEAT_PERIOD, PROPERTIES_REPORT_PERIOD_FACTOR, SERVICE_INSTANCE,
SERVICE_NAME,
- SOCKET_FILE_PATH, WORKER_THREADS,
- },
- reporter::run_reporter,
- util::change_permission,
+use crate::module::{
+ AUTHENTICATION, ENABLE_TLS, HEARTBEAT_PERIOD,
PROPERTIES_REPORT_PERIOD_FACTOR, REPORTER_TYPE,
+ SERVER_ADDR, SERVICE_INSTANCE, SERVICE_NAME, SOCKET_FILE_PATH,
SSL_CERT_CHAIN_PATH,
+ SSL_KEY_PATH, SSL_TRUSTED_CA_PATH, WORKER_THREADS,
};
-
-use once_cell::sync::Lazy;
-
-use skywalking::{
- management::{instance::Properties, manager::Manager},
- reporter::{CollectItem, CollectItemConsume},
-};
-use std::{
- cmp::Ordering, error::Error, fs, io, marker::PhantomData,
num::NonZeroUsize, process::exit,
- thread::available_parallelism, time::Duration,
-};
-use tokio::{
- net::UnixListener,
- runtime::{self, Runtime},
- select,
- signal::unix::{signal, SignalKind},
- sync::mpsc::{self, error::TrySendError},
+#[cfg(feature = "kafka-reporter")]
+use crate::module::{KAFKA_BOOTSTRAP_SERVERS, KAFKA_PRODUCER_CONFIG};
+#[cfg(feature = "kafka-reporter")]
+use skywalking_php_worker::reporter::KafkaReporterConfiguration;
+use skywalking_php_worker::{
+ new_tokio_runtime,
+ reporter::{GrpcReporterConfiguration, ReporterConfiguration},
+ start_worker, HeartBeatConfiguration, WorkerConfiguration,
};
-use tonic::async_trait;
-use tracing::{debug, error, info, warn};
+use std::{cmp::Ordering, num::NonZeroUsize, process::exit, thread::available_parallelism};
+use tracing::error;
pub fn init_worker() {
- let worker_threads = worker_threads();
-
unsafe {
// TODO Shutdown previous worker before fork if there is a PHP-FPM
reload
// operation.
@@ -61,9 +46,40 @@ pub fn init_worker() {
#[cfg(target_os = "linux")]
libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGTERM);
+ let reporter_config = match REPORTER_TYPE.as_str() {
+ "grpc" =>
ReporterConfiguration::Grpc(GrpcReporterConfiguration {
+ authentication: AUTHENTICATION.clone(),
+ enable_tls: *ENABLE_TLS,
+ server_addr: SERVER_ADDR.clone(),
+ ssl_cert_chain_path: SSL_CERT_CHAIN_PATH.clone(),
+ ssl_key_path: SSL_KEY_PATH.clone(),
+ ssl_trusted_ca_path: SSL_TRUSTED_CA_PATH.clone(),
+ }),
+ #[cfg(feature = "kafka-reporter")]
+ "kafka" =>
ReporterConfiguration::Kafka(KafkaReporterConfiguration {
+ kafka_bootstrap_servers:
KAFKA_BOOTSTRAP_SERVERS.clone(),
+ kafka_producer_config: KAFKA_PRODUCER_CONFIG.clone(),
+ }),
+ typ => {
+ error!("unknown reporter type, {}", typ);
+ exit(1);
+ }
+ };
+
+ let config = WorkerConfiguration {
+ socket_file_path: SOCKET_FILE_PATH.to_path_buf(),
+ heart_beat: Some(HeartBeatConfiguration {
+ service_instance: SERVICE_INSTANCE.clone(),
+ service_name: SERVICE_NAME.clone(),
+ heartbeat_period: *HEARTBEAT_PERIOD,
+ properties_report_period_factor:
*PROPERTIES_REPORT_PERIOD_FACTOR,
+ }),
+ reporter_config,
+ };
+
// Run the worker in subprocess.
- let rt = new_tokio_runtime(worker_threads);
- match rt.block_on(start_worker()) {
+ let rt = new_tokio_runtime(worker_threads());
+ match rt.block_on(start_worker(config)) {
Ok(_) => {
exit(0);
}
@@ -86,147 +102,3 @@ fn worker_threads() -> usize {
worker_threads as usize
}
}
-
-fn new_tokio_runtime(worker_threads: usize) -> Runtime {
- runtime::Builder::new_multi_thread()
- .thread_name("sw: worker")
- .enable_all()
- .worker_threads(worker_threads)
- .build()
- .unwrap()
-}
-
-async fn start_worker() -> anyhow::Result<()> {
- debug!("Starting worker...");
-
- // Ensure to cleanup resources when worker exits.
- let _guard = WorkerExitGuard::default();
-
- // Graceful shutdown signal, put it on the top of program.
- let mut sig_term = signal(SignalKind::terminate())?;
- let mut sig_int = signal(SignalKind::interrupt())?;
-
- let socket_file = &*SOCKET_FILE_PATH;
-
- let fut = async move {
- debug!(?socket_file, "Bind unix stream");
- let listener = UnixListener::bind(socket_file)?;
- change_permission(socket_file, 0o777);
-
- let (tx, rx) = mpsc::channel::<CollectItem>(255);
- let tx_ = tx.clone();
- tokio::spawn(async move {
- loop {
- match listener.accept().await {
- Ok((mut stream, _addr)) => {
- let tx = tx.clone();
-
- tokio::spawn(async move {
- debug!("Entering channel_receive loop");
-
- loop {
- let r = match channel::channel_receive(&mut
stream).await {
- Err(err) => match
err.downcast_ref::<io::Error>() {
- Some(e) if e.kind() ==
io::ErrorKind::UnexpectedEof => {
- debug!("Leaving channel_receive
loop");
- return;
- }
- _ => {
- error!(?err, "channel_receive
failed");
- continue;
- }
- },
- Ok(i) => i,
- };
-
- // Try send here, to prevent the ipc blocking
caused by the channel
- // bursting (too late to report),
- // which affects the pool process of php-fpm.
- if let Err(err) = tx.try_send(r) {
- error!(?err, "Send collect item failed");
- if !matches!(err, TrySendError::Full(_)) {
- return;
- }
- }
- }
- });
- }
- Err(err) => {
- error!(?err, "Accept failed");
- }
- }
- }
- });
-
- report_properties_and_keep_alive(TxReporter(tx_));
-
- // Run reporter with blocking.
- run_reporter((), Consumer(rx)).await?;
-
- Ok::<_, anyhow::Error>(())
- };
-
- // TODO Do graceful shutdown, and wait 10s then force quit.
- select! {
- _ = sig_term.recv() => {}
- _ = sig_int.recv() => {}
- r = fut => {
- r?;
- }
- }
-
- info!("Start to shutdown skywalking grpc reporter");
-
- Ok(())
-}
-
-struct Consumer(mpsc::Receiver<CollectItem>);
-
-#[async_trait]
-impl CollectItemConsume for Consumer {
- async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error +
Send>> {
- Ok(self.0.recv().await)
- }
-
- async fn try_consume(&mut self) -> Result<Option<CollectItem>, Box<dyn
Error + Send>> {
- Ok(self.0.try_recv().ok())
- }
-}
-
-#[derive(Default)]
-struct WorkerExitGuard(PhantomData<()>);
-
-impl Drop for WorkerExitGuard {
- fn drop(&mut self) {
- match Lazy::get(&SOCKET_FILE_PATH) {
- Some(socket_file) => {
- info!(?socket_file, "Remove socket file");
- if let Err(err) = fs::remove_file(socket_file) {
- error!(?err, "Remove socket file failed");
- }
- }
- None => {
- warn!("Socket file not created");
- }
- }
- }
-}
-
-fn report_properties_and_keep_alive(reporter: TxReporter) {
- let manager = Manager::new(&*SERVICE_NAME, &*SERVICE_INSTANCE, reporter);
-
- manager.report_and_keep_alive(
- || {
- let mut props = Properties::new();
- props.insert_os_info();
- props.update(Properties::KEY_LANGUAGE, "php");
- props.update(Properties::KEY_PROCESS_NO, unsafe {
- libc::getppid().to_string()
- });
- debug!(?props, "Report instance properties");
- props
- },
- Duration::from_secs(*HEARTBEAT_PERIOD as u64),
- *PROPERTIES_REPORT_PERIOD_FACTOR as usize,
- );
-}
diff --git a/Cargo.toml b/worker/Cargo.toml
similarity index 66%
copy from Cargo.toml
copy to worker/Cargo.toml
index 3af549e..2780dc9 100644
--- a/Cargo.toml
+++ b/worker/Cargo.toml
@@ -13,57 +13,39 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-[workspace]
-members = [
- ".",
- "scripts",
-]
-
[package]
-name = "skywalking-php"
-version = "0.8.0-dev"
-authors = ["Apache Software Foundation", "jmjoy <[email protected]>", "Yanlong He <[email protected]>"]
-description = "Apache SkyWalking PHP Agent."
-edition = "2021"
-rust-version = "1.65"
-repository = "https://github.com/apache/skywalking-php"
-license = "Apache-2.0"
+name = "skywalking-php-worker"
+version = { workspace = true }
+authors = { workspace = true }
+description = "Apache SkyWalking PHP Worker."
+edition = { workspace = true }
+rust-version = { workspace = true }
+repository = { workspace = true }
+license = { workspace = true }
readme = "README.md"
publish = false
-[lib]
-name = "skywalking_agent"
-crate-type = ["lib", "cdylib"]
-
[features]
+standalone = ["clap", "clap_lex", "tracing-subscriber"]
kafka-reporter = ["skywalking/kafka-reporter", "rdkafka/sasl"]
[dependencies]
anyhow = { version = "1.0.72", features = ["backtrace"] }
bincode = "1.3.3"
-dashmap = "5.5.0"
-futures-util = "0.3.28"
-hostname = "0.3.1"
+clap = { version = "=4.3.24", features = ["derive"], optional = true }
+clap_lex = { version = "=0.5.0", optional = true }
libc = "0.2.147"
once_cell = "1.18.0"
-phper = "0.12.0"
prost = "0.11.9"
rdkafka = { version = "0.32.2", optional = true }
serde_json = { version = "1.0.104", features = ["preserve_order"] }
skywalking = { version = "0.8.0", features = ["management"] }
-systemstat = "0.2.3"
-thiserror = "1.0.44"
tokio = { version = "1.29.1", features = ["full"] }
tokio-stream = "0.1.14"
tonic = { version = "0.8.3", features = ["tls", "tls-roots"] }
tracing = { version = "0.1.37", features = ["attributes"] }
-tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
-url = "2.4.0"
-
-[dev-dependencies]
-axum = "0.6.19"
-fastcgi-client = "0.9.0"
-reqwest = { version = "0.11.18", features = ["trust-dns", "json", "stream"] }
+tracing-subscriber = { version = "0.3.17", features = ["env-filter"], optional = true }
-[build-dependencies]
-phper-build = "0.12.0"
+# [[bin]]
+# name = "skywalking-php-worker"
+# required-features = ["standalone", "kafka-reporter"]
diff --git a/src/channel.rs b/worker/src/channel.rs
similarity index 54%
copy from src/channel.rs
copy to worker/src/channel.rs
index e29617b..6dcdd2a 100644
--- a/src/channel.rs
+++ b/worker/src/channel.rs
@@ -13,33 +13,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use anyhow::anyhow;
-use once_cell::sync::OnceCell;
use skywalking::reporter::{CollectItem, Report};
-use std::{
- io::Write,
- mem::size_of,
- ops::DerefMut,
- os::unix::net::UnixStream,
- path::{Path, PathBuf},
- sync::Mutex,
-};
+use std::mem::size_of;
use tokio::{io::AsyncReadExt, sync::mpsc};
use tracing::error;
-fn channel_send<T>(data: CollectItem, mut sender: T) -> anyhow::Result<()>
-where
- T: DerefMut<Target = UnixStream>,
-{
- let content = bincode::serialize(&data)?;
-
- sender.write_all(&content.len().to_le_bytes())?;
- sender.write_all(&content)?;
- sender.flush()?;
-
- Ok(())
-}
-
pub async fn channel_receive(receiver: &mut tokio::net::UnixStream) -> anyhow::Result<CollectItem> {
let mut size_buf = [0u8; size_of::<usize>()];
receiver.read_exact(&mut size_buf).await?;
@@ -52,38 +30,6 @@ pub async fn channel_receive(receiver: &mut tokio::net::UnixStream) -> anyhow::R
Ok(item)
}
-pub struct Reporter {
- worker_addr: PathBuf,
- stream: OnceCell<Mutex<UnixStream>>,
-}
-
-impl Reporter {
- pub fn new(worker_addr: impl AsRef<Path>) -> Self {
- Self {
- worker_addr: worker_addr.as_ref().to_path_buf(),
- stream: OnceCell::new(),
- }
- }
-
- fn try_report(&self, item: CollectItem) -> anyhow::Result<()> {
- let stream = self
- .stream
- .get_or_try_init(||
UnixStream::connect(&self.worker_addr).map(Mutex::new))?
- .lock()
- .map_err(|_| anyhow!("Get Lock failed"))?;
-
- channel_send(item, stream)
- }
-}
-
-impl Report for Reporter {
- fn report(&self, item: CollectItem) {
- if let Err(err) = self.try_report(item) {
- error!(?err, "channel send failed");
- }
- }
-}
-
pub struct TxReporter(pub mpsc::Sender<CollectItem>);
impl Report for TxReporter {
diff --git a/src/worker.rs b/worker/src/lib.rs
similarity index 64%
copy from src/worker.rs
copy to worker/src/lib.rs
index 80c74a7..0348456 100644
--- a/src/worker.rs
+++ b/worker/src/lib.rs
@@ -13,25 +13,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+pub mod channel;
+pub mod reporter;
+
use crate::{
- channel::{self, TxReporter},
- module::{
- HEARTBEAT_PERIOD, PROPERTIES_REPORT_PERIOD_FACTOR, SERVICE_INSTANCE,
SERVICE_NAME,
- SOCKET_FILE_PATH, WORKER_THREADS,
- },
- reporter::run_reporter,
- util::change_permission,
+ channel::TxReporter,
+ reporter::{run_reporter, ReporterConfiguration},
};
-
-use once_cell::sync::Lazy;
-
use skywalking::{
management::{instance::Properties, manager::Manager},
reporter::{CollectItem, CollectItemConsume},
};
use std::{
- cmp::Ordering, error::Error, fs, io, marker::PhantomData,
num::NonZeroUsize, process::exit,
- thread::available_parallelism, time::Duration,
+ error::Error,
+ fs, io,
+ os::unix::prelude::OsStrExt,
+ path::{Path, PathBuf},
+ time::Duration,
};
use tokio::{
net::UnixListener,
@@ -41,53 +39,22 @@ use tokio::{
sync::mpsc::{self, error::TrySendError},
};
use tonic::async_trait;
-use tracing::{debug, error, info, warn};
+use tracing::{debug, error, info};
-pub fn init_worker() {
- let worker_threads = worker_threads();
-
- unsafe {
- // TODO Shutdown previous worker before fork if there is a PHP-FPM
reload
- // operation.
- // TODO Change the worker process name.
-
- let pid = libc::fork();
- match pid.cmp(&0) {
- Ordering::Less => {
- error!("fork failed");
- }
- Ordering::Equal => {
- // Ensure worker process exits when master process exists.
- #[cfg(target_os = "linux")]
- libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGTERM);
-
- // Run the worker in subprocess.
- let rt = new_tokio_runtime(worker_threads);
- match rt.block_on(start_worker()) {
- Ok(_) => {
- exit(0);
- }
- Err(err) => {
- error!(?err, "worker exit unexpectedly");
- exit(1);
- }
- }
- }
- Ordering::Greater => {}
- }
- }
+pub struct WorkerConfiguration {
+ pub socket_file_path: PathBuf,
+ pub heart_beat: Option<HeartBeatConfiguration>,
+ pub reporter_config: ReporterConfiguration,
}
-fn worker_threads() -> usize {
- let worker_threads = *WORKER_THREADS;
- if worker_threads <= 0 {
- available_parallelism().map(NonZeroUsize::get).unwrap_or(1)
- } else {
- worker_threads as usize
- }
+pub struct HeartBeatConfiguration {
+ pub service_instance: String,
+ pub service_name: String,
+ pub heartbeat_period: i64,
+ pub properties_report_period_factor: i64,
}
-fn new_tokio_runtime(worker_threads: usize) -> Runtime {
+pub fn new_tokio_runtime(worker_threads: usize) -> Runtime {
runtime::Builder::new_multi_thread()
.thread_name("sw: worker")
.enable_all()
@@ -96,21 +63,21 @@ fn new_tokio_runtime(worker_threads: usize) -> Runtime {
.unwrap()
}
-async fn start_worker() -> anyhow::Result<()> {
+pub async fn start_worker(config: WorkerConfiguration) -> anyhow::Result<()> {
debug!("Starting worker...");
+ let socket_file = config.socket_file_path;
+
// Ensure to cleanup resources when worker exits.
- let _guard = WorkerExitGuard::default();
+ let _guard = WorkerExitGuard(socket_file.clone());
// Graceful shutdown signal, put it on the top of program.
let mut sig_term = signal(SignalKind::terminate())?;
let mut sig_int = signal(SignalKind::interrupt())?;
- let socket_file = &*SOCKET_FILE_PATH;
-
let fut = async move {
debug!(?socket_file, "Bind unix stream");
- let listener = UnixListener::bind(socket_file)?;
+ let listener = UnixListener::bind(&socket_file)?;
change_permission(socket_file, 0o777);
let (tx, rx) = mpsc::channel::<CollectItem>(255);
@@ -158,10 +125,12 @@ async fn start_worker() -> anyhow::Result<()> {
}
});
- report_properties_and_keep_alive(TxReporter(tx_));
+ if let Some(heart_beat_config) = config.heart_beat {
+ report_properties_and_keep_alive(heart_beat_config,
TxReporter(tx_));
+ }
// Run reporter with blocking.
- run_reporter((), Consumer(rx)).await?;
+ run_reporter(config.reporter_config, (), Consumer(rx)).await?;
Ok::<_, anyhow::Error>(())
};
@@ -193,27 +162,20 @@ impl CollectItemConsume for Consumer {
}
}
-#[derive(Default)]
-struct WorkerExitGuard(PhantomData<()>);
+struct WorkerExitGuard(PathBuf);
impl Drop for WorkerExitGuard {
fn drop(&mut self) {
- match Lazy::get(&SOCKET_FILE_PATH) {
- Some(socket_file) => {
- info!(?socket_file, "Remove socket file");
- if let Err(err) = fs::remove_file(socket_file) {
- error!(?err, "Remove socket file failed");
- }
- }
- None => {
- warn!("Socket file not created");
- }
+ let Self(ref socket_file) = self;
+ info!(?socket_file, "Remove socket file");
+ if let Err(err) = fs::remove_file(socket_file) {
+ error!(?err, "Remove socket file failed");
}
}
}
-fn report_properties_and_keep_alive(reporter: TxReporter) {
- let manager = Manager::new(&*SERVICE_NAME, &*SERVICE_INSTANCE, reporter);
+fn report_properties_and_keep_alive(config: HeartBeatConfiguration, reporter: TxReporter) {
+ let manager = Manager::new(&*config.service_name,
&*config.service_instance, reporter);
manager.report_and_keep_alive(
|| {
@@ -226,7 +188,17 @@ fn report_properties_and_keep_alive(reporter: TxReporter) {
debug!(?props, "Report instance properties");
props
},
- Duration::from_secs(*HEARTBEAT_PERIOD as u64),
- *PROPERTIES_REPORT_PERIOD_FACTOR as usize,
+ Duration::from_secs(config.heartbeat_period as u64),
+ config.properties_report_period_factor as usize,
);
}
+
+fn change_permission(f: impl AsRef<Path>, mode: libc::mode_t) {
+ let f = f.as_ref().as_os_str().as_bytes();
+ let mut path = Vec::with_capacity(f.len() + 1);
+ path.extend_from_slice(f);
+ path.push(b'\0');
+ unsafe {
+ libc::chmod(path.as_ptr().cast(), mode);
+ }
+}
diff --git a/src/reporter/mod.rs b/worker/src/reporter/mod.rs
similarity index 59%
rename from src/reporter/mod.rs
rename to worker/src/reporter/mod.rs
index 7e50175..049c9fe 100644
--- a/src/reporter/mod.rs
+++ b/worker/src/reporter/mod.rs
@@ -16,17 +16,29 @@
mod reporter_grpc;
mod reporter_kafka;
-use crate::module::REPORTER_TYPE;
-use anyhow::bail;
+pub use reporter_grpc::GrpcReporterConfiguration;
+#[cfg(feature = "kafka-reporter")]
+pub use reporter_kafka::KafkaReporterConfiguration;
use skywalking::reporter::{CollectItemConsume, CollectItemProduce};
+pub enum ReporterConfiguration {
+ Grpc(GrpcReporterConfiguration),
+
+ #[cfg(feature = "kafka-reporter")]
+ Kafka(KafkaReporterConfiguration),
+}
+
pub async fn run_reporter(
- producer: impl CollectItemProduce, consumer: impl CollectItemConsume,
+ config: ReporterConfiguration, producer: impl CollectItemProduce,
+ consumer: impl CollectItemConsume,
) -> anyhow::Result<()> {
- match REPORTER_TYPE.as_str() {
- "grpc" => reporter_grpc::run_reporter(producer, consumer).await,
+ match config {
+ ReporterConfiguration::Grpc(config) => {
+ reporter_grpc::run_reporter(config, producer, consumer).await
+ }
#[cfg(feature = "kafka-reporter")]
- "kafka" => reporter_kafka::run_reporter(producer, consumer).await,
- typ => bail!("unknown reporter type, {}", typ),
+ ReporterConfiguration::Kafka(config) => {
+ reporter_kafka::run_reporter(config, producer, consumer).await
+ }
}
}
diff --git a/src/reporter/reporter_grpc.rs b/worker/src/reporter/reporter_grpc.rs
similarity index 66%
rename from src/reporter/reporter_grpc.rs
rename to worker/src/reporter/reporter_grpc.rs
index 7948f57..d450b43 100644
--- a/src/reporter/reporter_grpc.rs
+++ b/worker/src/reporter/reporter_grpc.rs
@@ -13,9 +13,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use crate::module::{
- AUTHENTICATION, ENABLE_TLS, SERVER_ADDR, SSL_CERT_CHAIN_PATH,
SSL_KEY_PATH, SSL_TRUSTED_CA_PATH,
-};
use anyhow::anyhow;
use skywalking::reporter::{grpc::GrpcReporter, CollectItemConsume, CollectItemProduce};
use std::time::Duration;
@@ -23,16 +20,26 @@ use tokio::time::sleep;
use tonic::transport::{Certificate, Channel, ClientTlsConfig, Endpoint, Identity};
use tracing::{debug, info, warn};
+pub struct GrpcReporterConfiguration {
+ pub authentication: String,
+ pub enable_tls: bool,
+ pub server_addr: String,
+ pub ssl_cert_chain_path: String,
+ pub ssl_key_path: String,
+ pub ssl_trusted_ca_path: String,
+}
+
pub async fn run_reporter(
- producer: impl CollectItemProduce, consumer: impl CollectItemConsume,
+ config: GrpcReporterConfiguration, producer: impl CollectItemProduce,
+ consumer: impl CollectItemConsume,
) -> anyhow::Result<()> {
- let endpoint = create_endpoint(&SERVER_ADDR).await?;
+ let endpoint = create_endpoint(&config).await?;
let channel = connect(endpoint).await;
let mut reporter = GrpcReporter::new_with_pc(channel, producer, consumer);
- if !AUTHENTICATION.is_empty() {
- reporter = reporter.with_authentication(&*AUTHENTICATION);
+ if !config.authentication.is_empty() {
+ reporter = reporter.with_authentication(config.authentication);
}
info!("Worker is ready...");
@@ -52,40 +59,40 @@ pub async fn run_reporter(
Ok(())
}
-async fn create_endpoint(server_addr: &str) -> anyhow::Result<Endpoint> {
- let scheme = if *ENABLE_TLS { "https" } else { "http" };
+async fn create_endpoint(config: &GrpcReporterConfiguration) -> anyhow::Result<Endpoint> {
+ let scheme = if config.enable_tls { "https" } else { "http" };
- let url = format!("{}://{}", scheme, server_addr);
+ let url = format!("{}://{}", scheme, config.server_addr);
debug!(url, "Create Endpoint");
let mut endpoint = Endpoint::from_shared(url)?;
debug!(
- enable_tls = *ENABLE_TLS,
- ssl_trusted_ca_path = &*SSL_TRUSTED_CA_PATH,
- ssl_key_path = &*SSL_KEY_PATH,
- ssl_cert_chain_path = &*SSL_CERT_CHAIN_PATH,
+ enable_tls = config.enable_tls,
+ ssl_trusted_ca_path = config.ssl_trusted_ca_path,
+ ssl_key_path = config.ssl_key_path,
+ ssl_cert_chain_path = config.ssl_cert_chain_path,
"Skywalking TLS info"
);
- if *ENABLE_TLS {
- let domain_name = server_addr.split(':').next().unwrap_or_default();
+ if config.enable_tls {
+ let domain_name =
config.server_addr.split(':').next().unwrap_or_default();
debug!(domain_name, "Configure TLS domain");
let mut tls = ClientTlsConfig::new().domain_name(domain_name);
- let ssl_trusted_ca_path = SSL_TRUSTED_CA_PATH.as_str();
+ let ssl_trusted_ca_path = &config.ssl_trusted_ca_path;
if !ssl_trusted_ca_path.is_empty() {
debug!(ssl_trusted_ca_path, "Configure TLS CA");
- let ca_cert = tokio::fs::read(&*SSL_TRUSTED_CA_PATH).await?;
+ let ca_cert = tokio::fs::read(&config.ssl_trusted_ca_path).await?;
let ca_cert = Certificate::from_pem(ca_cert);
tls = tls.ca_certificate(ca_cert);
}
- let ssl_key_path = SSL_KEY_PATH.as_str();
- let ssl_cert_chain_path = SSL_CERT_CHAIN_PATH.as_str();
+ let ssl_key_path = &config.ssl_key_path;
+ let ssl_cert_chain_path = &config.ssl_cert_chain_path;
if !ssl_key_path.is_empty() && !ssl_cert_chain_path.is_empty() {
debug!(ssl_trusted_ca_path, "Configure mTLS");
- let client_cert = tokio::fs::read(&*SSL_CERT_CHAIN_PATH).await?;
- let client_key = tokio::fs::read(&*SSL_KEY_PATH).await?;
+ let client_cert =
tokio::fs::read(&config.ssl_cert_chain_path).await?;
+ let client_key = tokio::fs::read(&config.ssl_key_path).await?;
let client_identity = Identity::from_pem(client_cert, client_key);
tls = tls.identity(client_identity);
}
diff --git a/src/reporter/reporter_kafka.rs b/worker/src/reporter/reporter_kafka.rs
similarity index 82%
rename from src/reporter/reporter_kafka.rs
rename to worker/src/reporter/reporter_kafka.rs
index 61e4cf6..204564c 100644
--- a/src/reporter/reporter_kafka.rs
+++ b/worker/src/reporter/reporter_kafka.rs
@@ -15,7 +15,6 @@
#![cfg(feature = "kafka-reporter")]
-use crate::module::{KAFKA_BOOTSTRAP_SERVERS, KAFKA_PRODUCER_CONFIG};
use anyhow::{bail, Context};
use skywalking::reporter::{
kafka::{KafkaReportBuilder, RDKafkaClientConfig},
@@ -23,14 +22,20 @@ use skywalking::reporter::{
};
use std::collections::HashMap;
+pub struct KafkaReporterConfiguration {
+ pub kafka_bootstrap_servers: String,
+ pub kafka_producer_config: String,
+}
+
pub async fn run_reporter(
- producer: impl CollectItemProduce, consumer: impl CollectItemConsume,
+ config: KafkaReporterConfiguration, producer: impl CollectItemProduce,
+ consumer: impl CollectItemConsume,
) -> anyhow::Result<()> {
let mut client_config = RDKafkaClientConfig::new();
- client_config.set("bootstrap.servers", &*KAFKA_BOOTSTRAP_SERVERS);
+ client_config.set("bootstrap.servers", config.kafka_bootstrap_servers);
- let config = serde_json::from_str::<HashMap<String,
String>>(&KAFKA_PRODUCER_CONFIG)
+ let config = serde_json::from_str::<HashMap<String,
String>>(&config.kafka_producer_config)
.context("parse kafka producer config failed")?;
for (key, value) in config {
client_config.set(key, value);