jmjoy commented on code in PR #26:
URL: https://github.com/apache/skywalking-php/pull/26#discussion_r1002937926
##########
src/channel.rs:
##########
@@ -75,67 +47,34 @@ async fn channel_receive(receiver: &mut UnixStream) ->
anyhow::Result<CollectIte
Ok(item)
}
-fn channel_try_receive(receiver: &UnixStream) ->
anyhow::Result<Option<CollectItem>> {
- let mut size_buf = [0u8; size_of::<usize>()];
- if let Err(e) = receiver.try_read(&mut size_buf) {
- if e.kind() == io::ErrorKind::WouldBlock {
- return Ok(None);
- }
- return Err(e.into());
- }
- let size = usize::from_le_bytes(size_buf);
-
- let mut buf = vec![0u8; size];
- if let Err(e) = receiver.try_read(&mut buf) {
- if e.kind() == io::ErrorKind::WouldBlock {
- return Ok(None);
- }
- return Err(e.into());
- }
-
- let item = bincode::deserialize(&buf)?;
- Ok(item)
+pub struct Reporter<T: AsRef<Path>> {
+ worker_addr: T,
+ stream: OnceCell<Mutex<UnixStream>>,
}
-pub struct Reporter;
-
-impl Report for Reporter {
- fn report(&self, item: CollectItem) {
- if let Err(err) = channel_send(item) {
- error!(?err, "channel send failed");
+impl<T: AsRef<Path>> Reporter<T> {
+ pub fn new(worker_addr: T) -> Self {
+ Self {
+ worker_addr,
+ stream: OnceCell::new(),
}
}
-}
-pub struct Consumer(UnixStream);
-
-impl Consumer {
- pub fn new() -> anyhow::Result<Self> {
- let receiver = RECEIVER.get().context("Channel haven't initialized")?;
- let receiver = receiver
+ fn try_report(&self, item: CollectItem) -> anyhow::Result<()> {
+ let stream = self
+ .stream
+ .get_or_try_init(||
UnixStream::connect(&self.worker_addr).map(Mutex::new))?
.lock()
- .map_err(|_| anyhow!("Get Lock failed"))?
- .take()
- .context("The RECEIVER has been taked")?;
- let receiver =
- UnixStream::from_std(receiver).context("try into tokio unix stream
failed")?;
- Ok(Self(receiver))
- }
-}
+ .map_err(|_| anyhow!("Get Lock failed"))?;
-#[async_trait]
-impl ColletcItemConsume for Consumer {
- async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error +
Send>> {
- match channel_receive(&mut self.0).await {
- Ok(item) => Ok(Some(item)),
- Err(e) => Err(e.into()),
- }
+ channel_send(item, stream)
Review Comment:
OK. In this case, requests will block only when the UDS (Unix domain socket)
buffer is full, which should be rare.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]