This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-php.git


The following commit(s) were added to refs/heads/master by this push:
     new f433815  Refactor worker to standalone crate (#118)
f433815 is described below

commit f43381509bf84674dc4b9b3ebde4404cb3f9e8ec
Author: jmjoy <[email protected]>
AuthorDate: Thu Aug 15 15:30:54 2024 +0800

    Refactor worker to standalone crate (#118)
---
 .licenserc.yaml                                |   1 +
 Cargo.lock                                     |  47 ++++--
 Cargo.toml                                     |  19 ++-
 config.m4                                      |   1 +
 dist-material/LICENSE                          |  11 +-
 scripts/Cargo.toml                             |  10 +-
 src/channel.rs                                 |  24 ---
 src/lib.rs                                     |   1 -
 src/util.rs                                    |  12 --
 src/worker.rs                                  | 222 ++++++-------------------
 Cargo.toml => worker/Cargo.toml                |  48 ++----
 {src => worker/src}/channel.rs                 |  56 +------
 src/worker.rs => worker/src/lib.rs             | 126 ++++++--------
 {src => worker/src}/reporter/mod.rs            |  26 ++-
 {src => worker/src}/reporter/reporter_grpc.rs  |  51 +++---
 {src => worker/src}/reporter/reporter_kafka.rs |  13 +-
 16 files changed, 230 insertions(+), 438 deletions(-)

diff --git a/.licenserc.yaml b/.licenserc.yaml
index 222a72e..9d5fdd4 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -32,6 +32,7 @@ header:
     - 'config.m4'
     - 'vendor'
     - 'dist-material'
+    - 'target'
 
   comment: on-failure
 
diff --git a/Cargo.lock b/Cargo.lock
index 5e4ce47..8a6988f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -82,9 +82,9 @@ dependencies = [
 
 [[package]]
 name = "anstyle-wincon"
-version = "1.0.1"
+version = "1.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "180abfa45703aebe0093f79badacc01b8fd4ea2e35118747e5811127f926e188"
+checksum = "c677ab05e09154296dd37acecd46420c17b9713e8366facafa8fc0885167cf4c"
 dependencies = [
  "anstyle",
  "windows-sys 0.48.0",
@@ -365,9 +365,9 @@ dependencies = [
 
 [[package]]
 name = "clap"
-version = "4.3.19"
+version = "4.3.24"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5fd304a20bff958a57f04c4e96a2e7594cc4490a0e809cbd48bb6437edaa452d"
+checksum = "fb690e81c7840c0d7aade59f242ea3b41b9bc27bcd5997890e7702ae4b32e487"
 dependencies = [
  "clap_builder",
  "clap_derive",
@@ -376,9 +376,9 @@ dependencies = [
 
 [[package]]
 name = "clap_builder"
-version = "4.3.19"
+version = "4.3.24"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "01c6a3f08f1fe5662a35cfe393aec09c4df95f60ee93b7556505260f75eee9e1"
+checksum = "5ed2e96bc16d8d740f6f48d663eddf4b8a0983e79210fd55479b7bcd0a69860e"
 dependencies = [
  "anstream",
  "anstyle",
@@ -983,13 +983,13 @@ checksum = "28b29a3cd74f0f4598934efe3aeba42bae0eb4680554128851ebbecb02af14e6"
 
 [[package]]
 name = "is-terminal"
-version = "0.4.9"
+version = "0.4.12"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b"
+checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b"
 dependencies = [
  "hermit-abi",
- "rustix",
- "windows-sys 0.48.0",
+ "libc",
+ "windows-sys 0.52.0",
 ]
 
 [[package]]
@@ -1597,9 +1597,9 @@ dependencies = [
 
 [[package]]
 name = "proc-macro2"
-version = "1.0.66"
+version = "1.0.86"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9"
+checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77"
 dependencies = [
  "unicode-ident",
 ]
@@ -2173,8 +2173,8 @@ dependencies = [
  "prost",
  "rdkafka",
  "reqwest",
- "serde_json",
  "skywalking",
+ "skywalking-php-worker",
  "systemstat",
  "thiserror",
  "tokio",
@@ -2185,6 +2185,27 @@ dependencies = [
  "url",
 ]
 
+[[package]]
+name = "skywalking-php-worker"
+version = "0.8.0-dev"
+dependencies = [
+ "anyhow",
+ "bincode",
+ "clap",
+ "clap_lex",
+ "libc",
+ "once_cell",
+ "prost",
+ "rdkafka",
+ "serde_json",
+ "skywalking",
+ "tokio",
+ "tokio-stream",
+ "tonic",
+ "tracing",
+ "tracing-subscriber",
+]
+
 [[package]]
 name = "slab"
 version = "0.4.8"
diff --git a/Cargo.toml b/Cargo.toml
index 3af549e..a7cbf86 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,17 +17,26 @@
 members = [
     ".",
     "scripts",
+    "worker",
 ]
 
-[package]
-name = "skywalking-php"
+[workspace.package]
 version = "0.8.0-dev"
 authors = ["Apache Software Foundation", "jmjoy <[email protected]>", "Yanlong He <[email protected]>"]
-description = "Apache SkyWalking PHP Agent."
 edition = "2021"
 rust-version = "1.65"
 repository = "https://github.com/apache/skywalking-php"
 license = "Apache-2.0"
+
+[package]
+name = "skywalking-php"
+version = { workspace = true }
+authors = { workspace = true }
+description = "Apache SkyWalking PHP Agent."
+edition = { workspace = true }
+rust-version = { workspace = true }
+repository = { workspace = true }
+license = { workspace = true }
 readme = "README.md"
 publish = false
 
@@ -36,7 +45,7 @@ name = "skywalking_agent"
 crate-type = ["lib", "cdylib"]
 
 [features]
-kafka-reporter = ["skywalking/kafka-reporter", "rdkafka/sasl"]
+kafka-reporter = ["skywalking-php-worker/kafka-reporter"]
 
 [dependencies]
 anyhow = { version = "1.0.72", features = ["backtrace"] }
@@ -49,8 +58,8 @@ once_cell = "1.18.0"
 phper = "0.12.0"
 prost = "0.11.9"
 rdkafka = { version = "0.32.2", optional = true }
-serde_json = { version = "1.0.104", features = ["preserve_order"] }
 skywalking = { version = "0.8.0", features = ["management"] }
+skywalking-php-worker = { path = "worker" }
 systemstat = "0.2.3"
 thiserror = "1.0.44"
 tokio = { version = "1.29.1", features = ["full"] }
diff --git a/config.m4 b/config.m4
index edd4c0e..e60d10f 100644
--- a/config.m4
+++ b/config.m4
@@ -85,5 +85,6 @@ EOF
     scripts:scripts \
     src:src \
     tests:tests \
+    worker:worker \
     ])
 fi
diff --git a/dist-material/LICENSE b/dist-material/LICENSE
index 4a27d97..c2f79a1 100644
--- a/dist-material/LICENSE
+++ b/dist-material/LICENSE
@@ -224,6 +224,7 @@ The text of each license is the standard Apache 2.0 license.
     https://crates.io/crates/sasl2-sys/0.1.22+2.1.28 0.1.22+2.1.28 Apache-2.0
     https://crates.io/crates/scripts/0.0.0 0.0.0 Apache-2.0
     https://crates.io/crates/skywalking/0.8.0 0.8.0 Apache-2.0
+    https://crates.io/crates/skywalking-php-worker/0.8.0-dev 0.8.0-dev Apache-2.0
     https://crates.io/crates/sync_wrapper/0.1.2 0.1.2 Apache-2.0
 
 ========================================================================
@@ -277,7 +278,7 @@ The text of each license is also included in licenses/LICENSE-[project].txt.
     https://crates.io/crates/anstyle/1.0.1 1.0.1 Apache-2.0 OR MIT
     https://crates.io/crates/anstyle-parse/0.2.1 0.2.1 Apache-2.0 OR MIT
     https://crates.io/crates/anstyle-query/1.0.0 1.0.0 Apache-2.0 OR MIT
-    https://crates.io/crates/anstyle-wincon/1.0.1 1.0.1 Apache-2.0 OR MIT
+    https://crates.io/crates/anstyle-wincon/1.0.2 1.0.2 Apache-2.0 OR MIT
     https://crates.io/crates/anyhow/1.0.72 1.0.72 Apache-2.0 OR MIT
     https://crates.io/crates/async-trait/0.1.72 0.1.72 Apache-2.0 OR MIT
     https://crates.io/crates/autocfg/1.1.0 1.1.0 Apache-2.0 OR MIT
@@ -295,8 +296,8 @@ The text of each license is also included in licenses/LICENSE-[project].txt.
     https://crates.io/crates/chrono/0.4.26 0.4.26 Apache-2.0 OR MIT
     https://crates.io/crates/chrono-tz/0.6.1 0.6.1 Apache-2.0 OR MIT
     https://crates.io/crates/chrono-tz-build/0.0.2 0.0.2 Apache-2.0 OR MIT
-    https://crates.io/crates/clap/4.3.19 4.3.19 Apache-2.0 OR MIT
-    https://crates.io/crates/clap_builder/4.3.19 4.3.19 Apache-2.0 OR MIT
+    https://crates.io/crates/clap/4.3.24 4.3.24 Apache-2.0 OR MIT
+    https://crates.io/crates/clap_builder/4.3.24 4.3.24 Apache-2.0 OR MIT
     https://crates.io/crates/clap_derive/4.3.12 4.3.12 Apache-2.0 OR MIT
     https://crates.io/crates/clap_lex/0.5.0 0.5.0 Apache-2.0 OR MIT
     https://crates.io/crates/colorchoice/1.0.0 1.0.0 Apache-2.0 OR MIT
@@ -385,7 +386,7 @@ The text of each license is also included in licenses/LICENSE-[project].txt.
     https://crates.io/crates/ppv-lite86/0.2.17 0.2.17 Apache-2.0 OR MIT
     https://crates.io/crates/prettyplease/0.1.25 0.1.25 Apache-2.0 OR MIT
     https://crates.io/crates/proc-macro-crate/1.3.1 1.3.1 Apache-2.0 OR MIT
-    https://crates.io/crates/proc-macro2/1.0.66 1.0.66 Apache-2.0 OR MIT
+    https://crates.io/crates/proc-macro2/1.0.86 1.0.86 Apache-2.0 OR MIT
     https://crates.io/crates/quick-error/1.2.3 1.2.3 Apache-2.0 OR MIT
     https://crates.io/crates/quote/1.0.32 1.0.32 Apache-2.0 OR MIT
     https://crates.io/crates/rand/0.8.5 0.8.5 Apache-2.0 OR MIT
@@ -542,7 +543,7 @@ The text of each license is also included in licenses/LICENSE-[project].txt.
     https://crates.io/crates/hostname/0.3.1 0.3.1 MIT
     https://crates.io/crates/http-body/0.4.5 0.4.5 MIT
     https://crates.io/crates/hyper/0.14.27 0.14.27 MIT
-    https://crates.io/crates/is-terminal/0.4.9 0.4.9 MIT
+    https://crates.io/crates/is-terminal/0.4.12 0.4.12 MIT
     https://crates.io/crates/matchers/0.1.0 0.1.0 MIT
     https://crates.io/crates/matches/0.1.10 0.1.10 MIT
     https://crates.io/crates/mio/0.8.8 0.8.8 MIT
diff --git a/scripts/Cargo.toml b/scripts/Cargo.toml
index 8e0aeb7..30dec9a 100644
--- a/scripts/Cargo.toml
+++ b/scripts/Cargo.toml
@@ -16,12 +16,12 @@
 [package]
 name = "scripts"
 version = "0.0.0"
-authors = ["Apache Software Foundation", "jmjoy <[email protected]>", "Yanlong He <[email protected]>"]
+authors = { workspace = true }
 description = "The Scripts of Apache SkyWalking PHP Agent."
-edition = "2021"
-rust-version = "1.58"
-repository = "https://github.com/apache/skywalking-php"
-license = "Apache-2.0"
+edition = { workspace = true }
+rust-version = { workspace = true }
+repository = { workspace = true }
+license = { workspace = true }
 publish = false
 
 [dependencies]
diff --git a/src/channel.rs b/src/channel.rs
index e29617b..8976f86 100644
--- a/src/channel.rs
+++ b/src/channel.rs
@@ -18,13 +18,11 @@ use once_cell::sync::OnceCell;
 use skywalking::reporter::{CollectItem, Report};
 use std::{
     io::Write,
-    mem::size_of,
     ops::DerefMut,
     os::unix::net::UnixStream,
     path::{Path, PathBuf},
     sync::Mutex,
 };
-use tokio::{io::AsyncReadExt, sync::mpsc};
 use tracing::error;
 
 fn channel_send<T>(data: CollectItem, mut sender: T) -> anyhow::Result<()>
@@ -40,18 +38,6 @@ where
     Ok(())
 }
 
-pub async fn channel_receive(receiver: &mut tokio::net::UnixStream) -> anyhow::Result<CollectItem> {
-    let mut size_buf = [0u8; size_of::<usize>()];
-    receiver.read_exact(&mut size_buf).await?;
-    let size = usize::from_le_bytes(size_buf);
-
-    let mut content = vec![0u8; size];
-    receiver.read_exact(&mut content).await?;
-
-    let item = bincode::deserialize(&content)?;
-    Ok(item)
-}
-
 pub struct Reporter {
     worker_addr: PathBuf,
     stream: OnceCell<Mutex<UnixStream>>,
@@ -83,13 +69,3 @@ impl Report for Reporter {
         }
     }
 }
-
-pub struct TxReporter(pub mpsc::Sender<CollectItem>);
-
-impl Report for TxReporter {
-    fn report(&self, item: CollectItem) {
-        if let Err(err) = self.0.try_send(item) {
-            error!(?err, "Send collect item failed");
-        }
-    }
-}
diff --git a/src/lib.rs b/src/lib.rs
index a65f4ef..5a54bb9 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -24,7 +24,6 @@ mod errors;
 mod execute;
 mod module;
 mod plugin;
-mod reporter;
 mod request;
 mod tag;
 mod util;
diff --git a/src/util.rs b/src/util.rs
index e9fb1ca..6ad8bd8 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -18,9 +18,7 @@ use once_cell::sync::Lazy;
 use phper::{ini::ini_get, sys, values::ZVal};
 use std::{
     ffi::CStr,
-    os::unix::prelude::OsStrExt,
     panic::{catch_unwind, UnwindSafe},
-    path::Path,
 };
 use systemstat::{IpAddr, Platform, System};
 
@@ -87,16 +85,6 @@ pub fn get_sapi_module_name() -> &'static CStr {
     unsafe { CStr::from_ptr(sys::sapi_module.name) }
 }
 
-pub fn change_permission(f: impl AsRef<Path>, mode: libc::mode_t) {
-    let f = f.as_ref().as_os_str().as_bytes();
-    let mut path = Vec::with_capacity(f.len() + 1);
-    path.extend_from_slice(f);
-    path.push(b'\0');
-    unsafe {
-        libc::chmod(path.as_ptr().cast(), mode);
-    }
-}
-
 pub fn get_str_ini_with_default(name: &str) -> String {
     ini_get::<Option<&CStr>>(name)
         .and_then(|s| s.to_str().ok())
diff --git a/src/worker.rs b/src/worker.rs
index 80c74a7..dbeaa60 100644
--- a/src/worker.rs
+++ b/src/worker.rs
@@ -13,39 +13,24 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use crate::{
-    channel::{self, TxReporter},
-    module::{
-        HEARTBEAT_PERIOD, PROPERTIES_REPORT_PERIOD_FACTOR, SERVICE_INSTANCE, SERVICE_NAME,
-        SOCKET_FILE_PATH, WORKER_THREADS,
-    },
-    reporter::run_reporter,
-    util::change_permission,
+use crate::module::{
+    AUTHENTICATION, ENABLE_TLS, HEARTBEAT_PERIOD, PROPERTIES_REPORT_PERIOD_FACTOR, REPORTER_TYPE,
+    SERVER_ADDR, SERVICE_INSTANCE, SERVICE_NAME, SOCKET_FILE_PATH, SSL_CERT_CHAIN_PATH,
+    SSL_KEY_PATH, SSL_TRUSTED_CA_PATH, WORKER_THREADS,
 };
-
-use once_cell::sync::Lazy;
-
-use skywalking::{
-    management::{instance::Properties, manager::Manager},
-    reporter::{CollectItem, CollectItemConsume},
-};
-use std::{
-    cmp::Ordering, error::Error, fs, io, marker::PhantomData, num::NonZeroUsize, process::exit,
-    thread::available_parallelism, time::Duration,
-};
-use tokio::{
-    net::UnixListener,
-    runtime::{self, Runtime},
-    select,
-    signal::unix::{signal, SignalKind},
-    sync::mpsc::{self, error::TrySendError},
+#[cfg(feature = "kafka-reporter")]
+use crate::module::{KAFKA_BOOTSTRAP_SERVERS, KAFKA_PRODUCER_CONFIG};
+#[cfg(feature = "kafka-reporter")]
+use skywalking_php_worker::reporter::KafkaReporterConfiguration;
+use skywalking_php_worker::{
+    new_tokio_runtime,
+    reporter::{GrpcReporterConfiguration, ReporterConfiguration},
+    start_worker, HeartBeatConfiguration, WorkerConfiguration,
 };
-use tonic::async_trait;
-use tracing::{debug, error, info, warn};
+use std::{cmp::Ordering, num::NonZeroUsize, process::exit, thread::available_parallelism};
+use tracing::error;
 
 pub fn init_worker() {
-    let worker_threads = worker_threads();
-
     unsafe {
         // TODO Shutdown previous worker before fork if there is a PHP-FPM reload
         // operation.
@@ -61,9 +46,40 @@ pub fn init_worker() {
                 #[cfg(target_os = "linux")]
                 libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGTERM);
 
+                let reporter_config = match REPORTER_TYPE.as_str() {
+                    "grpc" => ReporterConfiguration::Grpc(GrpcReporterConfiguration {
+                        authentication: AUTHENTICATION.clone(),
+                        enable_tls: *ENABLE_TLS,
+                        server_addr: SERVER_ADDR.clone(),
+                        ssl_cert_chain_path: SSL_CERT_CHAIN_PATH.clone(),
+                        ssl_key_path: SSL_KEY_PATH.clone(),
+                        ssl_trusted_ca_path: SSL_TRUSTED_CA_PATH.clone(),
+                    }),
+                    #[cfg(feature = "kafka-reporter")]
+                    "kafka" => ReporterConfiguration::Kafka(KafkaReporterConfiguration {
+                        kafka_bootstrap_servers: KAFKA_BOOTSTRAP_SERVERS.clone(),
+                        kafka_producer_config: KAFKA_PRODUCER_CONFIG.clone(),
+                    }),
+                    typ => {
+                        error!("unknown reporter type, {}", typ);
+                        exit(1);
+                    }
+                };
+
+                let config = WorkerConfiguration {
+                    socket_file_path: SOCKET_FILE_PATH.to_path_buf(),
+                    heart_beat: Some(HeartBeatConfiguration {
+                        service_instance: SERVICE_INSTANCE.clone(),
+                        service_name: SERVICE_NAME.clone(),
+                        heartbeat_period: *HEARTBEAT_PERIOD,
+                        properties_report_period_factor: *PROPERTIES_REPORT_PERIOD_FACTOR,
+                    }),
+                    reporter_config,
+                };
+
                 // Run the worker in subprocess.
-                let rt = new_tokio_runtime(worker_threads);
-                match rt.block_on(start_worker()) {
+                let rt = new_tokio_runtime(worker_threads());
+                match rt.block_on(start_worker(config)) {
                     Ok(_) => {
                         exit(0);
                     }
@@ -86,147 +102,3 @@ fn worker_threads() -> usize {
         worker_threads as usize
     }
 }
-
-fn new_tokio_runtime(worker_threads: usize) -> Runtime {
-    runtime::Builder::new_multi_thread()
-        .thread_name("sw: worker")
-        .enable_all()
-        .worker_threads(worker_threads)
-        .build()
-        .unwrap()
-}
-
-async fn start_worker() -> anyhow::Result<()> {
-    debug!("Starting worker...");
-
-    // Ensure to cleanup resources when worker exits.
-    let _guard = WorkerExitGuard::default();
-
-    // Graceful shutdown signal, put it on the top of program.
-    let mut sig_term = signal(SignalKind::terminate())?;
-    let mut sig_int = signal(SignalKind::interrupt())?;
-
-    let socket_file = &*SOCKET_FILE_PATH;
-
-    let fut = async move {
-        debug!(?socket_file, "Bind unix stream");
-        let listener = UnixListener::bind(socket_file)?;
-        change_permission(socket_file, 0o777);
-
-        let (tx, rx) = mpsc::channel::<CollectItem>(255);
-        let tx_ = tx.clone();
-        tokio::spawn(async move {
-            loop {
-                match listener.accept().await {
-                    Ok((mut stream, _addr)) => {
-                        let tx = tx.clone();
-
-                        tokio::spawn(async move {
-                            debug!("Entering channel_receive loop");
-
-                            loop {
-                                let r = match channel::channel_receive(&mut stream).await {
-                                    Err(err) => match err.downcast_ref::<io::Error>() {
-                                        Some(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
-                                            debug!("Leaving channel_receive loop");
-                                            return;
-                                        }
-                                        _ => {
-                                            error!(?err, "channel_receive failed");
-                                            continue;
-                                        }
-                                    },
-                                    Ok(i) => i,
-                                };
-
-                                // Try send here, to prevent the ipc blocking caused by the channel
-                                // bursting (too late to report),
-                                // which affects the pool process of php-fpm.
-                                if let Err(err) = tx.try_send(r) {
-                                    error!(?err, "Send collect item failed");
-                                    if !matches!(err, TrySendError::Full(_)) {
-                                        return;
-                                    }
-                                }
-                            }
-                        });
-                    }
-                    Err(err) => {
-                        error!(?err, "Accept failed");
-                    }
-                }
-            }
-        });
-
-        report_properties_and_keep_alive(TxReporter(tx_));
-
-        // Run reporter with blocking.
-        run_reporter((), Consumer(rx)).await?;
-
-        Ok::<_, anyhow::Error>(())
-    };
-
-    // TODO Do graceful shutdown, and wait 10s then force quit.
-    select! {
-        _ = sig_term.recv() => {}
-        _ = sig_int.recv() => {}
-        r = fut => {
-            r?;
-        }
-    }
-
-    info!("Start to shutdown skywalking grpc reporter");
-
-    Ok(())
-}
-
-struct Consumer(mpsc::Receiver<CollectItem>);
-
-#[async_trait]
-impl CollectItemConsume for Consumer {
-    async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
-        Ok(self.0.recv().await)
-    }
-
-    async fn try_consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
-        Ok(self.0.try_recv().ok())
-    }
-}
-
-#[derive(Default)]
-struct WorkerExitGuard(PhantomData<()>);
-
-impl Drop for WorkerExitGuard {
-    fn drop(&mut self) {
-        match Lazy::get(&SOCKET_FILE_PATH) {
-            Some(socket_file) => {
-                info!(?socket_file, "Remove socket file");
-                if let Err(err) = fs::remove_file(socket_file) {
-                    error!(?err, "Remove socket file failed");
-                }
-            }
-            None => {
-                warn!("Socket file not created");
-            }
-        }
-    }
-}
-
-fn report_properties_and_keep_alive(reporter: TxReporter) {
-    let manager = Manager::new(&*SERVICE_NAME, &*SERVICE_INSTANCE, reporter);
-
-    manager.report_and_keep_alive(
-        || {
-            let mut props = Properties::new();
-            props.insert_os_info();
-            props.update(Properties::KEY_LANGUAGE, "php");
-            props.update(Properties::KEY_PROCESS_NO, unsafe {
-                libc::getppid().to_string()
-            });
-            debug!(?props, "Report instance properties");
-            props
-        },
-        Duration::from_secs(*HEARTBEAT_PERIOD as u64),
-        *PROPERTIES_REPORT_PERIOD_FACTOR as usize,
-    );
-}
diff --git a/Cargo.toml b/worker/Cargo.toml
similarity index 66%
copy from Cargo.toml
copy to worker/Cargo.toml
index 3af549e..2780dc9 100644
--- a/Cargo.toml
+++ b/worker/Cargo.toml
@@ -13,57 +13,39 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-[workspace]
-members = [
-    ".",
-    "scripts",
-]
-
 [package]
-name = "skywalking-php"
-version = "0.8.0-dev"
-authors = ["Apache Software Foundation", "jmjoy <[email protected]>", "Yanlong He <[email protected]>"]
-description = "Apache SkyWalking PHP Agent."
-edition = "2021"
-rust-version = "1.65"
-repository = "https://github.com/apache/skywalking-php"
-license = "Apache-2.0"
+name = "skywalking-php-worker"
+version = { workspace = true }
+authors = { workspace = true }
+description = "Apache SkyWalking PHP Worker."
+edition = { workspace = true }
+rust-version = { workspace = true }
+repository = { workspace = true }
+license = { workspace = true }
 readme = "README.md"
 publish = false
 
-[lib]
-name = "skywalking_agent"
-crate-type = ["lib", "cdylib"]
-
 [features]
+standalone = ["clap", "clap_lex", "tracing-subscriber"]
 kafka-reporter = ["skywalking/kafka-reporter", "rdkafka/sasl"]
 
 [dependencies]
 anyhow = { version = "1.0.72", features = ["backtrace"] }
 bincode = "1.3.3"
-dashmap = "5.5.0"
-futures-util = "0.3.28"
-hostname = "0.3.1"
+clap = { version = "=4.3.24", features = ["derive"], optional = true }
+clap_lex = { version = "=0.5.0", optional = true }
 libc = "0.2.147"
 once_cell = "1.18.0"
-phper = "0.12.0"
 prost = "0.11.9"
 rdkafka = { version = "0.32.2", optional = true }
 serde_json = { version = "1.0.104", features = ["preserve_order"] }
 skywalking = { version = "0.8.0", features = ["management"] }
-systemstat = "0.2.3"
-thiserror = "1.0.44"
 tokio = { version = "1.29.1", features = ["full"] }
 tokio-stream = "0.1.14"
 tonic = { version = "0.8.3", features = ["tls", "tls-roots"] }
 tracing = { version = "0.1.37", features = ["attributes"] }
-tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
-url = "2.4.0"
-
-[dev-dependencies]
-axum = "0.6.19"
-fastcgi-client = "0.9.0"
-reqwest = { version = "0.11.18", features = ["trust-dns", "json", "stream"] }
+tracing-subscriber = { version = "0.3.17", features = ["env-filter"], optional = true }
 
-[build-dependencies]
-phper-build = "0.12.0"
+# [[bin]]
+# name = "skywalking-php-worker"
+# required-features = ["standalone", "kafka-reporter"]
diff --git a/src/channel.rs b/worker/src/channel.rs
similarity index 54%
copy from src/channel.rs
copy to worker/src/channel.rs
index e29617b..6dcdd2a 100644
--- a/src/channel.rs
+++ b/worker/src/channel.rs
@@ -13,33 +13,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use anyhow::anyhow;
-use once_cell::sync::OnceCell;
 use skywalking::reporter::{CollectItem, Report};
-use std::{
-    io::Write,
-    mem::size_of,
-    ops::DerefMut,
-    os::unix::net::UnixStream,
-    path::{Path, PathBuf},
-    sync::Mutex,
-};
+use std::mem::size_of;
 use tokio::{io::AsyncReadExt, sync::mpsc};
 use tracing::error;
 
-fn channel_send<T>(data: CollectItem, mut sender: T) -> anyhow::Result<()>
-where
-    T: DerefMut<Target = UnixStream>,
-{
-    let content = bincode::serialize(&data)?;
-
-    sender.write_all(&content.len().to_le_bytes())?;
-    sender.write_all(&content)?;
-    sender.flush()?;
-
-    Ok(())
-}
-
 pub async fn channel_receive(receiver: &mut tokio::net::UnixStream) -> anyhow::Result<CollectItem> {
     let mut size_buf = [0u8; size_of::<usize>()];
     receiver.read_exact(&mut size_buf).await?;
@@ -52,38 +30,6 @@ pub async fn channel_receive(receiver: &mut tokio::net::UnixStream) -> anyhow::R
     Ok(item)
 }
 
-pub struct Reporter {
-    worker_addr: PathBuf,
-    stream: OnceCell<Mutex<UnixStream>>,
-}
-
-impl Reporter {
-    pub fn new(worker_addr: impl AsRef<Path>) -> Self {
-        Self {
-            worker_addr: worker_addr.as_ref().to_path_buf(),
-            stream: OnceCell::new(),
-        }
-    }
-
-    fn try_report(&self, item: CollectItem) -> anyhow::Result<()> {
-        let stream = self
-            .stream
-            .get_or_try_init(|| UnixStream::connect(&self.worker_addr).map(Mutex::new))?
-            .lock()
-            .map_err(|_| anyhow!("Get Lock failed"))?;
-
-        channel_send(item, stream)
-    }
-}
-
-impl Report for Reporter {
-    fn report(&self, item: CollectItem) {
-        if let Err(err) = self.try_report(item) {
-            error!(?err, "channel send failed");
-        }
-    }
-}
-
 pub struct TxReporter(pub mpsc::Sender<CollectItem>);
 
 impl Report for TxReporter {
diff --git a/src/worker.rs b/worker/src/lib.rs
similarity index 64%
copy from src/worker.rs
copy to worker/src/lib.rs
index 80c74a7..0348456 100644
--- a/src/worker.rs
+++ b/worker/src/lib.rs
@@ -13,25 +13,23 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+pub mod channel;
+pub mod reporter;
+
 use crate::{
-    channel::{self, TxReporter},
-    module::{
-        HEARTBEAT_PERIOD, PROPERTIES_REPORT_PERIOD_FACTOR, SERVICE_INSTANCE, SERVICE_NAME,
-        SOCKET_FILE_PATH, WORKER_THREADS,
-    },
-    reporter::run_reporter,
-    util::change_permission,
+    channel::TxReporter,
+    reporter::{run_reporter, ReporterConfiguration},
 };
-
-use once_cell::sync::Lazy;
-
 use skywalking::{
     management::{instance::Properties, manager::Manager},
     reporter::{CollectItem, CollectItemConsume},
 };
 use std::{
-    cmp::Ordering, error::Error, fs, io, marker::PhantomData, num::NonZeroUsize, process::exit,
-    thread::available_parallelism, time::Duration,
+    error::Error,
+    fs, io,
+    os::unix::prelude::OsStrExt,
+    path::{Path, PathBuf},
+    time::Duration,
 };
 use tokio::{
     net::UnixListener,
@@ -41,53 +39,22 @@ use tokio::{
     sync::mpsc::{self, error::TrySendError},
 };
 use tonic::async_trait;
-use tracing::{debug, error, info, warn};
+use tracing::{debug, error, info};
 
-pub fn init_worker() {
-    let worker_threads = worker_threads();
-
-    unsafe {
-        // TODO Shutdown previous worker before fork if there is a PHP-FPM reload
-        // operation.
-        // TODO Change the worker process name.
-
-        let pid = libc::fork();
-        match pid.cmp(&0) {
-            Ordering::Less => {
-                error!("fork failed");
-            }
-            Ordering::Equal => {
-                // Ensure worker process exits when master process exists.
-                #[cfg(target_os = "linux")]
-                libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGTERM);
-
-                // Run the worker in subprocess.
-                let rt = new_tokio_runtime(worker_threads);
-                match rt.block_on(start_worker()) {
-                    Ok(_) => {
-                        exit(0);
-                    }
-                    Err(err) => {
-                        error!(?err, "worker exit unexpectedly");
-                        exit(1);
-                    }
-                }
-            }
-            Ordering::Greater => {}
-        }
-    }
+pub struct WorkerConfiguration {
+    pub socket_file_path: PathBuf,
+    pub heart_beat: Option<HeartBeatConfiguration>,
+    pub reporter_config: ReporterConfiguration,
 }
 
-fn worker_threads() -> usize {
-    let worker_threads = *WORKER_THREADS;
-    if worker_threads <= 0 {
-        available_parallelism().map(NonZeroUsize::get).unwrap_or(1)
-    } else {
-        worker_threads as usize
-    }
+pub struct HeartBeatConfiguration {
+    pub service_instance: String,
+    pub service_name: String,
+    pub heartbeat_period: i64,
+    pub properties_report_period_factor: i64,
 }
 
-fn new_tokio_runtime(worker_threads: usize) -> Runtime {
+pub fn new_tokio_runtime(worker_threads: usize) -> Runtime {
     runtime::Builder::new_multi_thread()
         .thread_name("sw: worker")
         .enable_all()
@@ -96,21 +63,21 @@ fn new_tokio_runtime(worker_threads: usize) -> Runtime {
         .unwrap()
 }
 
-async fn start_worker() -> anyhow::Result<()> {
+pub async fn start_worker(config: WorkerConfiguration) -> anyhow::Result<()> {
     debug!("Starting worker...");
 
+    let socket_file = config.socket_file_path;
+
     // Ensure to cleanup resources when worker exits.
-    let _guard = WorkerExitGuard::default();
+    let _guard = WorkerExitGuard(socket_file.clone());
 
     // Graceful shutdown signal, put it on the top of program.
     let mut sig_term = signal(SignalKind::terminate())?;
     let mut sig_int = signal(SignalKind::interrupt())?;
 
-    let socket_file = &*SOCKET_FILE_PATH;
-
     let fut = async move {
         debug!(?socket_file, "Bind unix stream");
-        let listener = UnixListener::bind(socket_file)?;
+        let listener = UnixListener::bind(&socket_file)?;
         change_permission(socket_file, 0o777);
 
         let (tx, rx) = mpsc::channel::<CollectItem>(255);
@@ -158,10 +125,12 @@ async fn start_worker() -> anyhow::Result<()> {
             }
         });
 
-        report_properties_and_keep_alive(TxReporter(tx_));
+        if let Some(heart_beat_config) = config.heart_beat {
+            report_properties_and_keep_alive(heart_beat_config, TxReporter(tx_));
+        }
 
         // Run reporter with blocking.
-        run_reporter((), Consumer(rx)).await?;
+        run_reporter(config.reporter_config, (), Consumer(rx)).await?;
 
         Ok::<_, anyhow::Error>(())
     };
@@ -193,27 +162,20 @@ impl CollectItemConsume for Consumer {
     }
 }
 
-#[derive(Default)]
-struct WorkerExitGuard(PhantomData<()>);
+struct WorkerExitGuard(PathBuf);
 
 impl Drop for WorkerExitGuard {
     fn drop(&mut self) {
-        match Lazy::get(&SOCKET_FILE_PATH) {
-            Some(socket_file) => {
-                info!(?socket_file, "Remove socket file");
-                if let Err(err) = fs::remove_file(socket_file) {
-                    error!(?err, "Remove socket file failed");
-                }
-            }
-            None => {
-                warn!("Socket file not created");
-            }
+        let Self(ref socket_file) = self;
+        info!(?socket_file, "Remove socket file");
+        if let Err(err) = fs::remove_file(socket_file) {
+            error!(?err, "Remove socket file failed");
         }
     }
 }
 
-fn report_properties_and_keep_alive(reporter: TxReporter) {
-    let manager = Manager::new(&*SERVICE_NAME, &*SERVICE_INSTANCE, reporter);
+fn report_properties_and_keep_alive(config: HeartBeatConfiguration, reporter: TxReporter) {
+    let manager = Manager::new(&*config.service_name, &*config.service_instance, reporter);
 
     manager.report_and_keep_alive(
         || {
@@ -226,7 +188,17 @@ fn report_properties_and_keep_alive(reporter: TxReporter) {
             debug!(?props, "Report instance properties");
             props
         },
-        Duration::from_secs(*HEARTBEAT_PERIOD as u64),
-        *PROPERTIES_REPORT_PERIOD_FACTOR as usize,
+        Duration::from_secs(config.heartbeat_period as u64),
+        config.properties_report_period_factor as usize,
     );
 }
+
+fn change_permission(f: impl AsRef<Path>, mode: libc::mode_t) {
+    let f = f.as_ref().as_os_str().as_bytes();
+    let mut path = Vec::with_capacity(f.len() + 1);
+    path.extend_from_slice(f);
+    path.push(b'\0');
+    unsafe {
+        libc::chmod(path.as_ptr().cast(), mode);
+    }
+}
diff --git a/src/reporter/mod.rs b/worker/src/reporter/mod.rs
similarity index 59%
rename from src/reporter/mod.rs
rename to worker/src/reporter/mod.rs
index 7e50175..049c9fe 100644
--- a/src/reporter/mod.rs
+++ b/worker/src/reporter/mod.rs
@@ -16,17 +16,29 @@
 mod reporter_grpc;
 mod reporter_kafka;
 
-use crate::module::REPORTER_TYPE;
-use anyhow::bail;
+pub use reporter_grpc::GrpcReporterConfiguration;
+#[cfg(feature = "kafka-reporter")]
+pub use reporter_kafka::KafkaReporterConfiguration;
 use skywalking::reporter::{CollectItemConsume, CollectItemProduce};
 
+pub enum ReporterConfiguration {
+    Grpc(GrpcReporterConfiguration),
+
+    #[cfg(feature = "kafka-reporter")]
+    Kafka(KafkaReporterConfiguration),
+}
+
 pub async fn run_reporter(
-    producer: impl CollectItemProduce, consumer: impl CollectItemConsume,
+    config: ReporterConfiguration, producer: impl CollectItemProduce,
+    consumer: impl CollectItemConsume,
 ) -> anyhow::Result<()> {
-    match REPORTER_TYPE.as_str() {
-        "grpc" => reporter_grpc::run_reporter(producer, consumer).await,
+    match config {
+        ReporterConfiguration::Grpc(config) => {
+            reporter_grpc::run_reporter(config, producer, consumer).await
+        }
         #[cfg(feature = "kafka-reporter")]
-        "kafka" => reporter_kafka::run_reporter(producer, consumer).await,
-        typ => bail!("unknown reporter type, {}", typ),
+        ReporterConfiguration::Kafka(config) => {
+            reporter_kafka::run_reporter(config, producer, consumer).await
+        }
     }
 }
diff --git a/src/reporter/reporter_grpc.rs b/worker/src/reporter/reporter_grpc.rs
similarity index 66%
rename from src/reporter/reporter_grpc.rs
rename to worker/src/reporter/reporter_grpc.rs
index 7948f57..d450b43 100644
--- a/src/reporter/reporter_grpc.rs
+++ b/worker/src/reporter/reporter_grpc.rs
@@ -13,9 +13,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use crate::module::{
-    AUTHENTICATION, ENABLE_TLS, SERVER_ADDR, SSL_CERT_CHAIN_PATH, SSL_KEY_PATH, SSL_TRUSTED_CA_PATH,
-};
 use anyhow::anyhow;
 use skywalking::reporter::{grpc::GrpcReporter, CollectItemConsume, CollectItemProduce};
 use std::time::Duration;
@@ -23,16 +20,26 @@ use tokio::time::sleep;
 use tonic::transport::{Certificate, Channel, ClientTlsConfig, Endpoint, Identity};
 use tracing::{debug, info, warn};
 
+pub struct GrpcReporterConfiguration {
+    pub authentication: String,
+    pub enable_tls: bool,
+    pub server_addr: String,
+    pub ssl_cert_chain_path: String,
+    pub ssl_key_path: String,
+    pub ssl_trusted_ca_path: String,
+}
+
 pub async fn run_reporter(
-    producer: impl CollectItemProduce, consumer: impl CollectItemConsume,
+    config: GrpcReporterConfiguration, producer: impl CollectItemProduce,
+    consumer: impl CollectItemConsume,
 ) -> anyhow::Result<()> {
-    let endpoint = create_endpoint(&SERVER_ADDR).await?;
+    let endpoint = create_endpoint(&config).await?;
     let channel = connect(endpoint).await;
 
     let mut reporter = GrpcReporter::new_with_pc(channel, producer, consumer);
 
-    if !AUTHENTICATION.is_empty() {
-        reporter = reporter.with_authentication(&*AUTHENTICATION);
+    if !config.authentication.is_empty() {
+        reporter = reporter.with_authentication(config.authentication);
     }
 
     info!("Worker is ready...");
@@ -52,40 +59,40 @@ pub async fn run_reporter(
     Ok(())
 }
 
-async fn create_endpoint(server_addr: &str) -> anyhow::Result<Endpoint> {
-    let scheme = if *ENABLE_TLS { "https" } else { "http" };
+async fn create_endpoint(config: &GrpcReporterConfiguration) -> anyhow::Result<Endpoint> {
+    let scheme = if config.enable_tls { "https" } else { "http" };
 
-    let url = format!("{}://{}", scheme, server_addr);
+    let url = format!("{}://{}", scheme, config.server_addr);
     debug!(url, "Create Endpoint");
     let mut endpoint = Endpoint::from_shared(url)?;
 
     debug!(
-        enable_tls = *ENABLE_TLS,
-        ssl_trusted_ca_path = &*SSL_TRUSTED_CA_PATH,
-        ssl_key_path = &*SSL_KEY_PATH,
-        ssl_cert_chain_path = &*SSL_CERT_CHAIN_PATH,
+        enable_tls = config.enable_tls,
+        ssl_trusted_ca_path = config.ssl_trusted_ca_path,
+        ssl_key_path = config.ssl_key_path,
+        ssl_cert_chain_path = config.ssl_cert_chain_path,
         "Skywalking TLS info"
     );
 
-    if *ENABLE_TLS {
-        let domain_name = server_addr.split(':').next().unwrap_or_default();
+    if config.enable_tls {
+        let domain_name = config.server_addr.split(':').next().unwrap_or_default();
         debug!(domain_name, "Configure TLS domain");
         let mut tls = ClientTlsConfig::new().domain_name(domain_name);
 
-        let ssl_trusted_ca_path = SSL_TRUSTED_CA_PATH.as_str();
+        let ssl_trusted_ca_path = &config.ssl_trusted_ca_path;
         if !ssl_trusted_ca_path.is_empty() {
             debug!(ssl_trusted_ca_path, "Configure TLS CA");
-            let ca_cert = tokio::fs::read(&*SSL_TRUSTED_CA_PATH).await?;
+            let ca_cert = tokio::fs::read(&config.ssl_trusted_ca_path).await?;
             let ca_cert = Certificate::from_pem(ca_cert);
             tls = tls.ca_certificate(ca_cert);
         }
 
-        let ssl_key_path = SSL_KEY_PATH.as_str();
-        let ssl_cert_chain_path = SSL_CERT_CHAIN_PATH.as_str();
+        let ssl_key_path = &config.ssl_key_path;
+        let ssl_cert_chain_path = &config.ssl_cert_chain_path;
         if !ssl_key_path.is_empty() && !ssl_cert_chain_path.is_empty() {
             debug!(ssl_trusted_ca_path, "Configure mTLS");
-            let client_cert = tokio::fs::read(&*SSL_CERT_CHAIN_PATH).await?;
-            let client_key = tokio::fs::read(&*SSL_KEY_PATH).await?;
+            let client_cert = tokio::fs::read(&config.ssl_cert_chain_path).await?;
+            let client_key = tokio::fs::read(&config.ssl_key_path).await?;
             let client_identity = Identity::from_pem(client_cert, client_key);
             tls = tls.identity(client_identity);
         }
diff --git a/src/reporter/reporter_kafka.rs b/worker/src/reporter/reporter_kafka.rs
similarity index 82%
rename from src/reporter/reporter_kafka.rs
rename to worker/src/reporter/reporter_kafka.rs
index 61e4cf6..204564c 100644
--- a/src/reporter/reporter_kafka.rs
+++ b/worker/src/reporter/reporter_kafka.rs
@@ -15,7 +15,6 @@
 
 #![cfg(feature = "kafka-reporter")]
 
-use crate::module::{KAFKA_BOOTSTRAP_SERVERS, KAFKA_PRODUCER_CONFIG};
 use anyhow::{bail, Context};
 use skywalking::reporter::{
     kafka::{KafkaReportBuilder, RDKafkaClientConfig},
@@ -23,14 +22,20 @@ use skywalking::reporter::{
 };
 use std::collections::HashMap;
 
+pub struct KafkaReporterConfiguration {
+    pub kafka_bootstrap_servers: String,
+    pub kafka_producer_config: String,
+}
+
 pub async fn run_reporter(
-    producer: impl CollectItemProduce, consumer: impl CollectItemConsume,
+    config: KafkaReporterConfiguration, producer: impl CollectItemProduce,
+    consumer: impl CollectItemConsume,
 ) -> anyhow::Result<()> {
     let mut client_config = RDKafkaClientConfig::new();
 
-    client_config.set("bootstrap.servers", &*KAFKA_BOOTSTRAP_SERVERS);
+    client_config.set("bootstrap.servers", config.kafka_bootstrap_servers);
 
-    let config = serde_json::from_str::<HashMap<String, String>>(&KAFKA_PRODUCER_CONFIG)
+    let config = serde_json::from_str::<HashMap<String, String>>(&config.kafka_producer_config)
         .context("parse kafka producer config failed")?;
     for (key, value) in config {
         client_config.set(key, value);


Reply via email to