In the current implementation, the callee of
{score,select}_best_balancing_migration provides the migration
candidates, which are unpacked with generate_migration_candidates_from()

Signed-off-by: Daniel Kral <[email protected]>
---
 .../bindings/resource_scheduling_static.rs    | 150 +++++++++++++++++-
 1 file changed, 148 insertions(+), 2 deletions(-)

diff --git a/pve-rs/src/bindings/resource_scheduling_static.rs 
b/pve-rs/src/bindings/resource_scheduling_static.rs
index 3764aaa..84a5497 100644
--- a/pve-rs/src/bindings/resource_scheduling_static.rs
+++ b/pve-rs/src/bindings/resource_scheduling_static.rs
@@ -9,11 +9,15 @@ pub mod pve_rs_resource_scheduling_static {
     use std::collections::{HashMap, HashSet};
     use std::sync::Mutex;
 
-    use anyhow::{Error, bail};
+    use serde::{Deserialize, Serialize};
+
+    use anyhow::{Context, Error, bail};
 
     use perlmod::Value;
     use proxmox_resource_scheduling::pve_static::{StaticNodeUsage, 
StaticServiceUsage};
-    use proxmox_resource_scheduling::scheduler::ClusterUsage;
+    use proxmox_resource_scheduling::scheduler::{
+        ClusterUsage, MigrationCandidate, ScoredMigration,
+    };
 
     perlmod::declare_magic!(Box<Scheduler> : &Scheduler as 
"PVE::RS::ResourceScheduling::Static");
 
@@ -208,6 +212,148 @@ pub mod pve_rs_resource_scheduling_static {
         ClusterUsage::from_nodes(nodes)
     }
 
+    /// Method: Calculates the loads for each node.
+    #[export]
+    pub fn calculate_node_loads(#[try_from_ref] this: &Scheduler) -> 
Vec<(String, f64)> {
+        let usage = this.inner.lock().unwrap();
+        let cluster_usage = as_cluster_usage(&usage);
+
+        cluster_usage.node_loads()
+    }
+
+    /// Method: Calculates the imbalance among the nodes.
+    #[export]
+    pub fn calculate_node_imbalance(#[try_from_ref] this: &Scheduler) -> f64 {
+        let usage = this.inner.lock().unwrap();
+        let cluster_usage = as_cluster_usage(&usage);
+
+        cluster_usage.node_imbalance()
+    }
+
+    /// A compact representation of MigationCandidate.
+    #[derive(Serialize, Deserialize)]
+    pub struct CompactMigrationCandidate {
+        /// The identifier of the leading service.
+        pub leader: String,
+        /// The services which are part of the leading service's bundle.
+        pub services: Vec<String>,
+        /// The nodes, which are possible to migrate to for the services.
+        pub nodes: Vec<String>,
+    }
+
+    fn generate_migration_candidates_from(
+        usage: &Usage,
+        candidates: Vec<CompactMigrationCandidate>,
+    ) -> Result<Vec<MigrationCandidate>, Error> {
+        let mut migration_candidates = Vec::new();
+
+        for candidate in candidates.into_iter() {
+            let leader_sid = candidate.leader;
+            let leader = usage.services.get(&leader_sid).with_context(|| {
+                format!(
+                    "leader {} is not present in services usage hashmap",
+                    leader_sid
+                )
+            })?;
+            let source_node = leader.nodes.iter().next().unwrap();
+
+            let mut service_candidates = Vec::new();
+
+            for sid in candidate.services.iter() {
+                let service = usage
+                    .services
+                    .get(sid)
+                    .with_context(|| format!("service {} is not present in 
usage hashmap", sid))?;
+                let service_nodes = &service.nodes;
+
+                if service_nodes.len() > 1 {
+                    bail!("service {sid} is on multiple nodes");
+                }
+
+                if !service_nodes.contains(source_node) {
+                    bail!("service {sid} is not on common source node 
{source_node}");
+                }
+
+                service_candidates.push(service);
+            }
+
+            let bundle_stats = service_candidates
+                .into_iter()
+                .fold(StaticServiceUsage::default(), |total_stats, service| {
+                    total_stats + service.stats
+                });
+
+            for target_node in candidate.nodes.into_iter() {
+                migration_candidates.push(MigrationCandidate {
+                    sid: leader_sid.to_string(),
+                    source_node: source_node.to_string(),
+                    target_node,
+                    stats: bundle_stats.into(),
+                });
+            }
+        }
+
+        Ok(migration_candidates)
+    }
+
+    /// Method: Score the service motions by the best node imbalance 
improvement with exhaustive search.
+    #[export]
+    pub fn score_best_balancing_migrations(
+        #[try_from_ref] this: &Scheduler,
+        candidates: Vec<CompactMigrationCandidate>,
+        limit: usize,
+    ) -> Result<Vec<ScoredMigration>, Error> {
+        let usage = this.inner.lock().unwrap();
+
+        let cluster_usage = as_cluster_usage(&usage);
+        let candidates = generate_migration_candidates_from(&usage, 
candidates)?;
+
+        cluster_usage.score_best_balancing_migrations(candidates, limit)
+    }
+
+    /// Method: Select the service motion with the best node imbalance 
improvement with exhaustive search.
+    #[export]
+    pub fn select_best_balancing_migration(
+        #[try_from_ref] this: &Scheduler,
+        candidates: Vec<CompactMigrationCandidate>,
+    ) -> Result<Option<ScoredMigration>, Error> {
+        let usage = this.inner.lock().unwrap();
+
+        let cluster_usage = as_cluster_usage(&usage);
+        let candidates = generate_migration_candidates_from(&usage, 
candidates)?;
+
+        cluster_usage.select_best_balancing_migration(candidates)
+    }
+
+    /// Method: Score the service motions by the best node imbalance 
improvement with the TOPSIS method.
+    #[export]
+    pub fn score_best_balancing_migrations_topsis(
+        #[try_from_ref] this: &Scheduler,
+        candidates: Vec<CompactMigrationCandidate>,
+        limit: usize,
+    ) -> Result<Vec<ScoredMigration>, Error> {
+        let usage = this.inner.lock().unwrap();
+
+        let cluster_usage = as_cluster_usage(&usage);
+        let candidates = generate_migration_candidates_from(&usage, 
candidates)?;
+
+        cluster_usage.score_best_balancing_migrations_topsis(&candidates, 
limit)
+    }
+
+    /// Method: Select the service motion with the best node imbalance 
improvement with the TOPSIS method.
+    #[export]
+    pub fn select_best_balancing_migration_topsis(
+        #[try_from_ref] this: &Scheduler,
+        candidates: Vec<CompactMigrationCandidate>,
+    ) -> Result<Option<ScoredMigration>, Error> {
+        let usage = this.inner.lock().unwrap();
+
+        let cluster_usage = as_cluster_usage(&usage);
+        let candidates = generate_migration_candidates_from(&usage, 
candidates)?;
+
+        cluster_usage.select_best_balancing_migration_topsis(&candidates)
+    }
+
     /// Scores all previously added nodes for starting a `service` on.
     ///
     /// Scoring is done according to the static memory and CPU usages of the 
nodes as if the
-- 
2.47.3




Reply via email to