Re: [Libguestfs] [libnbd PATCH v9 6/7] rust: async: Add an example

2023-09-05 Thread Eric Blake
On Mon, Sep 04, 2023 at 04:56:07PM +, Tage Johansson wrote:
> 
> On 9/1/2023 10:41 PM, Eric Blake wrote:
> > On Sat, Aug 26, 2023 at 11:29:59AM +, Tage Johansson wrote:
> > > This patch adds an example using the asynchronous Rust bindings.
> > > ---
> > >   rust/Cargo.toml|   1 +
> > >   rust/examples/concurrent-read-write.rs | 149 +
> > >   rust/run-tests.sh.in   |   2 +
> > >   3 files changed, 152 insertions(+)
> > >   create mode 100644 rust/examples/concurrent-read-write.rs
> > > 
> > > diff --git a/rust/Cargo.toml b/rust/Cargo.toml
> > > index c49f9f2..0879b34 100644
> > > --- a/rust/Cargo.toml
> > > +++ b/rust/Cargo.toml
> > > @@ -58,5 +58,6 @@ default = ["log", "tokio"]
> > >   anyhow = "1.0.72"
> > >   once_cell = "1.18.0"
> > >   pretty-hex = "0.3.0"
> > > +rand = { version = "0.8.5", default-features = false, features = 
> > > ["small_rng", "min_const_gen"] }
> > >   tempfile = "3.6.0"
> > >   tokio = { version = "1.29.1", default-features = false, features = 
> > > ["rt-multi-thread", "macros"] }
> > > diff --git a/rust/examples/concurrent-read-write.rs 
> > > b/rust/examples/concurrent-read-write.rs
> > > new file mode 100644
> > > index 000..4858f76
> > > --- /dev/null
> > > +++ b/rust/examples/concurrent-read-write.rs
> > > @@ -0,0 +1,149 @@
> > > +//! Example usage with nbdkit:
> > > +//! nbdkit -U - memory 100M \
> > > +//!   --run 'cargo run --example concurrent-read-write -- 
> > > $unixsocket'
> > > +//! Or connect over a URI:
> > > +//! nbdkit -U - memory 100M \
> > > +//!   --run 'cargo run --example concurrent-read-write -- $uri'
> > Should be "$uri" here (to avoid accidental shell globbing surprises).
> > 
> > > +//!
> > > +//! This will read and write randomly over the first megabyte of the
> > This says first megabyte...[1]
> 
> 
> If I understand my code correctly, it is actually reading and writing
> randomly over the entire memory.

Yes (i.e., the comment is wrong in the source file; we should clean it
up there as well as here).
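
To make the point about "entire memory" concrete, here is a minimal
sketch of picking a random offset anywhere in the export rather than
in the first megabyte. It is not code from the patch: `XorShift64` and
`random_offset` are hypothetical names, and the hand-rolled generator
only stands in for the `SmallRng` the patch uses, so that the sketch
needs no external crates.

```rust
/// Tiny xorshift64 generator, used only to keep this sketch free of
/// dependencies. The seed must be non-zero.
struct XorShift64(u64);

impl XorShift64 {
    fn next_u64(&mut self) -> u64 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.0 = x;
        x
    }
}

/// Pick a start offset such that `offset + buffer_size <= export_size`.
/// Hypothetical helper, not taken from the patch.
fn random_offset(rng: &mut XorShift64, export_size: u64, buffer_size: u64) -> u64 {
    assert!(buffer_size <= export_size);
    // Number of valid start positions. Modulo bias is ignored here.
    let span = export_size - buffer_size + 1;
    rng.next_u64() % span
}

fn main() {
    let export_size: u64 = 100 * 1024 * 1024; // 100M, as in the nbdkit command
    let buffer_size: u64 = 1024;
    let mut rng = XorShift64(44);
    for _ in 0..4 {
        let off = random_offset(&mut rng, export_size, buffer_size);
        assert!(off + buffer_size <= export_size);
        println!("offset {off}");
    }
}
```

Because the range is derived from `export_size`, every request stays
inside the export no matter how large the export is.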

> 
> > > +//! plugin using multi-conn, multiple threads and multiple requests in
> > > +//! flight on each thread.
> > > +
> > > +#![deny(warnings)]
> > > +use rand::prelude::*;
> > > +use std::env;
> > > +use std::sync::Arc;
> > > +use tokio::task::JoinSet;
> > > +
> > > +/// Number of simultaneous connections to the NBD server.
> > > +///
> > > +/// Note that some servers only support a limited number of
> > > +/// simultaneous connections, and/or have a configurable thread pool
> > > +/// internally, and if you exceed those limits then something will break.
> > > +const NR_MULTI_CONN: usize = 8;
> > > +
> > > +/// Number of commands that can be "in flight" at the same time on each
> > > +/// connection.  (Therefore the total number of requests in flight may
> > > +/// be up to NR_MULTI_CONN * MAX_IN_FLIGHT).
> > > +const MAX_IN_FLIGHT: usize = 16;
> > > +
> > > +/// The size of large reads and writes, must be > 512.
> > > +const BUFFER_SIZE: usize = 1024;
> > 1024 isn't much larger than 512.  It looks like you borrowed heavily
> > from examples/threaded-reads-and-writes.c, but that used 1M as the
> > large buffer.
> 
> 
> The reason for this is that we can't read and write to a shared buffer
> simultaneously in safe Rust. So the buffer gets created on the fly for each
> read/write operation. And creating a 1M buffer so many times seemed like a
> bit too much memory consumption.

Agreed that lots of memory use is undesirable.  The original C test
exploits and documents that it is doing an unsafe optimization of
reusing a single buffer across simultaneous operations merely to
demonstrate speed of the API; so it is interesting proof that Rust has
succeeded in this instance at its goal of letting the compiler tell us
that our (known) unsafe buffer sharing is not kosher.  Still, that's
WHY Rust has the 'unsafe' keyword, to let us explicitly document when
we know we are doing something that is unusual.  It seems to me that
this example would be truer to the original example if we are able to
add in 'unsafe' and intentionally share a buffer across multiple
threads (despite access to the contents of that buffer no longer being
well-defined) than to avoid the 'unsafe' keyword.
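
As a rough illustration of what opting in to 'unsafe' for a shared
buffer could look like, here is a minimal sketch using only the
standard library. `SharedBuf`, `region` and `fill_disjoint` are
hypothetical names, not part of libnbd. One deliberate difference from
the C example: each worker here touches its own disjoint region.
Overlapping concurrent writes would be a data race, which Rust treats
as undefined behaviour even inside 'unsafe', so the sketch does not
attempt them.

```rust
use std::cell::UnsafeCell;
use std::sync::Arc;
use std::thread;

/// A byte buffer that several threads may write to at once.
/// Hypothetical type for illustration only.
struct SharedBuf(Vec<UnsafeCell<u8>>);

// SAFETY (by assumption): callers promise that regions used at the
// same time never overlap. The compiler can no longer check this.
unsafe impl Sync for SharedBuf {}

impl SharedBuf {
    fn new(len: usize) -> Self {
        SharedBuf((0..len).map(|_| UnsafeCell::new(0u8)).collect())
    }

    /// # Safety
    /// No other thread may access `start..start + len` while the
    /// returned slice is alive.
    unsafe fn region(&self, start: usize, len: usize) -> &mut [u8] {
        assert!(start + len <= self.0.len());
        let base: *mut u8 = UnsafeCell::raw_get(self.0.as_ptr());
        // SAFETY: bounds were checked above; exclusivity is the
        // caller's promise.
        unsafe { std::slice::from_raw_parts_mut(base.add(start), len) }
    }
}

/// Let `workers` threads each fill their own chunk of one buffer.
fn fill_disjoint(buf_len: usize, workers: usize) -> Vec<u8> {
    let chunk = buf_len / workers;
    let buf = Arc::new(SharedBuf::new(buf_len));
    let handles: Vec<_> = (0..workers)
        .map(|i| {
            let buf = Arc::clone(&buf);
            thread::spawn(move || {
                // SAFETY: worker i is the only one touching chunk i.
                let region = unsafe { buf.region(i * chunk, chunk) };
                region.fill(i as u8 + 1);
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // All workers have finished, so this is the last reference.
    let buf = Arc::try_unwrap(buf).ok().expect("buffer still shared");
    buf.0.into_iter().map(UnsafeCell::into_inner).collect()
}

fn main() {
    println!("{:?}", fill_disjoint(8, 4));
}
```

The `unsafe impl Sync` line is where the promise is documented: it is
the one place a reviewer has to look to see what the compiler is being
asked to trust.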

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.
Virtualization:  qemu.org | libguestfs.org



Re: [Libguestfs] [libnbd PATCH v9 6/7] rust: async: Add an example

2023-09-04 Thread Tage Johansson



On 9/1/2023 10:41 PM, Eric Blake wrote:

On Sat, Aug 26, 2023 at 11:29:59AM +, Tage Johansson wrote:

This patch adds an example using the asynchronous Rust bindings.
---
  rust/Cargo.toml|   1 +
  rust/examples/concurrent-read-write.rs | 149 +
  rust/run-tests.sh.in   |   2 +
  3 files changed, 152 insertions(+)
  create mode 100644 rust/examples/concurrent-read-write.rs

diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index c49f9f2..0879b34 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -58,5 +58,6 @@ default = ["log", "tokio"]
  anyhow = "1.0.72"
  once_cell = "1.18.0"
  pretty-hex = "0.3.0"
+rand = { version = "0.8.5", default-features = false, features = ["small_rng", 
"min_const_gen"] }
  tempfile = "3.6.0"
  tokio = { version = "1.29.1", default-features = false, features = ["rt-multi-thread", 
"macros"] }
diff --git a/rust/examples/concurrent-read-write.rs 
b/rust/examples/concurrent-read-write.rs
new file mode 100644
index 000..4858f76
--- /dev/null
+++ b/rust/examples/concurrent-read-write.rs
@@ -0,0 +1,149 @@
+//! Example usage with nbdkit:
+//! nbdkit -U - memory 100M \
+//!   --run 'cargo run --example concurrent-read-write -- $unixsocket'
+//! Or connect over a URI:
+//! nbdkit -U - memory 100M \
+//!   --run 'cargo run --example concurrent-read-write -- $uri'

Should be "$uri" here (to avoid accidental shell globbing surprises).


+//!
+//! This will read and write randomly over the first megabyte of the

This says first megabyte...[1]



If I understand my code correctly, it is actually reading and writing 
randomly over the entire memory.



+//! plugin using multi-conn, multiple threads and multiple requests in
+//! flight on each thread.
+
+#![deny(warnings)]
+use rand::prelude::*;
+use std::env;
+use std::sync::Arc;
+use tokio::task::JoinSet;
+
+/// Number of simultaneous connections to the NBD server.
+///
+/// Note that some servers only support a limited number of
+/// simultaneous connections, and/or have a configurable thread pool
+/// internally, and if you exceed those limits then something will break.
+const NR_MULTI_CONN: usize = 8;
+
+/// Number of commands that can be "in flight" at the same time on each
+/// connection.  (Therefore the total number of requests in flight may
+/// be up to NR_MULTI_CONN * MAX_IN_FLIGHT).
+const MAX_IN_FLIGHT: usize = 16;
+
+/// The size of large reads and writes, must be > 512.
+const BUFFER_SIZE: usize = 1024;

1024 isn't much larger than 512.  It looks like you borrowed heavily
from examples/threaded-reads-and-writes.c, but that used 1M as the
large buffer.



The reason for this is that we can't read and write to a shared buffer 
simultaneously in safe Rust. So the buffer gets created on the fly for 
each read/write operation. And creating a 1M buffer so many times seemed 
like a bit too much memory consumption.
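
To put rough numbers on the memory concern, here is a small sketch
that multiplies out the constants from the patch. It assumes the worst
case, where every in-flight request on every connection holds its own
buffer at the same moment. `peak_buffer_bytes` is a hypothetical
helper, not part of the example.

```rust
/// Values copied from the patch.
const NR_MULTI_CONN: usize = 8;
const MAX_IN_FLIGHT: usize = 16;

/// Upper bound on bytes held in per-request buffers at one moment,
/// assuming one freshly allocated buffer per in-flight request.
const fn peak_buffer_bytes(buffer_size: usize) -> usize {
    NR_MULTI_CONN * MAX_IN_FLIGHT * buffer_size
}

fn main() {
    let small = peak_buffer_bytes(1024);
    let large = peak_buffer_bytes(1024 * 1024);
    println!("1 KiB buffers: {} KiB", small / 1024);
    println!("1 MiB buffers: {} MiB", large / (1024 * 1024));
}
```

Under that assumption the bound is 128 KiB with 1024-byte buffers and
128 MiB with 1M buffers.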




+
+/// Number of commands we issue (per [task][tokio::task]).
+const NR_CYCLES: usize = 32;
+
+/// Statistics gathered during the run.
+#[derive(Debug, Default)]
+struct Stats {
+/// The total number of requests made.
+requests: usize,
+}
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+let args = env::args_os().collect::<Vec<_>>();
+if args.len() != 2 {
+anyhow::bail!("Usage: {:?} socket", args[0]);
+}
+
+// We begin by making a connection to the server to get the export size
+// and ensure that it supports multiple connections and is writable.
+let nbd = libnbd::Handle::new()?;
+
+// Check if the user provided a URI or a unix socket.
+let socket_or_uri = args[1].to_str().unwrap();
+if socket_or_uri.contains("://") {
+nbd.connect_uri(socket_or_uri)?;
+} else {
+nbd.connect_unix(socket_or_uri)?;
+}
+
+let export_size = nbd.get_size()?;
+anyhow::ensure!(
+(BUFFER_SIZE as u64) < export_size,
+"export is {export_size}B, must be larger than {BUFFER_SIZE}B"
+);
+anyhow::ensure!(
+!nbd.is_read_only()?,
+"error: this NBD export is read-only"
+);
+anyhow::ensure!(
+nbd.can_multi_conn()?,
+"error: this NBD export does not support multi-conn"
+);
+drop(nbd); // Close the connection.
+
+// Start the worker tasks, one per connection.
+let mut tasks = JoinSet::new();
+for i in 0..NR_MULTI_CONN {
+tasks.spawn(run_thread(i, socket_or_uri.to_owned(), export_size));
+}
+
+// Wait for the tasks to complete.
+let mut stats = Stats::default();
+while !tasks.is_empty() {
+let this_stats = tasks.join_next().await.unwrap().unwrap()?;
+stats.requests += this_stats.requests;
+}
+
+// Make sure the number of requests that were required matches what
+// we expect.
+assert_eq!(stats.requests, NR_MULTI_CONN * NR_CYCLES);
+
+Ok(())
+}
+
+async fn run_thread(
+task_idx: usize,
+socket_or_uri: String,
+export_size: u64,
+) -> 

Re: [Libguestfs] [libnbd PATCH v9 6/7] rust: async: Add an example

2023-09-01 Thread Eric Blake
On Sat, Aug 26, 2023 at 11:29:59AM +, Tage Johansson wrote:
> This patch adds an example using the asynchronous Rust bindings.
> ---
>  rust/Cargo.toml|   1 +
>  rust/examples/concurrent-read-write.rs | 149 +
>  rust/run-tests.sh.in   |   2 +
>  3 files changed, 152 insertions(+)
>  create mode 100644 rust/examples/concurrent-read-write.rs
> 
> diff --git a/rust/Cargo.toml b/rust/Cargo.toml
> index c49f9f2..0879b34 100644
> --- a/rust/Cargo.toml
> +++ b/rust/Cargo.toml
> @@ -58,5 +58,6 @@ default = ["log", "tokio"]
>  anyhow = "1.0.72"
>  once_cell = "1.18.0"
>  pretty-hex = "0.3.0"
> +rand = { version = "0.8.5", default-features = false, features = 
> ["small_rng", "min_const_gen"] }
>  tempfile = "3.6.0"
>  tokio = { version = "1.29.1", default-features = false, features = 
> ["rt-multi-thread", "macros"] }
> diff --git a/rust/examples/concurrent-read-write.rs 
> b/rust/examples/concurrent-read-write.rs
> new file mode 100644
> index 000..4858f76
> --- /dev/null
> +++ b/rust/examples/concurrent-read-write.rs
> @@ -0,0 +1,149 @@
> +//! Example usage with nbdkit:
> +//! nbdkit -U - memory 100M \
> +//!   --run 'cargo run --example concurrent-read-write -- $unixsocket'
> +//! Or connect over a URI:
> +//! nbdkit -U - memory 100M \
> +//!   --run 'cargo run --example concurrent-read-write -- $uri'

Should be "$uri" here (to avoid accidental shell globbing surprises).

> +//!
> +//! This will read and write randomly over the first megabyte of the

This says first megabyte...[1]

> +//! plugin using multi-conn, multiple threads and multiple requests in
> +//! flight on each thread.
> +
> +#![deny(warnings)]
> +use rand::prelude::*;
> +use std::env;
> +use std::sync::Arc;
> +use tokio::task::JoinSet;
> +
> +/// Number of simultaneous connections to the NBD server.
> +///
> +/// Note that some servers only support a limited number of
> +/// simultaneous connections, and/or have a configurable thread pool
> +/// internally, and if you exceed those limits then something will break.
> +const NR_MULTI_CONN: usize = 8;
> +
> +/// Number of commands that can be "in flight" at the same time on each
> +/// connection.  (Therefore the total number of requests in flight may
> +/// be up to NR_MULTI_CONN * MAX_IN_FLIGHT).
> +const MAX_IN_FLIGHT: usize = 16;
> +
> +/// The size of large reads and writes, must be > 512.
> +const BUFFER_SIZE: usize = 1024;

1024 isn't much larger than 512.  It looks like you borrowed heavily
from examples/threaded-reads-and-writes.c, but that used 1M as the
large buffer.

> +
> +/// Number of commands we issue (per [task][tokio::task]).
> +const NR_CYCLES: usize = 32;
> +
> +/// Statistics gathered during the run.
> +#[derive(Debug, Default)]
> +struct Stats {
> +/// The total number of requests made.
> +requests: usize,
> +}
> +
> +#[tokio::main]
> +async fn main() -> anyhow::Result<()> {
> +let args = env::args_os().collect::<Vec<_>>();
> +if args.len() != 2 {
> +anyhow::bail!("Usage: {:?} socket", args[0]);
> +}
> +
> +// We begin by making a connection to the server to get the export size
> +// and ensure that it supports multiple connections and is writable.
> +let nbd = libnbd::Handle::new()?;
> +
> +// Check if the user provided a URI or a unix socket.
> +let socket_or_uri = args[1].to_str().unwrap();
> +if socket_or_uri.contains("://") {
> +nbd.connect_uri(socket_or_uri)?;
> +} else {
> +nbd.connect_unix(socket_or_uri)?;
> +}
> +
> +let export_size = nbd.get_size()?;
> +anyhow::ensure!(
> +(BUFFER_SIZE as u64) < export_size,
> +"export is {export_size}B, must be larger than {BUFFER_SIZE}B"
> +);
> +anyhow::ensure!(
> +!nbd.is_read_only()?,
> +"error: this NBD export is read-only"
> +);
> +anyhow::ensure!(
> +nbd.can_multi_conn()?,
> +"error: this NBD export does not support multi-conn"
> +);
> +drop(nbd); // Close the connection.
> +
> +// Start the worker tasks, one per connection.
> +let mut tasks = JoinSet::new();
> +for i in 0..NR_MULTI_CONN {
> +tasks.spawn(run_thread(i, socket_or_uri.to_owned(), export_size));
> +}
> +
> +// Wait for the tasks to complete.
> +let mut stats = Stats::default();
> +while !tasks.is_empty() {
> +let this_stats = tasks.join_next().await.unwrap().unwrap()?;
> +stats.requests += this_stats.requests;
> +}
> +
> +// Make sure the number of requests that were required matches what
> +// we expect.
> +assert_eq!(stats.requests, NR_MULTI_CONN * NR_CYCLES);
> +
> +Ok(())
> +}
> +
> +async fn run_thread(
> +task_idx: usize,
> +socket_or_uri: String,
> +export_size: u64,
> +) -> anyhow::Result<Stats> {
> +// Start a new connection to the server.
> +// We shall spawn many commands concurrently on different tasks and those
> +// futures must 

[Libguestfs] [libnbd PATCH v9 6/7] rust: async: Add an example

2023-08-26 Thread Tage Johansson
This patch adds an example using the asynchronous Rust bindings.
---
 rust/Cargo.toml|   1 +
 rust/examples/concurrent-read-write.rs | 149 +
 rust/run-tests.sh.in   |   2 +
 3 files changed, 152 insertions(+)
 create mode 100644 rust/examples/concurrent-read-write.rs

diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index c49f9f2..0879b34 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -58,5 +58,6 @@ default = ["log", "tokio"]
 anyhow = "1.0.72"
 once_cell = "1.18.0"
 pretty-hex = "0.3.0"
+rand = { version = "0.8.5", default-features = false, features = ["small_rng", "min_const_gen"] }
 tempfile = "3.6.0"
 tokio = { version = "1.29.1", default-features = false, features = ["rt-multi-thread", "macros"] }
diff --git a/rust/examples/concurrent-read-write.rs b/rust/examples/concurrent-read-write.rs
new file mode 100644
index 000..4858f76
--- /dev/null
+++ b/rust/examples/concurrent-read-write.rs
@@ -0,0 +1,149 @@
+//! Example usage with nbdkit:
+//! nbdkit -U - memory 100M \
+//!   --run 'cargo run --example concurrent-read-write -- $unixsocket'
+//! Or connect over a URI:
+//! nbdkit -U - memory 100M \
+//!   --run 'cargo run --example concurrent-read-write -- $uri'
+//!
+//! This will read and write randomly over the first megabyte of the
+//! plugin using multi-conn, multiple threads and multiple requests in
+//! flight on each thread.
+
+#![deny(warnings)]
+use rand::prelude::*;
+use std::env;
+use std::sync::Arc;
+use tokio::task::JoinSet;
+
+/// Number of simultaneous connections to the NBD server.
+///
+/// Note that some servers only support a limited number of
+/// simultaneous connections, and/or have a configurable thread pool
+/// internally, and if you exceed those limits then something will break.
+const NR_MULTI_CONN: usize = 8;
+
+/// Number of commands that can be "in flight" at the same time on each
+/// connection.  (Therefore the total number of requests in flight may
+/// be up to NR_MULTI_CONN * MAX_IN_FLIGHT).
+const MAX_IN_FLIGHT: usize = 16;
+
+/// The size of large reads and writes, must be > 512.
+const BUFFER_SIZE: usize = 1024;
+
+/// Number of commands we issue (per [task][tokio::task]).
+const NR_CYCLES: usize = 32;
+
+/// Statistics gathered during the run.
+#[derive(Debug, Default)]
+struct Stats {
+    /// The total number of requests made.
+    requests: usize,
+}
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    let args = env::args_os().collect::<Vec<_>>();
+    if args.len() != 2 {
+        anyhow::bail!("Usage: {:?} socket", args[0]);
+    }
+
+    // We begin by making a connection to the server to get the export size
+    // and ensure that it supports multiple connections and is writable.
+    let nbd = libnbd::Handle::new()?;
+
+    // Check if the user provided a URI or a unix socket.
+    let socket_or_uri = args[1].to_str().unwrap();
+    if socket_or_uri.contains("://") {
+        nbd.connect_uri(socket_or_uri)?;
+    } else {
+        nbd.connect_unix(socket_or_uri)?;
+    }
+
+    let export_size = nbd.get_size()?;
+    anyhow::ensure!(
+        (BUFFER_SIZE as u64) < export_size,
+        "export is {export_size}B, must be larger than {BUFFER_SIZE}B"
+    );
+    anyhow::ensure!(
+        !nbd.is_read_only()?,
+        "error: this NBD export is read-only"
+    );
+    anyhow::ensure!(
+        nbd.can_multi_conn()?,
+        "error: this NBD export does not support multi-conn"
+    );
+    drop(nbd); // Close the connection.
+
+    // Start the worker tasks, one per connection.
+    let mut tasks = JoinSet::new();
+    for i in 0..NR_MULTI_CONN {
+        tasks.spawn(run_thread(i, socket_or_uri.to_owned(), export_size));
+    }
+
+    // Wait for the tasks to complete.
+    let mut stats = Stats::default();
+    while !tasks.is_empty() {
+        let this_stats = tasks.join_next().await.unwrap().unwrap()?;
+        stats.requests += this_stats.requests;
+    }
+
+    // Make sure the number of requests that were required matches what
+    // we expect.
+    assert_eq!(stats.requests, NR_MULTI_CONN * NR_CYCLES);
+
+    Ok(())
+}
+
+async fn run_thread(
+    task_idx: usize,
+    socket_or_uri: String,
+    export_size: u64,
+) -> anyhow::Result<Stats> {
+    // Start a new connection to the server.
+    // We shall spawn many commands concurrently on different tasks and those
+    // futures must be `'static`, hence we wrap the handle in an [Arc].
+    let nbd = Arc::new(libnbd::AsyncHandle::new()?);
+
+    // Check if the user provided a URI or a unix socket.
+    if socket_or_uri.contains("://") {
+        nbd.connect_uri(socket_or_uri).await?;
+    } else {
+        nbd.connect_unix(socket_or_uri).await?;
+    }
+
+    let mut rng = SmallRng::seed_from_u64(44 as u64);
+
+    // Issue commands.
+    let mut stats = Stats::default();
+    let mut join_set = JoinSet::new();
+    //tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+    while stats.requests <