jmjoy commented on code in PR #26:
URL: https://github.com/apache/skywalking-php/pull/26#discussion_r1002643848


##########
src/module.rs:
##########
@@ -61,14 +56,24 @@ pub fn init(_module: ModuleContext) -> bool {
         service_instance, skywalking_version, "Starting skywalking agent"
     );
 
+    let worker_addr = {
+        match tempfile::NamedTempFile::new() {
+            Err(e) => {
+                error!("Create named temporary file failed: {}", e);
+                return true;
+            }
+            Ok(f) => f.into_temp_path().to_str().unwrap().to_string(),

Review Comment:
   Please do not use `unwrap()` here; log the failure with `error!()` and `return` instead.



##########
src/channel.rs:
##########
@@ -75,67 +47,34 @@ async fn channel_receive(receiver: &mut UnixStream) -> 
anyhow::Result<CollectIte
     Ok(item)
 }
 
-fn channel_try_receive(receiver: &UnixStream) -> 
anyhow::Result<Option<CollectItem>> {
-    let mut size_buf = [0u8; size_of::<usize>()];
-    if let Err(e) = receiver.try_read(&mut size_buf) {
-        if e.kind() == io::ErrorKind::WouldBlock {
-            return Ok(None);
-        }
-        return Err(e.into());
-    }
-    let size = usize::from_le_bytes(size_buf);
-
-    let mut buf = vec![0u8; size];
-    if let Err(e) = receiver.try_read(&mut buf) {
-        if e.kind() == io::ErrorKind::WouldBlock {
-            return Ok(None);
-        }
-        return Err(e.into());
-    }
-
-    let item = bincode::deserialize(&buf)?;
-    Ok(item)
+pub struct Reporter<T: AsRef<Path>> {
+    worker_addr: T,
+    stream: OnceCell<Mutex<UnixStream>>,
 }
 
-pub struct Reporter;
-
-impl Report for Reporter {
-    fn report(&self, item: CollectItem) {
-        if let Err(err) = channel_send(item) {
-            error!(?err, "channel send failed");
+impl<T: AsRef<Path>> Reporter<T> {
+    pub fn new(worker_addr: T) -> Self {
+        Self {
+            worker_addr,
+            stream: OnceCell::new(),
         }
     }
-}
 
-pub struct Consumer(UnixStream);
-
-impl Consumer {
-    pub fn new() -> anyhow::Result<Self> {
-        let receiver = RECEIVER.get().context("Channel haven't initialized")?;
-        let receiver = receiver
+    fn try_report(&self, item: CollectItem) -> anyhow::Result<()> {
+        let stream = self
+            .stream
+            .get_or_try_init(|| 
UnixStream::connect(&self.worker_addr).map(Mutex::new))?
             .lock()
-            .map_err(|_| anyhow!("Get Lock failed"))?
-            .take()
-            .context("The RECEIVER has been taked")?;
-        let receiver =
-            UnixStream::from_std(receiver).context("try into tokio unix stream 
failed")?;
-        Ok(Self(receiver))
-    }
-}
+            .map_err(|_| anyhow!("Get Lock failed"))?;
 
-#[async_trait]
-impl ColletcItemConsume for Consumer {
-    async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + 
Send>> {
-        match channel_receive(&mut self.0).await {
-            Ok(item) => Ok(Some(item)),
-            Err(e) => Err(e.into()),
-        }
+        channel_send(item, stream)

Review Comment:
   The stream should be set to non-blocking mode.



##########
src/worker.rs:
##########
@@ -83,23 +94,59 @@ async fn start_worker(server_addr: String) {
     };
 
     let fut = async move {
-        let endpoint = match Endpoint::from_shared(server_addr) {
-            Ok(endpoint) => endpoint,
+        let (tx, rx) = mpsc::channel::<Result<CollectItem, Box<dyn Error + 
Send>>>(255);
+        let listener = match UnixListener::bind(worker_addr) {
+            Ok(listener) => listener,
             Err(err) => {
-                error!(?err, "Create endpoint failed");
+                error!(?err, "Bind failed");
                 return;
             }
         };
-        let channel = connect(endpoint).await;
 
-        let consumer = match channel::Consumer::new() {
-            Ok(consumer) => consumer,
+        debug!("Bind");
+        tokio::spawn(async move {
+            loop {
+                match listener.accept().await {
+                    Ok((mut stream, _addr)) => {
+                        debug!("Accept");
+
+                        let tx = tx.clone();
+                        tokio::spawn(async move {
+                            loop {
+                                let r = match channel::channel_receive(&mut 
stream).await {
+                                    Err(err) => match 
err.downcast_ref::<io::Error>() {
+                                        Some(e) if e.kind() == 
io::ErrorKind::UnexpectedEof => {
+                                            return
+                                        }
+                                        _ => Err(err.into()),
+                                    },
+                                    Ok(i) => Ok(i),
+                                };
+
+                                if let Err(err) = tx.send(r).await {
+                                    error!(?err, "Send failed");
+                                    return;
+                                }
+                            }
+                        });
+                    }

Review Comment:
   It seems that `(tx, rx)` and the outer `tokio::spawn` can be removed, to avoid 
copying the data. In addition, please include more detail in the log messages, e.g. 
`debug!(server_addr, "Bind unix domain socket");`.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to