In the current implementation, the callee of
{score,select}_best_balancing_migration provides the migration
candidates, which are unpacked with generate_migration_candidates_from()Signed-off-by: Daniel Kral <[email protected]> --- .../bindings/resource_scheduling_static.rs | 150 +++++++++++++++++- 1 file changed, 148 insertions(+), 2 deletions(-) diff --git a/pve-rs/src/bindings/resource_scheduling_static.rs b/pve-rs/src/bindings/resource_scheduling_static.rs index 3764aaa..84a5497 100644 --- a/pve-rs/src/bindings/resource_scheduling_static.rs +++ b/pve-rs/src/bindings/resource_scheduling_static.rs @@ -9,11 +9,15 @@ pub mod pve_rs_resource_scheduling_static { use std::collections::{HashMap, HashSet}; use std::sync::Mutex; - use anyhow::{Error, bail}; + use serde::{Deserialize, Serialize}; + + use anyhow::{Context, Error, bail}; use perlmod::Value; use proxmox_resource_scheduling::pve_static::{StaticNodeUsage, StaticServiceUsage}; - use proxmox_resource_scheduling::scheduler::ClusterUsage; + use proxmox_resource_scheduling::scheduler::{ + ClusterUsage, MigrationCandidate, ScoredMigration, + }; perlmod::declare_magic!(Box<Scheduler> : &Scheduler as "PVE::RS::ResourceScheduling::Static"); @@ -208,6 +212,148 @@ pub mod pve_rs_resource_scheduling_static { ClusterUsage::from_nodes(nodes) } + /// Method: Calculates the loads for each node. + #[export] + pub fn calculate_node_loads(#[try_from_ref] this: &Scheduler) -> Vec<(String, f64)> { + let usage = this.inner.lock().unwrap(); + let cluster_usage = as_cluster_usage(&usage); + + cluster_usage.node_loads() + } + + /// Method: Calculates the imbalance among the nodes. + #[export] + pub fn calculate_node_imbalance(#[try_from_ref] this: &Scheduler) -> f64 { + let usage = this.inner.lock().unwrap(); + let cluster_usage = as_cluster_usage(&usage); + + cluster_usage.node_imbalance() + } + + /// A compact representation of MigationCandidate. + #[derive(Serialize, Deserialize)] + pub struct CompactMigrationCandidate { + /// The identifier of the leading service. + pub leader: String, + /// The services which are part of the leading service's bundle. + pub services: Vec<String>, + /// The nodes, which are possible to migrate to for the services. + pub nodes: Vec<String>, + } + + fn generate_migration_candidates_from( + usage: &Usage, + candidates: Vec<CompactMigrationCandidate>, + ) -> Result<Vec<MigrationCandidate>, Error> { + let mut migration_candidates = Vec::new(); + + for candidate in candidates.into_iter() { + let leader_sid = candidate.leader; + let leader = usage.services.get(&leader_sid).with_context(|| { + format!( + "leader {} is not present in services usage hashmap", + leader_sid + ) + })?; + let source_node = leader.nodes.iter().next().unwrap(); + + let mut service_candidates = Vec::new(); + + for sid in candidate.services.iter() { + let service = usage + .services + .get(sid) + .with_context(|| format!("service {} is not present in usage hashmap", sid))?; + let service_nodes = &service.nodes; + + if service_nodes.len() > 1 { + bail!("service {sid} is on multiple nodes"); + } + + if !service_nodes.contains(source_node) { + bail!("service {sid} is not on common source node {source_node}"); + } + + service_candidates.push(service); + } + + let bundle_stats = service_candidates + .into_iter() + .fold(StaticServiceUsage::default(), |total_stats, service| { + total_stats + service.stats + }); + + for target_node in candidate.nodes.into_iter() { + migration_candidates.push(MigrationCandidate { + sid: leader_sid.to_string(), + source_node: source_node.to_string(), + target_node, + stats: bundle_stats.into(), + }); + } + } + + Ok(migration_candidates) + } + + /// Method: Score the service motions by the best node imbalance improvement with exhaustive search. + #[export] + pub fn score_best_balancing_migrations( + #[try_from_ref] this: &Scheduler, + candidates: Vec<CompactMigrationCandidate>, + limit: usize, + ) -> Result<Vec<ScoredMigration>, Error> { + let usage = this.inner.lock().unwrap(); + + let cluster_usage = as_cluster_usage(&usage); + let candidates = generate_migration_candidates_from(&usage, candidates)?; + + cluster_usage.score_best_balancing_migrations(candidates, limit) + } + + /// Method: Select the service motion with the best node imbalance improvement with exhaustive search. + #[export] + pub fn select_best_balancing_migration( + #[try_from_ref] this: &Scheduler, + candidates: Vec<CompactMigrationCandidate>, + ) -> Result<Option<ScoredMigration>, Error> { + let usage = this.inner.lock().unwrap(); + + let cluster_usage = as_cluster_usage(&usage); + let candidates = generate_migration_candidates_from(&usage, candidates)?; + + cluster_usage.select_best_balancing_migration(candidates) + } + + /// Method: Score the service motions by the best node imbalance improvement with the TOPSIS method. + #[export] + pub fn score_best_balancing_migrations_topsis( + #[try_from_ref] this: &Scheduler, + candidates: Vec<CompactMigrationCandidate>, + limit: usize, + ) -> Result<Vec<ScoredMigration>, Error> { + let usage = this.inner.lock().unwrap(); + + let cluster_usage = as_cluster_usage(&usage); + let candidates = generate_migration_candidates_from(&usage, candidates)?; + + cluster_usage.score_best_balancing_migrations_topsis(&candidates, limit) + } + + /// Method: Select the service motion with the best node imbalance improvement with the TOPSIS method. + #[export] + pub fn select_best_balancing_migration_topsis( + #[try_from_ref] this: &Scheduler, + candidates: Vec<CompactMigrationCandidate>, + ) -> Result<Option<ScoredMigration>, Error> { + let usage = this.inner.lock().unwrap(); + + let cluster_usage = as_cluster_usage(&usage); + let candidates = generate_migration_candidates_from(&usage, candidates)?; + + cluster_usage.select_best_balancing_migration_topsis(&candidates) + } + /// Scores all previously added nodes for starting a `service` on. /// /// Scoring is done according to the static memory and CPU usages of the nodes as if the -- 2.47.3
