On Sat, Aug 26, 2023 at 11:29:59AM +0000, Tage Johansson wrote:
> This patch adds an example using the asynchronous Rust bindings.
> ---
>  rust/Cargo.toml                        |   1 +
>  rust/examples/concurrent-read-write.rs | 149 +++++++++++++++++++++++++
>  rust/run-tests.sh.in                   |   2 +
>  3 files changed, 152 insertions(+)
>  create mode 100644 rust/examples/concurrent-read-write.rs
> 
> diff --git a/rust/Cargo.toml b/rust/Cargo.toml
> index c49f9f2..0879b34 100644
> --- a/rust/Cargo.toml
> +++ b/rust/Cargo.toml
> @@ -58,5 +58,6 @@ default = ["log", "tokio"]
>  anyhow = "1.0.72"
>  once_cell = "1.18.0"
>  pretty-hex = "0.3.0"
> +rand = { version = "0.8.5", default-features = false, features = 
> ["small_rng", "min_const_gen"] }
>  tempfile = "3.6.0"
>  tokio = { version = "1.29.1", default-features = false, features = 
> ["rt-multi-thread", "macros"] }
> diff --git a/rust/examples/concurrent-read-write.rs 
> b/rust/examples/concurrent-read-write.rs
> new file mode 100644
> index 0000000..4858f76
> --- /dev/null
> +++ b/rust/examples/concurrent-read-write.rs
> @@ -0,0 +1,149 @@
> +//! Example usage with nbdkit:
> +//!     nbdkit -U - memory 100M \
> +//!       --run 'cargo run --example concurrent-read-write -- $unixsocket'
> +//! Or connect over a URI:
> +//!     nbdkit -U - memory 100M \
> +//!       --run 'cargo run --example concurrent-read-write -- $uri'

Should be "$uri" here (to avoid accidental shell globbing surprises).

> +//!
> +//! This will read and write randomly over the first megabyte of the

This says first megabyte...[1]

> +//! plugin using multi-conn, multiple threads and multiple requests in
> +//! flight on each thread.
> +
> +#![deny(warnings)]
> +use rand::prelude::*;
> +use std::env;
> +use std::sync::Arc;
> +use tokio::task::JoinSet;
> +
> +/// Number of simultaneous connections to the NBD server.
> +///
> +/// Note that some servers only support a limited number of
> +/// simultaneous connections, and/or have a configurable thread pool
> +/// internally, and if you exceed those limits then something will break.
> +const NR_MULTI_CONN: usize = 8;
> +
> +/// Number of commands that can be "in flight" at the same time on each
> +/// connection.  (Therefore the total number of requests in flight may
> +/// be up to NR_MULTI_CONN * MAX_IN_FLIGHT).
> +const MAX_IN_FLIGHT: usize = 16;
> +
> +/// The size of large reads and writes, must be > 512.
> +const BUFFER_SIZE: usize = 1024;

1024 isn't much larger than 512.  It looks like you borrowed heavily
from examples/threaded-reads-and-writes.c, but that used 1M as the
large buffer.

> +
> +/// Number of commands we issue (per [task][tokio::task]).
> +const NR_CYCLES: usize = 32;
> +
> +/// Statistics gathered during the run.
> +#[derive(Debug, Default)]
> +struct Stats {
> +    /// The total number of requests made.
> +    requests: usize,
> +}
> +
> +#[tokio::main]
> +async fn main() -> anyhow::Result<()> {
> +    let args = env::args_os().collect::<Vec<_>>();
> +    if args.len() != 2 {
> +        anyhow::bail!("Usage: {:?} socket", args[0]);
> +    }
> +
> +    // We begin by making a connection to the server to get the export size
> +    // and ensure that it supports multiple connections and is writable.
> +    let nbd = libnbd::Handle::new()?;
> +
> +    // Check if the user provided a URI or a unix socket.
> +    let socket_or_uri = args[1].to_str().unwrap();
> +    if socket_or_uri.contains("://") {
> +        nbd.connect_uri(socket_or_uri)?;
> +    } else {
> +        nbd.connect_unix(socket_or_uri)?;
> +    }
> +
> +    let export_size = nbd.get_size()?;
> +    anyhow::ensure!(
> +        (BUFFER_SIZE as u64) < export_size,
> +        "export is {export_size}B, must be larger than {BUFFER_SIZE}B"
> +    );
> +    anyhow::ensure!(
> +        !nbd.is_read_only()?,
> +        "error: this NBD export is read-only"
> +    );
> +    anyhow::ensure!(
> +        nbd.can_multi_conn()?,
> +        "error: this NBD export does not support multi-conn"
> +    );
> +    drop(nbd); // Close the connection.
> +
> +    // Start the worker tasks, one per connection.
> +    let mut tasks = JoinSet::new();
> +    for i in 0..NR_MULTI_CONN {
> +        tasks.spawn(run_thread(i, socket_or_uri.to_owned(), export_size));
> +    }
> +
> +    // Wait for the tasks to complete.
> +    let mut stats = Stats::default();
> +    while !tasks.is_empty() {
> +        let this_stats = tasks.join_next().await.unwrap().unwrap()?;
> +        stats.requests += this_stats.requests;
> +    }
> +
> +    // Make sure the number of requests that were required matches what
> +    // we expect.
> +    assert_eq!(stats.requests, NR_MULTI_CONN * NR_CYCLES);
> +
> +    Ok(())
> +}
> +
> +async fn run_thread(
> +    task_idx: usize,
> +    socket_or_uri: String,
> +    export_size: u64,
> +) -> anyhow::Result<Stats> {
> +    // Start a new connection to the server.
> +    // We shall spawn many commands concurrently on different tasks and those
> +    // futures must be `'static`, hence we wrap the handle in an [Arc].
> +    let nbd = Arc::new(libnbd::AsyncHandle::new()?);
> +
> +    // Check if the user provided a URI or a unix socket.
> +    if socket_or_uri.contains("://") {
> +        nbd.connect_uri(socket_or_uri).await?;
> +    } else {
> +        nbd.connect_unix(socket_or_uri).await?;
> +    }
> +
> +    let mut rng = SmallRng::seed_from_u64(44 as u64);

Why are you hard-coding this to the same fixed seed, for every worker
thread?

> +
> +    // Issue commands.
> +    let mut stats = Stats::default();
> +    let mut join_set = JoinSet::new();
> +    //tokio::time::sleep(std::time::Duration::from_secs(1)).await;

Does this comment need to remain?

> +    while stats.requests < NR_CYCLES || !join_set.is_empty() {
> +        while stats.requests < NR_CYCLES && join_set.len() < MAX_IN_FLIGHT {
> +            // If we want to issue another request, do so.  Note that we 
> reuse
> +            // the same buffer for multiple in-flight requests.  It doesn't
> +            // matter here because we're just trying to write random stuff,
> +            // but that would be Very Bad in a real application.
> +            // Simulate a mix of large and small requests.
> +            let size = if rng.gen() { BUFFER_SIZE } else { 512 };

Is this missing a math operation to check only one bit of the generated value?

/me goes and reads more Rust docs

Oh, so rng.gen() is shorthand for rng.gen::<bool>() when used in a
boolean context, which really does have a 50/50 split in returned
values?  Cool shorthand!

> +            let offset = rng.gen_range(0..export_size - size as u64);

[1]...but you are actually targetting anywhere in the image.  It looks
like that was a copy-paste error from a comment that has gone stale
after refactoring in the original C code.

> +
> +            let mut buf = [0u8; BUFFER_SIZE];
> +            let nbd = nbd.clone();
> +            if rng.gen() {
> +                join_set.spawn(async move {
> +                    nbd.pread(&mut buf, offset, None).await
> +                });
> +            } else {
> +                // Fill the buf with random data.
> +                rng.fill(&mut buf);
> +                join_set
> +                    .spawn(async move { nbd.pwrite(&buf, offset, None).await 
> });

I'm impressed: the async handle makes for functions that use a lot
less boilerplate than the comparable C nbd_aio_* counterparts.

> +            }
> +            stats.requests += 1;
> +        }
> +        join_set.join_next().await.unwrap().unwrap()?;
> +    }
> +
> +    if task_idx == 0 {}
> +    Ok(stats)
> +}
> diff --git a/rust/run-tests.sh.in b/rust/run-tests.sh.in
> index 3ebf9a1..661c018 100755

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.
Virtualization:  qemu.org | libguestfs.org
_______________________________________________
Libguestfs mailing list
Libguestfs@redhat.com
https://listman.redhat.com/mailman/listinfo/libguestfs

Reply via email to