Copilot commented on code in PR #53501:
URL: https://github.com/apache/spark/pull/53501#discussion_r2625966163
##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -212,8 +212,14 @@ def _get_num_tasks(self) -> int:
                 task_gpu_amount = int(_get_conf(self.spark, key, "0"))
                 if task_gpu_amount < 1:
                     raise RuntimeError(f"'{key}' was unset, so gpu usage is unavailable.")
-                # TODO(SPARK-41916): Address situation when spark.task.resource.gpu.amount > 1
-                return math.ceil(self.num_processes / task_gpu_amount)
+
+                if task_gpu_amount > 1:
+                    if not (self.num_processes % task_gpu_amount == 0):
+                        raise RuntimeError(
+                            f"TorchDistributor 'num_processes' value must be a multiple of "
+                            "'spark.task.resource.gpu.amount' value"
+                        )
Review Comment:
The new validation logic requires num_processes to be an exact multiple of
task_gpu_amount, which is a breaking change from the previous behavior of
ceiling division (math.ceil). The existing test case in
test_get_num_tasks_distributed with inputs (3, 8, 3) would now fail because 8 %
3 != 0. This change needs corresponding test updates that validate the new
behavior and use inputs where num_processes is a multiple of task_gpu_amount.
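For illustration, a minimal sketch of the behavior change (values taken from the
comment above; reading the existing tuple as (task_gpu_amount, num_processes,
expected) is an assumption):

```python
import math

# Hypothetical values matching the existing test case discussed above.
task_gpu_amount, num_processes = 3, 8

# Previous behavior: ceiling division, ceil(8 / 3) == 3 tasks.
print(math.ceil(num_processes / task_gpu_amount))  # 3

# New behavior: 8 % 3 != 0, so _get_num_tasks would raise RuntimeError instead;
# a passing input under the new rule would be e.g. num_processes = 9 (9 % 3 == 0 -> 3 tasks).
```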
##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -212,8 +212,14 @@ def _get_num_tasks(self) -> int:
                 task_gpu_amount = int(_get_conf(self.spark, key, "0"))
                 if task_gpu_amount < 1:
                     raise RuntimeError(f"'{key}' was unset, so gpu usage is unavailable.")
-                # TODO(SPARK-41916): Address situation when spark.task.resource.gpu.amount > 1
-                return math.ceil(self.num_processes / task_gpu_amount)
+
+                if task_gpu_amount > 1:
+                    if not (self.num_processes % task_gpu_amount == 0):
+                        raise RuntimeError(
+                            f"TorchDistributor 'num_processes' value must be a multiple of "
+                            "'spark.task.resource.gpu.amount' value"
+                        )
Review Comment:
The error message should specify the actual values that caused the
validation failure to help users understand what needs to be corrected.
Consider including the actual num_processes value and task_gpu_amount value in
the error message. For example: "TorchDistributor 'num_processes'
({num_processes}) must be a multiple of 'spark.task.resource.gpu.amount'
({task_gpu_amount})".
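A minimal sketch of that message (variable names mirror the hunk above; this is
not the exact patch):

```python
if self.num_processes % task_gpu_amount != 0:
    raise RuntimeError(
        f"TorchDistributor 'num_processes' ({self.num_processes}) must be a multiple of "
        f"'spark.task.resource.gpu.amount' ({task_gpu_amount})"
    )
```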
##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -421,14 +427,19 @@ def _get_torchrun_args(local_mode: bool, num_processes: int) -> Tuple[List[Any],
         master_addr = os.environ["MASTER_ADDR"]
         master_port = os.environ["MASTER_PORT"]
+
+        if cuda_visible_devices := os.environ.get("CUDA_VISIBLE_DEVICES"):
+            processes_per_node = len(cuda_visible_devices.split(","))
+        else:
+            processes_per_node = 1
         node_rank = os.environ["RANK"]
+
         torchrun_args = [
-            f"--nnodes={num_processes}",
+            f"--nnodes={num_processes // processes_per_node}",
Review Comment:
Using integer division without validation could lead to incorrect nnodes
calculation if num_processes is not evenly divisible by processes_per_node.
This could happen if CUDA_VISIBLE_DEVICES is set to a value that doesn't evenly
divide num_processes. The calculation should validate that num_processes %
processes_per_node == 0 and raise an error if not, similar to the validation
added in _get_num_tasks.
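As a sketch, the guard could mirror the one added in _get_num_tasks (the
placement and wording here are assumptions, not the PR's code):

```python
if num_processes % processes_per_node != 0:
    raise RuntimeError(
        f"'num_processes' ({num_processes}) must be a multiple of the per-node process "
        f"count derived from CUDA_VISIBLE_DEVICES ({processes_per_node})"
    )
```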
##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -421,14 +427,19 @@ def _get_torchrun_args(local_mode: bool, num_processes: int) -> Tuple[List[Any],
         master_addr = os.environ["MASTER_ADDR"]
         master_port = os.environ["MASTER_PORT"]
+
+        if cuda_visible_devices := os.environ.get("CUDA_VISIBLE_DEVICES"):
+            processes_per_node = len(cuda_visible_devices.split(","))
+        else:
+            processes_per_node = 1
Review Comment:
The new logic for determining processes_per_node from CUDA_VISIBLE_DEVICES
is not covered by tests. The existing test_get_torchrun_args_distributed in
DeepspeedTorchDistributor tests expects processes_per_node to always be 1,
which would fail if CUDA_VISIBLE_DEVICES is set. Add test coverage for cases
where CUDA_VISIBLE_DEVICES contains multiple GPU IDs to validate the new
behavior.
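A rough sketch of such a test, assuming _get_torchrun_args is the static method
quoted in the hunk; the test class name and expected values are illustrative,
not from the PR:

```python
import os
import unittest
from unittest import mock

from pyspark.ml.torch.distributor import TorchDistributor


class GetTorchrunArgsCudaDevicesTest(unittest.TestCase):
    @mock.patch.dict(
        os.environ,
        {
            "MASTER_ADDR": "127.0.0.1",
            "MASTER_PORT": "12345",
            "RANK": "0",
            "CUDA_VISIBLE_DEVICES": "0,1,2,3",  # 4 GPUs visible on this node
        },
    )
    def test_multiple_visible_devices(self) -> None:
        # With 4 visible devices and 8 total processes, nnodes should become 8 // 4 == 2.
        torchrun_args, _ = TorchDistributor._get_torchrun_args(
            local_mode=False, num_processes=8
        )
        self.assertIn("--nnodes=2", torchrun_args)
```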
##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -421,14 +427,19 @@ def _get_torchrun_args(local_mode: bool, num_processes: int) -> Tuple[List[Any],
         master_addr = os.environ["MASTER_ADDR"]
         master_port = os.environ["MASTER_PORT"]
+
+        if cuda_visible_devices := os.environ.get("CUDA_VISIBLE_DEVICES"):
+            processes_per_node = len(cuda_visible_devices.split(","))
Review Comment:
The CUDA_VISIBLE_DEVICES parsing doesn't handle edge cases properly. If the
environment variable contains whitespace or empty entries between commas (e.g.,
"0, ,2" or "0,,2"), the split will produce empty or whitespace-only elements
that are still counted, leading to an incorrect processes_per_node count.
Consider a more robust parsing approach that strips entries and filters out
empty strings after splitting.
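A sketch of more defensive parsing (variable names mirror the hunk above):

```python
import os

cuda_visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES", "")
# Drop empty and whitespace-only entries so "0,,2" or "0, ,2" count 2 devices, not 3.
device_ids = [d.strip() for d in cuda_visible_devices.split(",") if d.strip()]
processes_per_node = len(device_ids) if device_ids else 1
```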
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]