maddiedawson commented on code in PR #41778:
URL: https://github.com/apache/spark/pull/41778#discussion_r1253704912


##########
python/pyspark/ml/torch/deepspeed/deepspeed_distributer.py:
##########
@@ -0,0 +1,167 @@
+import json
+import os
+import subprocess
+import tempfile
+from typing import (
+    Union,
+    Callable,
+    List,
+    Dict,
+    Optional,
+    Any,
+)
+from pyspark.ml.torch.distributor import Distributor, TorchDistributor
+
+
+def write_to_location(location: str, content: str) -> None:
+    os.makedirs(os.path.dirname(location), exist_ok=True)
+    with open(location, "a") as f:
+        f.write(content)
+
+
+class DeepspeedDistributor(Distributor):
+    """The user must ensure that their cluster is ssh-keychained and that 
deepspeed is able to use ssh to coordinate among the nodes for the distributed 
training"""
+
+    HOME = os.path.expanduser("~")
+    HOSTFILE = f"/{HOME}/hostfile"
+
+    def __init__(
+        self,
+        num_processes: int = 1,
+        local_mode: bool = True,
+        use_gpu: bool = True,
+        deepspeed_config=None,
+    ):
+        super().__init__(num_processes, local_mode, use_gpu)
+        self.deepspeed_config = deepspeed_config
+        self.temp_deepspeed_fname = None
+        self.input_params = self._create_input_params()
+        self.worker_hosts = self._setup_hostfile_info()
+        self.setup_env()
+
+    def _get_gpus_on_node(self, executor_ip: str):
+        # TODO: ask Ricky, Lu, or Maddie if this is the best way to get the 
GPU information of a particular worker node
+        command = f"ssh {executor_ip} nvidia-smi -L | grep GPU | wc -l"  # 
pyspark doesn't support this out of the box for some reason, so sadge

Review Comment:
   Let's use pynvml.nvmlDeviceGetCount()



##########
python/pyspark/ml/torch/deepspeed/deepspeed_distributer.py:
##########
@@ -0,0 +1,167 @@
+import json
+import os
+import subprocess
+import tempfile
+from typing import (
+    Union,
+    Callable,
+    List,
+    Dict,
+    Optional,
+    Any,
+)
+from pyspark.ml.torch.distributor import Distributor, TorchDistributor
+
+
+def write_to_location(location: str, content: str) -> None:
+    os.makedirs(os.path.dirname(location), exist_ok=True)
+    with open(location, "a") as f:
+        f.write(content)
+
+
+class DeepspeedDistributor(Distributor):
+    """The user must ensure that their cluster is ssh-keychained and that 
deepspeed is able to use ssh to coordinate among the nodes for the distributed 
training"""

Review Comment:
   Let's provide guidance here on how to do this



##########
python/pyspark/ml/torch/deepspeed/deepspeed_distributer.py:
##########
@@ -0,0 +1,167 @@
+import json
+import os
+import subprocess
+import tempfile
+from typing import (
+    Union,
+    Callable,
+    List,
+    Dict,
+    Optional,
+    Any,
+)
+from pyspark.ml.torch.distributor import Distributor, TorchDistributor
+
+
+def write_to_location(location: str, content: str) -> None:
+    os.makedirs(os.path.dirname(location), exist_ok=True)
+    with open(location, "a") as f:
+        f.write(content)

Review Comment:
   This does not need to be a util function. Just inline the implementation 
below.



##########
python/pyspark/ml/torch/deepspeed/deepspeed_distributer.py:
##########
@@ -0,0 +1,167 @@
+import json
+import os
+import subprocess
+import tempfile
+from typing import (
+    Union,
+    Callable,
+    List,
+    Dict,
+    Optional,
+    Any,
+)
+from pyspark.ml.torch.distributor import Distributor, TorchDistributor
+
+
+def write_to_location(location: str, content: str) -> None:
+    os.makedirs(os.path.dirname(location), exist_ok=True)
+    with open(location, "a") as f:
+        f.write(content)
+
+
+class DeepspeedDistributor(Distributor):
+    """The user must ensure that their cluster is ssh-keychained and that 
deepspeed is able to use ssh to coordinate among the nodes for the distributed 
training"""
+
+    HOME = os.path.expanduser("~")
+    HOSTFILE = f"/{HOME}/hostfile"
+
+    def __init__(
+        self,
+        num_processes: int = 1,
+        local_mode: bool = True,
+        use_gpu: bool = True,
+        deepspeed_config=None,
+    ):
+        super().__init__(num_processes, local_mode, use_gpu)
+        self.deepspeed_config = deepspeed_config
+        self.temp_deepspeed_fname = None
+        self.input_params = self._create_input_params()
+        self.worker_hosts = self._setup_hostfile_info()
+        self.setup_env()
+
+    def _get_gpus_on_node(self, executor_ip: str):
+        # TODO: ask Ricky, Lu, or Maddie if this is the best way to get the 
GPU information of a particular worker node
+        command = f"ssh {executor_ip} nvidia-smi -L | grep GPU | wc -l"  # 
pyspark doesn't support this out of the box for some reason, so sadge
+        proc_res = subprocess.run(command, capture_output=True, text=True, 
shell=True)
+        if proc_res.returncode:
+            raise RuntimeError(
+                f"something went wrong when running the command {command}. Is 
nvidia-smi installed?"
+            )
+        num_gpus_on_worker = proc_res.stdout
+        return int(num_gpus_on_worker)
+
+    def _assign_procs_to_worker(self, gpu_node_map: Dict[str, int]) -> 
Dict[str, int]:
+        procs_left = self.num_processes
+        workers_left_to_serve = len(gpu_node_map)
+        average_procs_per_node = procs_left // workers_left_to_serve
+        gpu_mapped_to_node = {}
+        # sorting allows us to just do a single pass, as filling the smallest 
capacity nodes first will allow for a single pass
+        sorted_buckets = sorted(gpu_node_map.items(), key=lambda x: x[1])
+        for worker, capacity in sorted_buckets:
+            average_procs_per_node = procs_left // workers_left_to_serve
+            gpu_mapped_to_node[worker] = min(average_procs_per_node, capacity)
+            procs_left -= gpu_mapped_to_node[worker]
+            workers_left_to_serve -= 1
+        if procs_left != 0:
+            self.logger.warning(
+                msg=f"There are not enough GPUS to fully assign processes to 
nodes; there are {procs_left} processes left over"
+            )
+        return gpu_mapped_to_node
+
+    def _setup_hostfile_info(self):
+        worker_hosts = [
+            executor.host()
+            for executor in 
self.spark.sparkContext._jsc.sc().statusTracker().getExecutorInfos()
+        ]  # we should check if this returns the driver or not
+        worker_count = len(worker_hosts)  # find out if this number includes 
the driver or not
+        rdd = spark.sparkContext.parallelize(range(worker_count), 
numSlices=worker_count)
+
+        # what do I do if the use_gpu flag is false?
+        slots_on_workers = {}
+        if self.use_gpu:
+            for worker_host in worker_hosts:
+                slots_on_workers[worker_host] = 
self._get_gpus_on_node(worker_host)
+        else:
+            raise RuntimeError("Deepspeed doesn't work with non-GPU clusters 
at this time")
+
+        assigned_slots = self._assign_procs_to_worker(slots_on_workers)
+        self.logger.info(f"Writing to {DeepspeedDistributor.HOSTFILE}")
+        for worker_host in worker_hosts:
+            line = f"{worker_host} slots={assigned_slots[worker_host]}\n"
+            write_to_location(DeepspeedDistributor.HOSTFILE, line)
+        return worker_hosts
+
+    def setup_env(self):
+        try:
+            subprocess.run("deepspeed --version".split())
+            subprocess.run("ninja --version".split())
+            with open(f"/{DeepspeedDistributor.HOME}/.deepspeed_env", "w") as 
f:
+                # if this is open; don't add that to path if they're not 
running on databricks
+                # TODO: figure out what paths to add to this, because if this 
is OSS we don't want to constantly add a databricks filepath
+                f.write(
+                    
"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
+                )
+        except:
+            raise ImportError("Install deepspeed and ninja on the cluster 
using PyPi")
+
+    def _create_deepspeed_command(
+        self, input_params: Dict[str, Any], path_to_train_file: str, *args: Any
+    ):
+        local_mode = input_params["local_mode"]
+        num_processes = input_params["num_processes"]
+        deepspeed_config = input_params["deepspeed_config"]
+        if isinstance(deepspeed_config, dict):
+            with tempfile.NamedTemporaryFile(mode="w+", delete=False, 
suffix=".json") as f:
+                json.dump(deepspeed_config, f)
+                deepspeed_config_path = f.name
+                self.temp_deepspeed_fname = f.name
+        else:
+            deepspeed_config_path = deepspeed_config
+        if local_mode:
+            deepspeed_args = [
+                "--num_gpus",
+                str(num_processes),
+            ]  # no need for num nodes, the host file, or any port stuff (no 
communiation)
+        else:
+            deepspeed_args = [
+                "--num_gpus",
+                str(input_params["num_processes"]),
+                "--num_nodes",
+                str(len(self.worker_hosts)),
+                "--hostfile",
+                str(DeepspeedDistributor.HOSTFILE),
+                "--master_addr",
+                str(self.worker_hosts[0]),
+                "--master_port=9902",
+            ]
+        return [
+            "deepspeed",
+            *deepspeed_args,
+            path_to_train_file,
+            *args,
+            "--deepspeed",
+            "--deepspeed_config",
+            deepspeed_config_path,
+        ]
+
+    def _run_training_on_pytorch_file(
+        self, input_params: Dict[str, Any], train_path: str, *args: Any, 
**kwargs: Any
+    ) -> None:
+        if kwargs:
+            raise ValueError("Running pytorch file does not support key-word 
type arguments.")
+        training_command = self._create_deepspeed_command(input_params, 
train_path, *args)
+        TorchDistributor._execute_command(
+            training_command
+        )  # should we include some form of logging here
+
+    def run(self, train_object: Union[Callable, str], *args: Any, **kwargs: 
Any) -> Optional[Any]:
+        if isinstance(train_object, str):
+            self._run_training_on_pytorch_file(self.input_params, 
train_object, *args, **kwargs)  # type: ignore
+        else:
+            raise RuntimeError("Using functions isn't implemented yet. Next 
iteration.")

Review Comment:
   What does "next iteration" mean here?



##########
python/pyspark/ml/torch/deepspeed/deepspeed_distributer.py:
##########
@@ -0,0 +1,167 @@
+import json
+import os
+import subprocess
+import tempfile
+from typing import (
+    Union,
+    Callable,
+    List,
+    Dict,
+    Optional,
+    Any,
+)
+from pyspark.ml.torch.distributor import Distributor, TorchDistributor
+
+
+def write_to_location(location: str, content: str) -> None:
+    os.makedirs(os.path.dirname(location), exist_ok=True)
+    with open(location, "a") as f:
+        f.write(content)
+
+
+class DeepspeedDistributor(Distributor):
+    """The user must ensure that their cluster is ssh-keychained and that 
deepspeed is able to use ssh to coordinate among the nodes for the distributed 
training"""
+
+    HOME = os.path.expanduser("~")
+    HOSTFILE = f"/{HOME}/hostfile"
+
+    def __init__(
+        self,
+        num_processes: int = 1,
+        local_mode: bool = True,
+        use_gpu: bool = True,
+        deepspeed_config=None,
+    ):
+        super().__init__(num_processes, local_mode, use_gpu)
+        self.deepspeed_config = deepspeed_config
+        self.temp_deepspeed_fname = None
+        self.input_params = self._create_input_params()
+        self.worker_hosts = self._setup_hostfile_info()
+        self.setup_env()
+
+    def _get_gpus_on_node(self, executor_ip: str):
+        # TODO: ask Ricky, Lu, or Maddie if this is the best way to get the 
GPU information of a particular worker node
+        command = f"ssh {executor_ip} nvidia-smi -L | grep GPU | wc -l"  # 
pyspark doesn't support this out of the box for some reason, so sadge
+        proc_res = subprocess.run(command, capture_output=True, text=True, 
shell=True)
+        if proc_res.returncode:
+            raise RuntimeError(
+                f"something went wrong when running the command {command}. Is 
nvidia-smi installed?"
+            )
+        num_gpus_on_worker = proc_res.stdout
+        return int(num_gpus_on_worker)
+
+    def _assign_procs_to_worker(self, gpu_node_map: Dict[str, int]) -> 
Dict[str, int]:
+        procs_left = self.num_processes
+        workers_left_to_serve = len(gpu_node_map)
+        average_procs_per_node = procs_left // workers_left_to_serve
+        gpu_mapped_to_node = {}
+        # sorting allows us to just do a single pass, as filling the smallest 
capacity nodes first will allow for a single pass
+        sorted_buckets = sorted(gpu_node_map.items(), key=lambda x: x[1])
+        for worker, capacity in sorted_buckets:
+            average_procs_per_node = procs_left // workers_left_to_serve
+            gpu_mapped_to_node[worker] = min(average_procs_per_node, capacity)
+            procs_left -= gpu_mapped_to_node[worker]
+            workers_left_to_serve -= 1
+        if procs_left != 0:
+            self.logger.warning(
+                msg=f"There are not enough GPUS to fully assign processes to 
nodes; there are {procs_left} processes left over"
+            )
+        return gpu_mapped_to_node
+
+    def _setup_hostfile_info(self):
+        worker_hosts = [
+            executor.host()
+            for executor in 
self.spark.sparkContext._jsc.sc().statusTracker().getExecutorInfos()
+        ]  # we should check if this returns the driver or not
+        worker_count = len(worker_hosts)  # find out if this number includes 
the driver or not
+        rdd = spark.sparkContext.parallelize(range(worker_count), 
numSlices=worker_count)
+
+        # what do I do if the use_gpu flag is false?
+        slots_on_workers = {}
+        if self.use_gpu:
+            for worker_host in worker_hosts:
+                slots_on_workers[worker_host] = 
self._get_gpus_on_node(worker_host)
+        else:
+            raise RuntimeError("Deepspeed doesn't work with non-GPU clusters 
at this time")
+
+        assigned_slots = self._assign_procs_to_worker(slots_on_workers)
+        self.logger.info(f"Writing to {DeepspeedDistributor.HOSTFILE}")
+        for worker_host in worker_hosts:
+            line = f"{worker_host} slots={assigned_slots[worker_host]}\n"
+            write_to_location(DeepspeedDistributor.HOSTFILE, line)
+        return worker_hosts
+
+    def setup_env(self):
+        try:
+            subprocess.run("deepspeed --version".split())
+            subprocess.run("ninja --version".split())
+            with open(f"/{DeepspeedDistributor.HOME}/.deepspeed_env", "w") as 
f:
+                # if this is open; don't add that to path if they're not 
running on databricks
+                # TODO: figure out what paths to add to this, because if this 
is OSS we don't want to constantly add a databricks filepath
+                f.write(
+                    
"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
+                )
+        except:
+            raise ImportError("Install deepspeed and ninja on the cluster 
using PyPi")
+
+    def _create_deepspeed_command(
+        self, input_params: Dict[str, Any], path_to_train_file: str, *args: Any
+    ):
+        local_mode = input_params["local_mode"]
+        num_processes = input_params["num_processes"]
+        deepspeed_config = input_params["deepspeed_config"]
+        if isinstance(deepspeed_config, dict):
+            with tempfile.NamedTemporaryFile(mode="w+", delete=False, 
suffix=".json") as f:
+                json.dump(deepspeed_config, f)
+                deepspeed_config_path = f.name
+                self.temp_deepspeed_fname = f.name
+        else:
+            deepspeed_config_path = deepspeed_config
+        if local_mode:
+            deepspeed_args = [
+                "--num_gpus",
+                str(num_processes),
+            ]  # no need for num nodes, the host file, or any port stuff (no 
communiation)
+        else:
+            deepspeed_args = [
+                "--num_gpus",
+                str(input_params["num_processes"]),
+                "--num_nodes",
+                str(len(self.worker_hosts)),
+                "--hostfile",
+                str(DeepspeedDistributor.HOSTFILE),
+                "--master_addr",
+                str(self.worker_hosts[0]),
+                "--master_port=9902",
+            ]
+        return [
+            "deepspeed",
+            *deepspeed_args,
+            path_to_train_file,
+            *args,
+            "--deepspeed",
+            "--deepspeed_config",
+            deepspeed_config_path,
+        ]
+
+    def _run_training_on_pytorch_file(
+        self, input_params: Dict[str, Any], train_path: str, *args: Any, 
**kwargs: Any
+    ) -> None:
+        if kwargs:
+            raise ValueError("Running pytorch file does not support key-word 
type arguments.")
+        training_command = self._create_deepspeed_command(input_params, 
train_path, *args)
+        TorchDistributor._execute_command(
+            training_command
+        )  # should we include some form of logging here

Review Comment:
   Let's not reference the TorchDistributor in this file. Could we move 
_execute_command to the Distributor?



##########
python/pyspark/ml/torch/deepspeed/deepspeed_distributer.py:
##########
@@ -0,0 +1,167 @@
+import json
+import os
+import subprocess
+import tempfile
+from typing import (
+    Union,
+    Callable,
+    List,
+    Dict,
+    Optional,
+    Any,
+)
+from pyspark.ml.torch.distributor import Distributor, TorchDistributor
+
+
+def write_to_location(location: str, content: str) -> None:
+    os.makedirs(os.path.dirname(location), exist_ok=True)
+    with open(location, "a") as f:
+        f.write(content)
+
+
+class DeepspeedDistributor(Distributor):
+    """The user must ensure that their cluster is ssh-keychained and that 
deepspeed is able to use ssh to coordinate among the nodes for the distributed 
training"""
+
+    HOME = os.path.expanduser("~")
+    HOSTFILE = f"/{HOME}/hostfile"
+
+    def __init__(
+        self,
+        num_processes: int = 1,
+        local_mode: bool = True,
+        use_gpu: bool = True,
+        deepspeed_config=None,
+    ):
+        super().__init__(num_processes, local_mode, use_gpu)
+        self.deepspeed_config = deepspeed_config
+        self.temp_deepspeed_fname = None

Review Comment:
   Let's make this name more useful, e.g. deepspeed_config_filepath



##########
python/pyspark/ml/torch/deepspeed/deepspeed_distributer.py:
##########
@@ -0,0 +1,167 @@
+import json
+import os
+import subprocess
+import tempfile
+from typing import (
+    Union,
+    Callable,
+    List,
+    Dict,
+    Optional,
+    Any,
+)
+from pyspark.ml.torch.distributor import Distributor, TorchDistributor
+
+
+def write_to_location(location: str, content: str) -> None:
+    os.makedirs(os.path.dirname(location), exist_ok=True)
+    with open(location, "a") as f:
+        f.write(content)
+
+
+class DeepspeedDistributor(Distributor):
+    """The user must ensure that their cluster is ssh-keychained and that 
deepspeed is able to use ssh to coordinate among the nodes for the distributed 
training"""
+
+    HOME = os.path.expanduser("~")
+    HOSTFILE = f"/{HOME}/hostfile"
+
+    def __init__(
+        self,
+        num_processes: int = 1,
+        local_mode: bool = True,
+        use_gpu: bool = True,
+        deepspeed_config=None,
+    ):
+        super().__init__(num_processes, local_mode, use_gpu)
+        self.deepspeed_config = deepspeed_config
+        self.temp_deepspeed_fname = None
+        self.input_params = self._create_input_params()
+        self.worker_hosts = self._setup_hostfile_info()
+        self.setup_env()
+
+    def _get_gpus_on_node(self, executor_ip: str):
+        # TODO: ask Ricky, Lu, or Maddie if this is the best way to get the 
GPU information of a particular worker node
+        command = f"ssh {executor_ip} nvidia-smi -L | grep GPU | wc -l"  # 
pyspark doesn't support this out of the box for some reason, so sadge
+        proc_res = subprocess.run(command, capture_output=True, text=True, 
shell=True)
+        if proc_res.returncode:
+            raise RuntimeError(
+                f"something went wrong when running the command {command}. Is 
nvidia-smi installed?"
+            )
+        num_gpus_on_worker = proc_res.stdout
+        return int(num_gpus_on_worker)
+
+    def _assign_procs_to_worker(self, gpu_node_map: Dict[str, int]) -> 
Dict[str, int]:
+        procs_left = self.num_processes
+        workers_left_to_serve = len(gpu_node_map)
+        average_procs_per_node = procs_left // workers_left_to_serve
+        gpu_mapped_to_node = {}
+        # sorting allows us to just do a single pass, as filling the smallest 
capacity nodes first will allow for a single pass
+        sorted_buckets = sorted(gpu_node_map.items(), key=lambda x: x[1])
+        for worker, capacity in sorted_buckets:
+            average_procs_per_node = procs_left // workers_left_to_serve
+            gpu_mapped_to_node[worker] = min(average_procs_per_node, capacity)
+            procs_left -= gpu_mapped_to_node[worker]
+            workers_left_to_serve -= 1
+        if procs_left != 0:
+            self.logger.warning(
+                msg=f"There are not enough GPUS to fully assign processes to 
nodes; there are {procs_left} processes left over"
+            )
+        return gpu_mapped_to_node
+
+    def _setup_hostfile_info(self):
+        worker_hosts = [
+            executor.host()
+            for executor in 
self.spark.sparkContext._jsc.sc().statusTracker().getExecutorInfos()
+        ]  # we should check if this returns the driver or not
+        worker_count = len(worker_hosts)  # find out if this number includes 
the driver or not
+        rdd = spark.sparkContext.parallelize(range(worker_count), 
numSlices=worker_count)
+
+        # what do I do if the use_gpu flag is false?
+        slots_on_workers = {}
+        if self.use_gpu:
+            for worker_host in worker_hosts:
+                slots_on_workers[worker_host] = 
self._get_gpus_on_node(worker_host)
+        else:
+            raise RuntimeError("Deepspeed doesn't work with non-GPU clusters 
at this time")
+
+        assigned_slots = self._assign_procs_to_worker(slots_on_workers)
+        self.logger.info(f"Writing to {DeepspeedDistributor.HOSTFILE}")
+        for worker_host in worker_hosts:
+            line = f"{worker_host} slots={assigned_slots[worker_host]}\n"
+            write_to_location(DeepspeedDistributor.HOSTFILE, line)
+        return worker_hosts
+
+    def setup_env(self):
+        try:
+            subprocess.run("deepspeed --version".split())
+            subprocess.run("ninja --version".split())
+            with open(f"/{DeepspeedDistributor.HOME}/.deepspeed_env", "w") as 
f:
+                # if this is open; don't add that to path if they're not 
running on databricks
+                # TODO: figure out what paths to add to this, because if this 
is OSS we don't want to constantly add a databricks filepath
+                f.write(
+                    
"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
+                )
+        except:
+            raise ImportError("Install deepspeed and ninja on the cluster 
using PyPi")

Review Comment:
   Can this be a private function?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to