maddiedawson commented on code in PR #41770:
URL: https://github.com/apache/spark/pull/41770#discussion_r1256498045
##########
python/pyspark/ml/torch/tests/test_distributor.py:
##########
@@ -29,14 +29,16 @@
import unittest
from unittest.mock import patch
+from pyspark.ml.torch.distributor import DeepspeedTorchDistributor
Review Comment:
Redundant import
##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -1003,3 +1006,142 @@ def _get_spark_partition_data_loader(
# if num_workers is zero, we cannot set `prefetch_factor` otherwise
# torch will raise error.
return DataLoader(dataset, batch_size, num_workers=num_workers)
+
+
+class DeepspeedTorchDistributor(TorchDistributor):
+
+ def __init__(self, num_gpus: int = 1, nnodes: int = 1, local_mode: bool =
True, use_gpu: bool = True, deepspeed_config = None):
+ """
+ This class is used to run deepspeed training workloads with spark
clusters. The user has the option to
+ specify the number of gpus per node and the number of nodes (the
same as if running from terminal),
+ as well as specify a deepspeed configuration file.
+
+ Parameters
+ ----------
+ num_gpus: int
+ the number of GPUs to use per node (analagous to num_gpus in
deepspeed command)
+
+ nnodes: int
+ the number of nodes that should be used for the run
+
+ local_mode: bool
+ whether or not to run the training in a distributed fashion or
just locally
+
+ use_gpu: bool
+ boolean flag to determine whether to utilize gpus
+
+ deepspeed_config: Union[Dict[str,Any], str] or None:
+ the configuration file to be used for launching the deepspeed
application. If it is a dictionary mapping parameters to values, then we will
create the file.
+ If None, deepspeed will fall back to default parameters.
+ """
+ num_processes = num_gpus * nnodes
+ super().__init__(num_processes, local_mode, use_gpu)
+ self.deepspeed_config = deepspeed_config
+ self.ssl_conf = "deepspeed.spark.distributor.ignoreSsl"
+ self._validate_input_params()
+ self.input_params = self._create_input_params()
+ self.cleanup_deepspeed_conf = False
+
+ @staticmethod
+ def _get_deepspeed_config_path(deepspeed_config):
+ if isinstance(deepspeed_config, dict):
+ with tempfile.NamedTemporaryFile(mode='w+', delete=False,
suffix='.json') as fil:
+ json.dump(deepspeed_config, fil)
+ return fil.name
+ deepspeed_config_path = deepspeed_config
+ # Empty value means the deepspeed will fall back to default settings.
+ if deepspeed_config == None:
+ deepspeed_config_path = ""
+
+ return deepspeed_config_path
+
+
+ @staticmethod
+ def _get_torchrun_args(local_mode: bool, num_processes: int) ->
Tuple[List[Any], int]:
+ """
+ Given the mode and the number of processes, create the arguments to be
given to deepspeed
+
+ Parameters
+ ---------
+ local_mode: bool
+ Whether or not we are running training locally or in a distributed
fashion
+
+ num_processes: int
+ The number of processes that we are going to use (number of gpus
per node * number of nodes)
+
+ Returns
+ ------
+ Tuple[List[Any], int]
+ A tuple containing a list of arguments to pass as pytorch args to
deepspeed, as well as the number of processes per node
+ """
+ if local_mode:
+ torchrun_args = ["--standalone", "--nnodes=1"]
+ processes_per_node = num_processes
+ return torchrun_args, processes_per_node
+
+ master_addr, master_port = (
+ os.environ["MASTER_ADDR"],
+ os.environ["MASTER_PORT"],
+ )
Review Comment:
There is no need for tuple unpacking here. Just split this into two lines
with one assignment each.
##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -1003,3 +1006,142 @@ def _get_spark_partition_data_loader(
# if num_workers is zero, we cannot set `prefetch_factor` otherwise
# torch will raise error.
return DataLoader(dataset, batch_size, num_workers=num_workers)
+
+
+class DeepspeedTorchDistributor(TorchDistributor):
+
+ def __init__(self, num_gpus: int = 1, nnodes: int = 1, local_mode: bool =
True, use_gpu: bool = True, deepspeed_config = None):
+ """
+ This class is used to run deepspeed training workloads with spark
clusters. The user has the option to
+ specify the number of gpus per node and the number of nodes (the
same as if running from terminal),
+ as well as specify a deepspeed configuration file.
+
+ Parameters
+ ----------
+ num_gpus: int
+ the number of GPUs to use per node (analagous to num_gpus in
deepspeed command)
+
+ nnodes: int
+ the number of nodes that should be used for the run
+
+ local_mode: bool
+ whether or not to run the training in a distributed fashion or
just locally
+
+ use_gpu: bool
+ boolean flag to determine whether to utilize gpus
+
+ deepspeed_config: Union[Dict[str,Any], str] or None:
+ the configuration file to be used for launching the deepspeed
application. If it is a dictionary mapping parameters to values, then we will
create the file.
+ If None, deepspeed will fall back to default parameters.
+ """
+ num_processes = num_gpus * nnodes
+ super().__init__(num_processes, local_mode, use_gpu)
+ self.deepspeed_config = deepspeed_config
+ self.ssl_conf = "deepspeed.spark.distributor.ignoreSsl"
+ self._validate_input_params()
+ self.input_params = self._create_input_params()
+ self.cleanup_deepspeed_conf = False
+
+ @staticmethod
+ def _get_deepspeed_config_path(deepspeed_config):
+ if isinstance(deepspeed_config, dict):
+ with tempfile.NamedTemporaryFile(mode='w+', delete=False,
suffix='.json') as fil:
+ json.dump(deepspeed_config, fil)
+ return fil.name
+ deepspeed_config_path = deepspeed_config
+ # Empty value means the deepspeed will fall back to default settings.
+ if deepspeed_config == None:
+ deepspeed_config_path = ""
+
+ return deepspeed_config_path
Review Comment:
Rewrite this as:
```
# Empty value means the deepspeed will fall back to default settings.
if deepspeed_config == None:
    return ""
return deepspeed_config
```
##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -1003,3 +1006,142 @@ def _get_spark_partition_data_loader(
# if num_workers is zero, we cannot set `prefetch_factor` otherwise
# torch will raise error.
return DataLoader(dataset, batch_size, num_workers=num_workers)
+
+
+class DeepspeedTorchDistributor(TorchDistributor):
+
+ def __init__(self, num_gpus: int = 1, nnodes: int = 1, local_mode: bool =
True, use_gpu: bool = True, deepspeed_config = None):
+ """
+ This class is used to run deepspeed training workloads with spark
clusters. The user has the option to
+ specify the number of gpus per node and the number of nodes (the
same as if running from terminal),
+ as well as specify a deepspeed configuration file.
+
+ Parameters
+ ----------
+ num_gpus: int
+ the number of GPUs to use per node (analagous to num_gpus in
deepspeed command)
+
+ nnodes: int
+ the number of nodes that should be used for the run
+
+ local_mode: bool
+ whether or not to run the training in a distributed fashion or
just locally
+
+ use_gpu: bool
+ boolean flag to determine whether to utilize gpus
+
+ deepspeed_config: Union[Dict[str,Any], str] or None:
+ the configuration file to be used for launching the deepspeed
application. If it is a dictionary mapping parameters to values, then we will
create the file.
Review Comment:
Split this into two lines because it is too long.
##########
python/pyspark/ml/torch/tests/test_distributor.py:
##########
@@ -543,6 +545,126 @@ def test_check_parent_alive(self,
mock_clean_and_terminate: Callable) -> None:
class TorchWrapperUnitTests(TorchWrapperUnitTestsMixin, unittest.TestCase):
pass
+class DeepspeedTorchDistributorUnitTests(unittest.TestCase):
+
+ def _get_env_var(self, var_name: str, default_value: Any) -> Any:
+ value = os.getenv(var_name)
+ if value:
+ return value
+ else:
Review Comment:
Can remove the else statement
##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -1003,3 +1006,142 @@ def _get_spark_partition_data_loader(
# if num_workers is zero, we cannot set `prefetch_factor` otherwise
# torch will raise error.
return DataLoader(dataset, batch_size, num_workers=num_workers)
+
+
+class DeepspeedTorchDistributor(TorchDistributor):
+
+ def __init__(self, num_gpus: int = 1, nnodes: int = 1, local_mode: bool =
True, use_gpu: bool = True, deepspeed_config = None):
+ """
+ This class is used to run deepspeed training workloads with spark
clusters. The user has the option to
+ specify the number of gpus per node and the number of nodes (the
same as if running from terminal),
+ as well as specify a deepspeed configuration file.
+
+ Parameters
+ ----------
+ num_gpus: int
+ the number of GPUs to use per node (analagous to num_gpus in
deepspeed command)
+
+ nnodes: int
+ the number of nodes that should be used for the run
+
+ local_mode: bool
+ whether or not to run the training in a distributed fashion or
just locally
+
+ use_gpu: bool
+ boolean flag to determine whether to utilize gpus
+
+ deepspeed_config: Union[Dict[str,Any], str] or None:
+ the configuration file to be used for launching the deepspeed
application. If it is a dictionary mapping parameters to values, then we will
create the file.
+ If None, deepspeed will fall back to default parameters.
+ """
+ num_processes = num_gpus * nnodes
+ super().__init__(num_processes, local_mode, use_gpu)
+ self.deepspeed_config = deepspeed_config
+ self.ssl_conf = "deepspeed.spark.distributor.ignoreSsl"
+ self._validate_input_params()
+ self.input_params = self._create_input_params()
+ self.cleanup_deepspeed_conf = False
+
+ @staticmethod
+ def _get_deepspeed_config_path(deepspeed_config):
+ if isinstance(deepspeed_config, dict):
+ with tempfile.NamedTemporaryFile(mode='w+', delete=False,
suffix='.json') as fil:
+ json.dump(deepspeed_config, fil)
+ return fil.name
+ deepspeed_config_path = deepspeed_config
+ # Empty value means the deepspeed will fall back to default settings.
+ if deepspeed_config == None:
+ deepspeed_config_path = ""
+
+ return deepspeed_config_path
+
+
+ @staticmethod
+ def _get_torchrun_args(local_mode: bool, num_processes: int) ->
Tuple[List[Any], int]:
+ """
+ Given the mode and the number of processes, create the arguments to be
given to deepspeed
+
+ Parameters
+ ---------
+ local_mode: bool
+ Whether or not we are running training locally or in a distributed
fashion
+
+ num_processes: int
+ The number of processes that we are going to use (number of gpus
per node * number of nodes)
+
+ Returns
+ ------
+ Tuple[List[Any], int]
+ A tuple containing a list of arguments to pass as pytorch args to
deepspeed, as well as the number of processes per node
+ """
+ if local_mode:
+ torchrun_args = ["--standalone", "--nnodes=1"]
+ processes_per_node = num_processes
+ return torchrun_args, processes_per_node
+
+ master_addr, master_port = (
+ os.environ["MASTER_ADDR"],
+ os.environ["MASTER_PORT"],
+ )
+ node_rank = os.environ["RANK"]
+ torchrun_args = [
+ f"--nnodes={num_processes}",
+ f"--node_rank={node_rank}",
+ f"--rdzv_endpoint={master_addr}:{master_port}",
+ "--rdzv_id=0",
+ ]
+ processes_per_node = 1
+ return torchrun_args, processes_per_node
+
+ @staticmethod
+ def _create_torchrun_command(
+ input_params: Dict[str, Any], train_path: str, *args: Any) ->
List[str]:
+ local_mode = input_params["local_mode"]
+ num_processes = input_params["num_processes"]
+ deepspeed_config = input_params["deepspeed_config"]
+
+ deepspeed_config_path =
DeepspeedTorchDistributor._get_deepspeed_config_path(deepspeed_config)
+
+
+ torchrun_args, processes_per_node =
DeepspeedTorchDistributor._get_torchrun_args(local_mode, num_processes)
+
+ args_string = list(map(str, args))
+
+ command_to_run = [
+ sys.executable,
+ "-m",
+ "torch.distributed.run",
+ *torchrun_args,
+ f"--nproc_per_node={processes_per_node}",
+ train_path,
+ *args_string,
+ "-deepspeed",
+ "--deepspeed_config",
+ deepspeed_config_path
+ ]
+ return command_to_run
+
+ @staticmethod
+ def _run_training_on_pytorch_file(input_params: Dict[str, Any],
train_path: str, *args: Any, **kwargs : Any) -> None :
+ if kwargs:
+ raise ValueError("DeepspeedTorchDistributor with pytorch file
doesn't support key-word type arguments")
+
+ log_streaming_client = input_params.get("log_streaming_client", None)
+ training_command =
DeepspeedTorchDistributor._create_torchrun_command(input_params, train_path,
*args)
+ DeepspeedTorchDistributor._execute_command(training_command,
log_streaming_client=log_streaming_client)
+
+ def run(self, train_object: Union[Callable, str], *args : Any, **kwargs:
Any) -> Optional[Any]:
+ # If the "train_object" is a string, then we assume it's a filepath.
Otherwise, we assume it's a function.
+ if isinstance(train_object, str):
+ framework_wrapper_fn =
DeepspeedTorchDistributor._run_training_on_pytorch_file
+ else:
+ raise RuntimeError("Work in progress; not supported atm")
+ framework_wrapper_fn =
TorchDistributor._run_training_on_pytorch_file
+ if self.local_mode:
+ output = self._run_local_training(framework_wrapper_fn,
train_object, *args, **kwargs)
Review Comment:
Return here and remove the else below.
##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -1003,3 +1006,142 @@ def _get_spark_partition_data_loader(
# if num_workers is zero, we cannot set `prefetch_factor` otherwise
# torch will raise error.
return DataLoader(dataset, batch_size, num_workers=num_workers)
+
+
+class DeepspeedTorchDistributor(TorchDistributor):
+
+ def __init__(self, num_gpus: int = 1, nnodes: int = 1, local_mode: bool =
True, use_gpu: bool = True, deepspeed_config = None):
+ """
+ This class is used to run deepspeed training workloads with spark
clusters. The user has the option to
+ specify the number of gpus per node and the number of nodes (the
same as if running from terminal),
+ as well as specify a deepspeed configuration file.
+
+ Parameters
+ ----------
+ num_gpus: int
+ the number of GPUs to use per node (analagous to num_gpus in
deepspeed command)
Review Comment:
Clean these up a bit please with capitalization and periods at the end.
##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -1003,3 +1006,142 @@ def _get_spark_partition_data_loader(
# if num_workers is zero, we cannot set `prefetch_factor` otherwise
# torch will raise error.
return DataLoader(dataset, batch_size, num_workers=num_workers)
+
+
+class DeepspeedTorchDistributor(TorchDistributor):
+
+ def __init__(self, num_gpus: int = 1, nnodes: int = 1, local_mode: bool =
True, use_gpu: bool = True, deepspeed_config = None):
+ """
+ This class is used to run deepspeed training workloads with spark
clusters. The user has the option to
+ specify the number of gpus per node and the number of nodes (the
same as if running from terminal),
+ as well as specify a deepspeed configuration file.
+
+ Parameters
+ ----------
+ num_gpus: int
+ the number of GPUs to use per node (analagous to num_gpus in
deepspeed command)
+
+ nnodes: int
+ the number of nodes that should be used for the run
+
+ local_mode: bool
+ whether or not to run the training in a distributed fashion or
just locally
+
+ use_gpu: bool
+ boolean flag to determine whether to utilize gpus
+
+ deepspeed_config: Union[Dict[str,Any], str] or None:
+ the configuration file to be used for launching the deepspeed
application. If it is a dictionary mapping parameters to values, then we will
create the file.
+ If None, deepspeed will fall back to default parameters.
+ """
+ num_processes = num_gpus * nnodes
+ super().__init__(num_processes, local_mode, use_gpu)
+ self.deepspeed_config = deepspeed_config
+ self.ssl_conf = "deepspeed.spark.distributor.ignoreSsl"
+ self._validate_input_params()
+ self.input_params = self._create_input_params()
+ self.cleanup_deepspeed_conf = False
+
+ @staticmethod
+ def _get_deepspeed_config_path(deepspeed_config):
+ if isinstance(deepspeed_config, dict):
+ with tempfile.NamedTemporaryFile(mode='w+', delete=False,
suffix='.json') as fil:
+ json.dump(deepspeed_config, fil)
+ return fil.name
+ deepspeed_config_path = deepspeed_config
+ # Empty value means the deepspeed will fall back to default settings.
+ if deepspeed_config == None:
+ deepspeed_config_path = ""
+
+ return deepspeed_config_path
+
+
+ @staticmethod
+ def _get_torchrun_args(local_mode: bool, num_processes: int) ->
Tuple[List[Any], int]:
+ """
+ Given the mode and the number of processes, create the arguments to be
given to deepspeed
+
+ Parameters
+ ---------
+ local_mode: bool
+ Whether or not we are running training locally or in a distributed
fashion
+
+ num_processes: int
+ The number of processes that we are going to use (number of gpus
per node * number of nodes)
+
+ Returns
+ ------
+ Tuple[List[Any], int]
+ A tuple containing a list of arguments to pass as pytorch args to
deepspeed, as well as the number of processes per node
+ """
+ if local_mode:
+ torchrun_args = ["--standalone", "--nnodes=1"]
+ processes_per_node = num_processes
+ return torchrun_args, processes_per_node
+
+ master_addr, master_port = (
+ os.environ["MASTER_ADDR"],
+ os.environ["MASTER_PORT"],
+ )
+ node_rank = os.environ["RANK"]
+ torchrun_args = [
+ f"--nnodes={num_processes}",
+ f"--node_rank={node_rank}",
+ f"--rdzv_endpoint={master_addr}:{master_port}",
+ "--rdzv_id=0",
+ ]
+ processes_per_node = 1
+ return torchrun_args, processes_per_node
+
+ @staticmethod
+ def _create_torchrun_command(
+ input_params: Dict[str, Any], train_path: str, *args: Any) ->
List[str]:
+ local_mode = input_params["local_mode"]
+ num_processes = input_params["num_processes"]
+ deepspeed_config = input_params["deepspeed_config"]
+
+ deepspeed_config_path =
DeepspeedTorchDistributor._get_deepspeed_config_path(deepspeed_config)
+
+
+ torchrun_args, processes_per_node =
DeepspeedTorchDistributor._get_torchrun_args(local_mode, num_processes)
+
+ args_string = list(map(str, args))
+
+ command_to_run = [
Review Comment:
Remove some of the newlines here
##########
python/pyspark/ml/torch/tests/test_distributor.py:
##########
@@ -543,6 +545,126 @@ def test_check_parent_alive(self,
mock_clean_and_terminate: Callable) -> None:
class TorchWrapperUnitTests(TorchWrapperUnitTestsMixin, unittest.TestCase):
pass
+class DeepspeedTorchDistributorUnitTests(unittest.TestCase):
+
+ def _get_env_var(self, var_name: str, default_value: Any) -> Any:
+ value = os.getenv(var_name)
+ if value:
+ return value
+ else:
+ os.environ[var_name] = default_value
+ value = default_value
+ return value
+
+ def _get_env_variables_distributed(self):
+ MASTER_ADDR = self._get_env_var("MASTER_ADDR", "127.0.0.1")
+ MASTER_PORT = self._get_env_var("MASTER_PORT", 2000)
+ RANK = self._get_env_var("RANK", 0)
+ return MASTER_ADDR, MASTER_PORT, RANK
+
+
+
+ def test_get_torchrun_args(self):
+ number_of_processes = 5
+ EXPECTED_TORCHRUN_ARGS_LOCAL= [
+ "--standalone", "--nnodes=1"
+ ]
+ EXPECTED_PROCESSES_PER_NODE_LOCAL = number_of_processes
+
+
+ get_local_mode_torchrun_args, process_per_node=
DeepspeedTorchDistributor._get_torchrun_args(True, number_of_processes)
+ assert(get_local_mode_torchrun_args == EXPECTED_TORCHRUN_ARGS_LOCAL)
+ assert(EXPECTED_PROCESSES_PER_NODE_LOCAL == process_per_node)
+ MASTER_ADDR, MASTER_PORT, RANK = self._get_env_variables_distributed()
+ EXPECTED_TORCHRUN_ARGS_DISTRIBUTED = [
+ f"--nnodes={number_of_processes}",
+ f" --node_rank={RANK}",
+ f" --rdzv_endpoint={MASTER_ADDR}:{MASTER_PORT}",
+ "--rdzv_id=0"
+ ]
+ torchrun_args_distributed, process_per_node =
DeepspeedTorchDistributor._get_torchrun_args(False, number_of_processes)
+ assert(torchrun_args_distributed == EXPECTED_TORCHRUN_ARGS_DISTRIBUTED)
+ assert(process_per_node == 1)
Review Comment:
Can we split this into two tests (local vs distributed)?
##########
python/pyspark/ml/torch/tests/test_distributor.py:
##########
@@ -543,6 +545,126 @@ def test_check_parent_alive(self,
mock_clean_and_terminate: Callable) -> None:
class TorchWrapperUnitTests(TorchWrapperUnitTestsMixin, unittest.TestCase):
pass
+class DeepspeedTorchDistributorUnitTests(unittest.TestCase):
+
+ def _get_env_var(self, var_name: str, default_value: Any) -> Any:
+ value = os.getenv(var_name)
+ if value:
+ return value
+ else:
+ os.environ[var_name] = default_value
+ value = default_value
+ return value
+
+ def _get_env_variables_distributed(self):
+ MASTER_ADDR = self._get_env_var("MASTER_ADDR", "127.0.0.1")
+ MASTER_PORT = self._get_env_var("MASTER_PORT", 2000)
+ RANK = self._get_env_var("RANK", 0)
+ return MASTER_ADDR, MASTER_PORT, RANK
+
+
+
+ def test_get_torchrun_args(self):
+ number_of_processes = 5
+ EXPECTED_TORCHRUN_ARGS_LOCAL= [
+ "--standalone", "--nnodes=1"
+ ]
+ EXPECTED_PROCESSES_PER_NODE_LOCAL = number_of_processes
+
+
+ get_local_mode_torchrun_args, process_per_node=
DeepspeedTorchDistributor._get_torchrun_args(True, number_of_processes)
+ assert(get_local_mode_torchrun_args == EXPECTED_TORCHRUN_ARGS_LOCAL)
+ assert(EXPECTED_PROCESSES_PER_NODE_LOCAL == process_per_node)
+ MASTER_ADDR, MASTER_PORT, RANK = self._get_env_variables_distributed()
+ EXPECTED_TORCHRUN_ARGS_DISTRIBUTED = [
+ f"--nnodes={number_of_processes}",
+ f" --node_rank={RANK}",
+ f" --rdzv_endpoint={MASTER_ADDR}:{MASTER_PORT}",
+ "--rdzv_id=0"
+ ]
+ torchrun_args_distributed, process_per_node =
DeepspeedTorchDistributor._get_torchrun_args(False, number_of_processes)
+ assert(torchrun_args_distributed == EXPECTED_TORCHRUN_ARGS_DISTRIBUTED)
+ assert(process_per_node == 1)
+
+
+ def test_create_torchrun_command(self):
Review Comment:
Same here, let's split this into local vs distributed tests. If there is
shared setup, it can move to a helper function
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]