Hey Aaron, some comments inline.
On Sat Jul 26, 2025 at 3:05 AM CEST, Aaron Lauterer wrote: > This tool is intended to migrate the Proxmox VE (PVE) RRD data files to > the new schema. > > Up until PVE8 the schema has been the same for a long time. With PVE9 we > introduced new columns to guests (vm) and nodes. We also switched all > types (vm, node, storate) to the same aggregation schemas as we do it in > PBS. > The result of both are a much finer resolution for long time spans, but > also larger RRD files. > > * node: 79K -> 1.4M > * vm: 66K -> 1.3m > * storage: 14K -> 156K > > The old directories for VMs used to be in `/var/lib/rrdcached/db/` with > the following sub directories: > > * nodes: `pve2-node` > * guests (VM/CT): `pve2-vm` > * storage: `pve2-storage` > > With this change we also introduce a new key schema, that makes it > easier in the future to introduce new ones. Instead of the > `pve{version}-{type}` we are switching to `pve-{type}-{version}`. > > This enables us to add new columns with a new version, without breaking > nodes that are not yet updated. We are NOT allowed to remove or re-use > existing columns. That would be a breaking change. > We are currently at version 9.0. But in the future, if needed, this tool > can be adapted to do other migrations too. > For example, {old, 9.0} -> 9.2, should that be necessary. > > The actual migration is handled by `librrd` to which we pass the path to > the old and new files, and the new RRD definitions. The `rrd_create_r2` > call then does the hard work of migrating and converting exisiting data > into the new file and aggregation schema. > > This can take some time. 
Quick tests on a Ryzen 7900X with the following > files: > * 1 node RRD file > * 10k vm RRD files > * 1 storage RRD file > > showed the folling results: > > * 1 thread: 179.61s user 14.82s system 100% cpu 3:14.17 total > * 4 threads: 187.57s user 16.98s system 399% cpu 51.198 total > > This is why we do not migrate inline, but have it as a separate step > during package upgrades. > > Behavior: By default nothing will be changed and a dry or test run will > happen. > Only if the `--migrate` parameter is added will the actual migration be > done. > > For each found RRD source file, the tool checks if a matching target > file already exists. By default, those will be skipped to not overwrite > target files that might already store newer data. > With the `--force` parameter this can be changed. > > That means, one can run the tool multiple times (without --force) and it > will pick up where it might have left off. For example it the migration > was interrupted for some reason. > > Once a source file has been processed it will be renamed with the `.old` > appendix. It will be excluded from future runs as we check for files > without an extension. > > The tool has some simple heuristic to determine how many threads should > be used. Be default the range is between 1 to 4 threads. But the > `--threads` parameter allows a manual override. 
> > Signed-off-by: Aaron Lauterer <a.laute...@proxmox.com> > --- > .cargo/config.toml | 5 + > .gitignore | 9 + > Cargo.toml | 20 ++ > build.rs | 29 ++ > src/lib.rs | 5 + > src/main.rs | 567 ++++++++++++++++++++++++++++++++++++++++ > src/parallel_handler.rs | 160 ++++++++++++ > wrapper.h | 1 + > 8 files changed, 796 insertions(+) > create mode 100644 .cargo/config.toml > create mode 100644 .gitignore > create mode 100644 Cargo.toml > create mode 100644 build.rs > create mode 100644 src/lib.rs > create mode 100644 src/main.rs > create mode 100644 src/parallel_handler.rs > create mode 100644 wrapper.h > > diff --git a/.cargo/config.toml b/.cargo/config.toml > new file mode 100644 > index 0000000..3b5b6e4 > --- /dev/null > +++ b/.cargo/config.toml > @@ -0,0 +1,5 @@ > +[source] > +[source.debian-packages] > +directory = "/usr/share/cargo/registry" > +[source.crates-io] > +replace-with = "debian-packages" > diff --git a/.gitignore b/.gitignore > new file mode 100644 > index 0000000..06ac1a1 > --- /dev/null > +++ b/.gitignore > @@ -0,0 +1,9 @@ > +*.build > +*.buildinfo > +*.changes > +*.deb > +*.dsc > +*.tar* > +target/ > +/Cargo.lock > +/proxmox-rrd-migration-tool-[0-9]*/ > diff --git a/Cargo.toml b/Cargo.toml > new file mode 100644 > index 0000000..d3523f3 > --- /dev/null > +++ b/Cargo.toml > @@ -0,0 +1,20 @@ > +[package] > +name = "proxmox_rrd_migration_8-9" > +version = "0.1.0" > +edition = "2021" > +authors = [ > + "Aaron Lauterer <a.laute...@proxmox.com>", > + "Proxmox Support Team <supp...@proxmox.com>", > +] > +license = "AGPL-3" > +homepage = "https://www.proxmox.com" > + > +[dependencies] > +anyhow = "1.0.86" > +pico-args = "0.5.0" > +proxmox-async = "0.4" > +crossbeam-channel = "0.5" > + > +[build-dependencies] > +bindgen = "0.66.1" > +pkg-config = "0.3" > diff --git a/build.rs b/build.rs > new file mode 100644 > index 0000000..56d07cc > --- /dev/null > +++ b/build.rs > @@ -0,0 +1,29 @@ > +use std::env; > +use std::path::PathBuf; > + > +fn main() { > + 
println!("cargo:rustc-link-lib=rrd"); > + > + println!("cargo:rerun-if-changed=wrapper.h"); > + // The bindgen::Builder is the main entry point > + // to bindgen, and lets you build up options for > + // the resulting bindings. > + > + let bindings = bindgen::Builder::default() > + // The input header we would like to generate > + // bindings for. > + .header("wrapper.h") > + // Tell cargo to invalidate the built crate whenever any of the > + // included header files changed. > + .parse_callbacks(Box::new(bindgen::CargoCallbacks)) > + // Finish the builder and generate the bindings. > + .generate() > + // Unwrap the Result and panic on failure. > + .expect("Unable to generate bindings"); > + > + // Write the bindings to the $OUT_DIR/bindings.rs file. > + let out_path = PathBuf::from(env::var("OUT_DIR").unwrap()); > + bindings > + .write_to_file(out_path.join("bindings.rs")) > + .expect("Couldn't write bindings!"); > +} > diff --git a/src/lib.rs b/src/lib.rs > new file mode 100644 > index 0000000..a38a13a > --- /dev/null > +++ b/src/lib.rs > @@ -0,0 +1,5 @@ > +#![allow(non_upper_case_globals)] > +#![allow(non_camel_case_types)] > +#![allow(non_snake_case)] > + > +include!(concat!(env!("OUT_DIR"), "/bindings.rs")); > diff --git a/src/main.rs b/src/main.rs > new file mode 100644 > index 0000000..5e6418c > --- /dev/null > +++ b/src/main.rs > @@ -0,0 +1,567 @@ > +use anyhow::{bail, Error, Result}; > +use std::{ > + ffi::{CStr, CString, OsString}, > + fs, > + os::unix::{ffi::OsStrExt, fs::PermissionsExt}, > + path::{Path, PathBuf}, > + sync::Arc, > +}; > + > +use proxmox_rrd_migration_tool::{rrd_clear_error, rrd_create_r2, > rrd_get_context, rrd_get_error}; > + > +use crate::parallel_handler::ParallelHandler; > + > +pub mod parallel_handler; > + > +const BASE_DIR: &str = "/var/lib/rrdcached/db"; > +const SOURCE_SUBDIR_NODE: &str = "pve2-node"; > +const SOURCE_SUBDIR_GUEST: &str = "pve2-vm"; > +const SOURCE_SUBDIR_STORAGE: &str = "pve2-storage"; > +const 
TARGET_SUBDIR_NODE: &str = "pve-node-9.0"; > +const TARGET_SUBDIR_GUEST: &str = "pve-vm-9.0"; > +const TARGET_SUBDIR_STORAGE: &str = "pve-storage-9.0"; > +const RESOURCE_BASE_DIR: &str = "/etc/pve"; > +const MAX_THREADS: usize = 4; > +const RRD_STEP_SIZE: usize = 60; > + > +type File = (CString, OsString); Maybe use some different name here in order to avoid confusion with std::fs::File? e.g. RRDFile > + > +// RRAs are defined in the following way: > +// > +// RRA:CF:xff:step:rows > +// CF: AVERAGE or MAX > +// xff: 0.5 > +// steps: stepsize is defined on rrd file creation! example: with 60 seconds > step size: > +// e.g. 1 => 60 sec, 30 => 1800 seconds or 30 min > +// rows: how many aggregated rows are kept, as in how far back in time we > store data > +// > +// how many seconds are aggregated per RRA: steps * stepsize * rows > +// how many hours are aggregated per RRA: steps * stepsize * rows / 3600 > +// how many days are aggregated per RRA: steps * stepsize * rows / 3600 / 24 > +// > https://oss.oetiker.ch/rrdtool/tut/rrd-beginners.en.html#Understanding_by_an_example > + > +const RRD_VM_DEF: [&CStr; 25] = [ > + c"DS:maxcpu:GAUGE:120:0:U", > + c"DS:cpu:GAUGE:120:0:U", > + c"DS:maxmem:GAUGE:120:0:U", > + c"DS:mem:GAUGE:120:0:U", > + c"DS:maxdisk:GAUGE:120:0:U", > + c"DS:disk:GAUGE:120:0:U", > + c"DS:netin:DERIVE:120:0:U", > + c"DS:netout:DERIVE:120:0:U", > + c"DS:diskread:DERIVE:120:0:U", > + c"DS:diskwrite:DERIVE:120:0:U", > + c"DS:memhost:GAUGE:120:0:U", > + c"DS:pressurecpusome:GAUGE:120:0:U", > + c"DS:pressurecpufull:GAUGE:120:0:U", > + c"DS:pressureiosome:GAUGE:120:0:U", > + c"DS:pressureiofull:GAUGE:120:0:U", > + c"DS:pressurememorysome:GAUGE:120:0:U", > + c"DS:pressurememoryfull:GAUGE:120:0:U", > + c"RRA:AVERAGE:0.5:1:1440", // 1 min * 1440 => 1 day > + c"RRA:AVERAGE:0.5:30:1440", // 30 min * 1440 => 30 day > + c"RRA:AVERAGE:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year > + c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years > + 
c"RRA:MAX:0.5:1:1440", // 1 min * 1440 => 1 day > + c"RRA:MAX:0.5:30:1440", // 30 min * 1440 => 30 day > + c"RRA:MAX:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year > + c"RRA:MAX:0.5:10080:570", // 1 week * 570 => ~10 years > +]; > + > +const RRD_NODE_DEF: [&CStr; 27] = [ > + c"DS:loadavg:GAUGE:120:0:U", > + c"DS:maxcpu:GAUGE:120:0:U", > + c"DS:cpu:GAUGE:120:0:U", > + c"DS:iowait:GAUGE:120:0:U", > + c"DS:memtotal:GAUGE:120:0:U", > + c"DS:memused:GAUGE:120:0:U", > + c"DS:swaptotal:GAUGE:120:0:U", > + c"DS:swapused:GAUGE:120:0:U", > + c"DS:roottotal:GAUGE:120:0:U", > + c"DS:rootused:GAUGE:120:0:U", > + c"DS:netin:DERIVE:120:0:U", > + c"DS:netout:DERIVE:120:0:U", > + c"DS:memfree:GAUGE:120:0:U", > + c"DS:arcsize:GAUGE:120:0:U", > + c"DS:pressurecpusome:GAUGE:120:0:U", > + c"DS:pressureiosome:GAUGE:120:0:U", > + c"DS:pressureiofull:GAUGE:120:0:U", > + c"DS:pressurememorysome:GAUGE:120:0:U", > + c"DS:pressurememoryfull:GAUGE:120:0:U", > + c"RRA:AVERAGE:0.5:1:1440", // 1 min * 1440 => 1 day > + c"RRA:AVERAGE:0.5:30:1440", // 30 min * 1440 => 30 day > + c"RRA:AVERAGE:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year > + c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years > + c"RRA:MAX:0.5:1:1440", // 1 min * 1440 => 1 day > + c"RRA:MAX:0.5:30:1440", // 30 min * 1440 => 30 day > + c"RRA:MAX:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year > + c"RRA:MAX:0.5:10080:570", // 1 week * 570 => ~10 years > +]; > + > +const RRD_STORAGE_DEF: [&CStr; 10] = [ > + c"DS:total:GAUGE:120:0:U", > + c"DS:used:GAUGE:120:0:U", > + c"RRA:AVERAGE:0.5:1:1440", // 1 min * 1440 => 1 day > + c"RRA:AVERAGE:0.5:30:1440", // 30 min * 1440 => 30 day > + c"RRA:AVERAGE:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year > + c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years > + c"RRA:MAX:0.5:1:1440", // 1 min * 1440 => 1 day > + c"RRA:MAX:0.5:30:1440", // 30 min * 1440 => 30 day > + c"RRA:MAX:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year > + c"RRA:MAX:0.5:10080:570", // 1 
week * 570 => ~10 years > +]; > + > +const HELP: &str = "\ > +proxmox-rrd-migration tool > + > +Migrates existing RRD graph data to the new format. > + > +Use this only in the process of upgrading from Proxmox VE 8 to 9 according > to the upgrade guide! > + > +USAGE: > + proxmox-rrd-migration [OPTIONS] > + > + FLAGS: > + -h, --help Prints this help information > + > + OPTIONS: > + --migrate Start the migration. Without it, only a dry > run will be done. > + > + --force Migrate, even if the target already exists. > + This will overwrite any migrated RRD files! > + > + --threads THREADS Number of paralell threads. > + > + --source <SOURCE DIR> Source base directory. Mainly for tests! > + Default: /var/lib/rrdcached/db > + > + --target <TARGET DIR> Target base directory. Mainly for tests! > + Default: /var/lib/rrdcached/db > + > + --resources <DIR> Directory that contains .vmlist and .member > files. Mainly for tests! > + Default: /etc/pve > + > +"; > + > +#[derive(Debug)] > +struct Args { > + migrate: bool, > + force: bool, > + threads: Option<usize>, > + source: Option<String>, > + target: Option<String>, > + resources: Option<String>, > +} > + > +fn parse_args() -> Result<Args, Error> { > + let mut pargs = pico_args::Arguments::from_env(); > + > + // Help has a higher priority and should be handled separately. 
> + if pargs.contains(["-h", "--help"]) { > + print!("{}", HELP); > + std::process::exit(0); > + } > + > + let mut args = Args { > + migrate: false, > + threads: pargs > + .opt_value_from_str("--threads") > + .expect("Could not parse --threads parameter"), > + force: false, > + source: pargs > + .opt_value_from_str("--source") > + .expect("Could not parse --source parameter"), > + target: pargs > + .opt_value_from_str("--target") > + .expect("Could not parse --target parameter"), > + resources: pargs > + .opt_value_from_str("--resources") > + .expect("Could not parse --resources parameter"), > + }; > + > + if pargs.contains("--migrate") { > + args.migrate = true; > + } > + if pargs.contains("--force") { > + args.force = true; > + } > + > + // It's up to the caller what to do with the remaining arguments. > + let remaining = pargs.finish(); > + if !remaining.is_empty() { > + bail!(format!("Warning: unused arguments left: {:?}", remaining)); No need to use format! here, bail! supports formatting natively: bail!("Warning: .... 
{remaining:?}"); > + } > + > + Ok(args) > +} > + > +fn main() { > + let args = match parse_args() { > + Ok(v) => v, > + Err(e) => { > + eprintln!("Error: {}.", e); > + std::process::exit(1); > + } > + }; > + > + let source_base_dir = match args.source { > + Some(ref v) => v.as_str(), > + None => BASE_DIR, > + }; you can use this instead, it's shorter and a bit nicer to read IMO: let source_base_dir = args.source.as_deref().unwrap_or(BASE_DIR); > + > + let target_base_dir = match args.target { > + Some(ref v) => v.as_str(), > + None => BASE_DIR, > + }; > + > + let resource_base_dir = match args.resources { > + Some(ref v) => v.as_str(), > + None => RESOURCE_BASE_DIR, > + }; same for the previous two > + > + let source_dir_guests: PathBuf = [source_base_dir, > SOURCE_SUBDIR_GUEST].iter().collect(); > + let target_dir_guests: PathBuf = [target_base_dir, > TARGET_SUBDIR_GUEST].iter().collect(); > + let source_dir_nodes: PathBuf = [source_base_dir, > SOURCE_SUBDIR_NODE].iter().collect(); > + let target_dir_nodes: PathBuf = [target_base_dir, > TARGET_SUBDIR_NODE].iter().collect(); > + let source_dir_storage: PathBuf = [source_base_dir, > SOURCE_SUBDIR_STORAGE].iter().collect(); > + let target_dir_storage: PathBuf = [target_base_dir, > TARGET_SUBDIR_STORAGE].iter().collect(); What do you think about: let source_base_dir = Path::new(args.source.as_deref().unwrap_or(BASE_DIR)); let target_base_dir = Path::new(args.target.as_deref().unwrap_or(BASE_DIR)); let source_dir_guests = source_base_dir.join(SOURCE_SUBDIR_GUEST); let target_dir_guests = target_base_dir.join(TARGET_SUBDIR_GUEST); > + > + if !args.migrate { > + println!("DRYRUN! Use the --migrate parameter to start the > migration."); > + } > + if args.force { > + println!("Force mode! 
Will overwrite existing target RRD files!"); > + } > + > + if let Err(e) = migrate_nodes( > + source_dir_nodes, > + target_dir_nodes, > + resource_base_dir, > + args.migrate, > + args.force, > + ) { > + eprintln!("Error migrating nodes: {}", e); > + std::process::exit(1); > + } > + if let Err(e) = migrate_storage( > + source_dir_storage, > + target_dir_storage, > + args.migrate, > + args.force, > + ) { > + eprintln!("Error migrating storage: {}", e); > + std::process::exit(1); > + } > + if let Err(e) = migrate_guests( > + source_dir_guests, > + target_dir_guests, > + resource_base_dir, > + set_threads(&args), > + args.migrate, > + args.force, > + ) { > + eprintln!("Error migrating guests: {}", e); > + std::process::exit(1); > + } Error handling in this function could be a bit cleaner if broken out into a separate function and by using anyhow's .context/.with_context: fn do_main() -> Result<(), Error> { let args = parse_args(...).context("Could not parse args")?; ... migrate_guests(...).context("Error migrating guests")?; Ok(()) } fn main() { if let Err(e) = do_main() { eprintln!("{e}"); std::process::exit(1); } } What do you think? > +} > + > +/// Set number of threads > +/// > +/// Either a fixed parameter or determining a range between 1 to 4 threads > +/// based on the number of CPU cores available in the system. > +fn set_threads(args: &Args) -> usize { I think the name 'set_threads' is rather confusing for something that *returns* the number of threads to use. Maybe call it 'threads_from_core_count' or something alike? (Under the assumption that you remove the let Some(...) as suggested below; if you keep it there, 'get_threads' might be an ok choice.) > + if let Some(threads) = args.threads { > + return threads; > + } ^ Personally I'd keep this part outside of the helper, but no hard feelings. fn do_main() { ... let threads = args.threads.unwrap_or_else(threads_from_core_count); migrate_guests(..., threads)?; Ok(()) } fn threads_from_core_count() -> usize { ... 
} > + > + // check for a way to get physical cores and not threads? > + let cpus: usize = String::from_utf8_lossy( > + std::process::Command::new("nproc") > + .output() > + .expect("Error running nproc") > + .stdout > + .as_slice() > + .trim_ascii(), > + ) > + .parse::<usize>() > + .expect("Could not parse nproc output"); > + > + if cpus < 32 { > + let threads = cpus / 8; > + if threads == 0 { > + return 1; > + } > + return threads; > + } > + MAX_THREADS > +} > + > +/// Check if a VMID is currently configured > +fn resource_present(path: &str, resource: &str) -> Result<bool> { > + let resourcelist = fs::read_to_string(path)?; > + Ok(resourcelist.contains(format!("\"{resource}\"").as_str())) > +} > + > +/// Rename file to old, when migrated or resource not present at all -> old > RRD file > +fn mv_old(file: &str) -> Result<()> { > + let old = format!("{}.old", file); > + fs::rename(file, old)?; > + Ok(()) > +} > + > +/// Colllect all RRD files in the provided directory > +fn collect_rrd_files(location: &PathBuf) -> Result<Vec<(CString, OsString)>> > { ^ Maybe use the type you've defined here? `File`, although I'd prefer a different name to avoid confusion with std::fs::File. > + let mut files: Vec<(CString, OsString)> = Vec::new(); > + > + fs::read_dir(location)? > + .filter(|f| f.is_ok()) > + .map(|f| f.unwrap().path()) You can use filter_map here, maybe like this: fs::read_dir(location)? .filter_map(|f| match f { Ok(a) => Some(a.path()), Err(e) => { eprintln!("could not read dir entry: {e}"); None } }) or, if you don't want to log the error: fs::read_dir(location)? 
.filter_map(Result::ok) .map(|entry| entry.path()) (untested, but you get the idea) > + .filter(|f| f.is_file() && f.extension().is_none()) > + .for_each(|file| { > + let path = CString::new(file.as_path().as_os_str().as_bytes()) > + .expect("Could not convert path to CString."); ^ > + let fname = file > + .file_name() > + .map(|v| v.to_os_string()) Reading the docs for the CString::new function, it should only fail if there is a NUL byte in the string, which should AFAIK be impossible here since the string came from the file name. Maybe express that in some comment here? v > + .expect("Could not convert fname to OsString."); > + files.push((path, fname)) > + }); > + Ok(files) > +} > + > +/// Does the actual migration for the given file > +fn do_rrd_migration( > + file: File, > + target_location: &Path, > + rrd_def: &[&CStr], > + migrate: bool, > + force: bool, > +) -> Result<()> { > + if !migrate { > + println!("would migrate but in dry run mode"); > + } > + > + let resource = file.1; > + let mut target_path = target_location.to_path_buf(); Since the first thing you do with target_location is to convert it to a PathBuf, I'd suggest just passing it as a PathBuf and let the caller take care of the allocation. 
> + target_path.push(resource); > + > + if target_path.exists() && !force { > + println!( > + "already migrated, use --force to overwrite target file: {}", > + target_path.display() > + ); > + } > + > + if !migrate || (target_path.exists() && !force) { > + bail!("skipping"); > + } you could pull out the 'target_path.exists() && !force' into a variable so that you don't have to evaluate the same thing twice > + > + let mut source: [*const i8; 2] = [std::ptr::null(); 2]; > + source[0] = file.0.as_ptr(); > + > + let target_path = CString::new(target_path.to_str().unwrap()).unwrap(); > + > + unsafe { > + rrd_get_context(); > + rrd_clear_error(); > + let res = rrd_create_r2( > + target_path.as_ptr(), > + RRD_STEP_SIZE as u64, > + 0, > + 0, > + source.as_mut_ptr(), > + std::ptr::null(), > + rrd_def.len() as i32, > + rrd_def > + .iter() > + .map(|v| v.as_ptr()) > + .collect::<Vec<_>>() > + .as_mut_ptr(), > + ); > + if res != 0 { > + bail!( > + "RRD create Error: {}", > + CStr::from_ptr(rrd_get_error()).to_string_lossy() > + ); > + } > + } > + Ok(()) > +} > + > +/// Migrate guest RRD files > +/// > +/// In parallel to speed up the process as most time is spent on converting > the > +/// data to the new format. 
> +fn migrate_guests( > + source_dir_guests: PathBuf, > + target_dir_guests: PathBuf, > + resources: &str, > + threads: usize, > + migrate: bool, > + force: bool, > +) -> Result<(), Error> { > + println!("Migrating RRD data for guests…"); > + println!("Using {} thread(s)", threads); > + > + let guest_source_files = collect_rrd_files(&source_dir_guests)?; > + > + if !target_dir_guests.exists() && migrate { > + println!("Creating new directory: '{}'", > target_dir_guests.display()); > + std::fs::create_dir(&target_dir_guests)?; > + } > + > + let total_guests = guest_source_files.len(); > + let guests = Arc::new(std::sync::atomic::AtomicUsize::new(0)); > + let guests2 = guests.clone(); Just FYI, when cloning an Arc it's better to use Arc::clone(&guests), because this makes it clearer *what* you are actually cloning: the Arc vs. the content of the Arc > + let start_time = std::time::SystemTime::now(); Since you only measure the elapsed time it might be more idiomatic to use std::time::Instant here, but no hard feelings. > + > + let migration_pool = ParallelHandler::new( > + "guest rrd migration", > + threads, > + move |file: (CString, OsString)| { Please add some comment regarding the .unwrap here. > + let full_path = file.0.clone().into_string().unwrap(); > + > + if let Ok(()) = do_rrd_migration( > + file, > + &target_dir_guests, > + RRD_VM_DEF.as_slice(), > + migrate, > + force, > + ) { Since do_rrd_migration does not return any data in the Ok value, you could just if do_rrd_migration(....).is_ok() { ....
} > + mv_old(full_path.as_str())?; > + let current_guests = guests2.fetch_add(1, > std::sync::atomic::Ordering::SeqCst); > + if current_guests > 0 && current_guests % 200 == 0 { > + println!("Migrated {} of {} guests", current_guests, > total_guests); > + } > + } > + Ok(()) > + }, > + ); > + let migration_channel = migration_pool.channel(); > + > + for file in guest_source_files { > + let node = file.1.clone().into_string().unwrap(); > + if !resource_present(format!("{resources}/.vmlist").as_str(), > node.as_str())? { > + println!("VMID: '{node}' not present. Skip and mark as old."); > + mv_old(format!("{}", file.0.to_string_lossy()).as_str())?; > + } > + let migration_channel = migration_channel.clone(); Is this clone here needed? Seems to compile fine without it here.... > + migration_channel.send(file)?; > + } > + > + drop(migration_channel); > + migration_pool.complete()?; > + > + let elapsed = start_time.elapsed()?.as_secs_f64(); > + let guests = guests.load(std::sync::atomic::Ordering::SeqCst); > + println!("Migrated {} guests", guests); > + println!("It took {:.2}s", elapsed); > + > + Ok(()) > +} > + > +/// Migrate node RRD files > +/// > +/// In serial as the number of nodes will not be high. > +fn migrate_nodes( > + source_dir_nodes: PathBuf, > + target_dir_nodes: PathBuf, > + resources: &str, Any reason why this one is a &str instead of a PathBuf? As far as I can tell it is also a path (/etc/pve by default). Also the name of the variable somehow makes it not really clear that this is supposed to be a path; I only deduced it from RESOURCE_BASE_DIR. 
> + migrate: bool, > + force: bool, > +) -> Result<(), Error> { > + println!("Migrating RRD data for nodes…"); > + > + if !target_dir_nodes.exists() && migrate { > + println!("Creating new directory: '{}'", target_dir_nodes.display()); > + std::fs::create_dir(&target_dir_nodes)?; > + } > + > + let node_source_files = collect_rrd_files(&source_dir_nodes)?; > + > + for file in node_source_files { > + let node = file.1.clone().into_string().unwrap(); > + let full_path = file.0.clone().into_string().unwrap(); Please add some comment why it is okay to .unwrap here (or just return or ignore the error, if that makes more sense). > + println!("Node: '{node}'"); > + if !resource_present(format!("{resources}/.members").as_str(), > node.as_str())? { You can just use &format!... and &node instead of the .as_str() calls (a bit nicer to read and more idiomatic, but no hard feelings). > + println!("Node: '{node}' not present. Skip and mark as old."); > + mv_old(format!("{}/{}", file.0.to_string_lossy(), > node).as_str())?; > + } > + if let Ok(()) = do_rrd_migration( > + file, > + &target_dir_nodes, > + RRD_NODE_DEF.as_slice(), > + migrate, > + force, > + ) { Since do_rrd_migration does not return any data in the Ok value, you could just if do_rrd_migration(....).is_ok() { .... } > + mv_old(full_path.as_str())?; > + } > + } > + println!("Migrated all nodes"); > + > + Ok(()) > +} > + > +/// Migrate storage RRD files > +/// > +/// In serial as the number of storage will not be that high. 
> +fn migrate_storage( > + source_dir_storage: PathBuf, > + target_dir_storage: PathBuf, > + migrate: bool, > + force: bool, > +) -> Result<(), Error> { > + println!("Migrating RRD data for storages…"); > + > + if !target_dir_storage.exists() && migrate { > + println!("Creating new directory: '{}'", > target_dir_storage.display()); > + std::fs::create_dir(&target_dir_storage)?; > + } > + > + // storage has another layer of directories per node over which we need > to iterate > + fs::read_dir(&source_dir_storage)? > + .filter(|f| f.is_ok()) > + .map(|f| f.unwrap().path()) > + .filter(|f| f.is_dir()) you can use filter_map here, as explained in collect_rrd_files > + .try_for_each(|node| { > + let mut source_storage_subdir = source_dir_storage.clone(); > + source_storage_subdir.push(node.file_name().unwrap()); > + > + let mut target_storage_subdir = target_dir_storage.clone(); > + target_storage_subdir.push(node.file_name().unwrap()); > + > + if !target_storage_subdir.exists() && migrate { > + fs::create_dir(target_storage_subdir.as_path())?; You can use & here instead of .as_path() :) > + let metadata = target_storage_subdir.metadata()?; > + let mut permissions = metadata.permissions(); > + permissions.set_mode(0o755); You need to actually apply the permissions to the dir, here you only set the permission bits in the Permissions data type. std::fs::set_permissions(...) > + } > + > + let storage_source_files = > collect_rrd_files(&source_storage_subdir)?; > + > + for file in storage_source_files { > + println!( > + "Storage: '{}/{}'", > + node.file_name() > + .expect("no file name present") Same thing here regarding the potential panic > + .to_string_lossy(), > + PathBuf::from(file.1.clone()).display() Starting with rustc 1.87, you can directly call file.1.display() on the underlying OsStr(ing). 
> + ); > + > + let full_path = file.0.clone().into_string().unwrap(); > + if let Ok(()) = do_rrd_migration( > + file, > + &target_storage_subdir, > + RRD_STORAGE_DEF.as_slice(), > + migrate, > + force, > + ) { > + mv_old(full_path.as_str())?; > + } Since do_rrd_migration does not return any data in the Option, you could just if do_rrd_migration(....).is_ok() { .... } > + } > + Ok::<(), Error>(()) > + })?; > + println!("Migrated all storages"); > + > + Ok(()) > +} > diff --git a/src/parallel_handler.rs b/src/parallel_handler.rs > new file mode 100644 > index 0000000..d8ee3c7 > --- /dev/null > +++ b/src/parallel_handler.rs > @@ -0,0 +1,160 @@ > +//! A thread pool which run a closure in parallel. > + > +use std::sync::{Arc, Mutex}; > +use std::thread::JoinHandle; > + > +use anyhow::{bail, format_err, Error}; > +use crossbeam_channel::{bounded, Sender}; > + > +/// A handle to send data to the worker thread (implements clone) > +pub struct SendHandle<I> { > + input: Sender<I>, > + abort: Arc<Mutex<Option<String>>>, > +} > + > +/// Returns the first error happened, if any > +pub fn check_abort(abort: &Mutex<Option<String>>) -> Result<(), Error> { > + let guard = abort.lock().unwrap(); > + if let Some(err_msg) = &*guard { > + return Err(format_err!("{}", err_msg)); > + } > + Ok(()) > +} > + > +impl<I: Send> SendHandle<I> { > + /// Send data to the worker threads > + pub fn send(&self, input: I) -> Result<(), Error> { > + check_abort(&self.abort)?; > + match self.input.send(input) { > + Ok(()) => Ok(()), > + Err(_) => bail!("send failed - channel closed"), > + } might be more idiomatic to use .map_err here > + } > +} > + > +/// A thread pool which run the supplied closure > +/// > +/// The send command sends data to the worker threads. If one handler > +/// returns an error, we mark the channel as failed and it is no > +/// longer possible to send data. > +/// > +/// When done, the 'complete()' method needs to be called to check for > +/// outstanding errors. 
+pub struct ParallelHandler<I> { > + handles: Vec<JoinHandle<()>>, > + name: String, > + input: Option<SendHandle<I>>, > +} > + > +impl<I> Clone for SendHandle<I> { > + fn clone(&self) -> Self { > + Self { > + input: self.input.clone(), > + abort: Arc::clone(&self.abort), > + } > + } > +} > + > +impl<I: Send + 'static> ParallelHandler<I> { > + /// Create a new thread pool, each thread processing incoming data > + /// with 'handler_fn'. > + pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self > + where > + F: Fn(I) -> Result<(), Error> + Send + Clone + 'static, > + { > + let mut handles = Vec::new(); > + let (input_tx, input_rx) = bounded::<I>(threads); > + > + let abort = Arc::new(Mutex::new(None)); > + > + for i in 0..threads { > + let input_rx = input_rx.clone(); > + let abort = Arc::clone(&abort); > + let handler_fn = handler_fn.clone(); > + > + handles.push( > + std::thread::Builder::new() > + .name(format!("{} ({})", name, i)) > + .spawn(move || loop { > + let data = match input_rx.recv() { > + Ok(data) => data, > + Err(_) => return, > + }; > + if let Err(err) = (handler_fn)(data) { > + let mut guard = abort.lock().unwrap(); .unwrap on Mutex::lock is fine IMO, but should have a comment explaining that it can only panic on a poisoned mutex. > + if guard.is_none() { > + *guard = Some(err.to_string()); > + } > + } > + }) > + .unwrap(), This shouldn't .unwrap() IMO, rather return an error from this function. 
+ ); > + } > + Self { > + handles, > + name: name.to_string(), > + input: Some(SendHandle { > + input: input_tx, > + abort, > + }), > + } > + } > + > + /// Returns a cloneable channel to send data to the worker threads > + pub fn channel(&self) -> SendHandle<I> { > + self.input.as_ref().unwrap().clone() Please add a comment why .unwrap is okay here or bubble some error up > + } > + > + /// Send data to the worker threads > + pub fn send(&self, input: I) -> Result<(), Error> { > + self.input.as_ref().unwrap().send(input)?; Please add a comment why .unwrap is okay here or bubble some error up > + Ok(()) > + } > + > + /// Wait for worker threads to complete and check for errors > + pub fn complete(mut self) -> Result<(), Error> { > + let input = self.input.take().unwrap(); > + let abort = Arc::clone(&input.abort); > + check_abort(&abort)?; > + drop(input); > + > + let msg_list = self.join_threads(); > + > + // an error might be encountered while waiting for the join > + check_abort(&abort)?; > + > + if msg_list.is_empty() { > + return Ok(()); > + } > + Err(format_err!("{}", msg_list.join("\n"))) I'd rather if !msg_list.is_empty() { bail!("{}", msg_list.join("\n")); } Ok(()) > + } > + > + fn join_threads(&mut self) -> Vec<String> { > + let mut msg_list = Vec::new(); > + > + let mut i = 0; > + while let Some(handle) = self.handles.pop() { > + if let Err(panic) = handle.join() { > + if let Some(panic_msg) = panic.downcast_ref::<&str>() { > + msg_list.push(format!("thread {} ({i}) panicked: > {panic_msg}", self.name)); > + } else if let Some(panic_msg) = > panic.downcast_ref::<String>() { > + msg_list.push(format!("thread {} ({i}) panicked: > {panic_msg}", self.name)); > + } else { > + msg_list.push(format!("thread {} ({i}) panicked", > self.name)); > + } > + } > + i += 1; > + } > + msg_list > + } > +} > + > +// Note: We make sure that all threads will be joined > +impl<I> Drop for ParallelHandler<I> { > + fn drop(&mut self) { > + drop(self.input.take()); > + while let 
Some(handle) = self.handles.pop() { > + let _ = handle.join(); > + } > + } > +} > diff --git a/wrapper.h b/wrapper.h > new file mode 100644 > index 0000000..64d0aa6 > --- /dev/null > +++ b/wrapper.h > @@ -0,0 +1 @@ > +#include <rrd.h> _______________________________________________ pve-devel mailing list pve-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pve-devel