Copilot commented on code in PR #53501:
URL: https://github.com/apache/spark/pull/53501#discussion_r2625966163


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -212,8 +212,14 @@ def _get_num_tasks(self) -> int:
                 task_gpu_amount = int(_get_conf(self.spark, key, "0"))
                 if task_gpu_amount < 1:
                     raise RuntimeError(f"'{key}' was unset, so gpu usage is unavailable.")
-                # TODO(SPARK-41916): Address situation when spark.task.resource.gpu.amount > 1
-                return math.ceil(self.num_processes / task_gpu_amount)
+
+                if task_gpu_amount > 1:
+                    if not (self.num_processes % task_gpu_amount == 0):
+                        raise RuntimeError(
+                            f"TorchDistributor 'num_processes' value must be a multiple of "
+                            "'spark.task.resource.gpu.amount' value"
+                        )

Review Comment:
   The new validation logic requires num_processes to be an exact multiple of 
task_gpu_amount, but this is a breaking change from the previous behavior which 
used ceiling division (math.ceil). The existing test case in 
test_get_num_tasks_distributed with inputs (3, 8, 3) would now fail because 8 % 
3 != 0. This change needs corresponding test updates to validate the new 
behavior and should not use test cases where num_processes is not a multiple of 
task_gpu_amount.
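
   For illustration, a minimal sketch of the behavior change described above. 
The helper names are hypothetical, and the return value of the strict variant 
(num_processes // task_gpu_amount) is an assumption, since the hunk does not 
show the new return statement.

```python
import math


def num_tasks_ceil(num_processes: int, task_gpu_amount: int) -> int:
    # Previous behavior: round up so every process is covered by some task.
    return math.ceil(num_processes / task_gpu_amount)


def num_tasks_strict(num_processes: int, task_gpu_amount: int) -> int:
    # New behavior as read from the hunk: reject non-multiples outright.
    if task_gpu_amount > 1 and num_processes % task_gpu_amount != 0:
        raise RuntimeError(
            "TorchDistributor 'num_processes' value must be a multiple of "
            "'spark.task.resource.gpu.amount' value"
        )
    # Assumption: the return statement not shown in the hunk divides exactly.
    return num_processes // task_gpu_amount
```

   With num_processes=8 and task_gpu_amount=3, the first helper returns 3 while 
the second raises, which is why a test built on those inputs would need to 
change.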



##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -212,8 +212,14 @@ def _get_num_tasks(self) -> int:
                 task_gpu_amount = int(_get_conf(self.spark, key, "0"))
                 if task_gpu_amount < 1:
                     raise RuntimeError(f"'{key}' was unset, so gpu usage is unavailable.")
-                # TODO(SPARK-41916): Address situation when spark.task.resource.gpu.amount > 1
-                return math.ceil(self.num_processes / task_gpu_amount)
+
+                if task_gpu_amount > 1:
+                    if not (self.num_processes % task_gpu_amount == 0):
+                        raise RuntimeError(
+                            f"TorchDistributor 'num_processes' value must be a multiple of "
+                            "'spark.task.resource.gpu.amount' value"
+                        )

Review Comment:
   The error message should specify the actual values that caused the 
validation failure to help users understand what needs to be corrected. 
Consider including the actual num_processes value and task_gpu_amount value in 
the error message. For example: "TorchDistributor 'num_processes' 
({num_processes}) must be a multiple of 'spark.task.resource.gpu.amount' 
({task_gpu_amount})".
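
   A possible shape for such a message, as a sketch only. The standalone 
function check_gpu_multiple is hypothetical; in the PR the check lives inline 
in _get_num_tasks and reads self.num_processes.

```python
def check_gpu_multiple(num_processes: int, task_gpu_amount: int) -> None:
    # Hypothetical standalone version of the inline check in the hunk above.
    if task_gpu_amount > 1 and num_processes % task_gpu_amount != 0:
        # Both f-string pieces interpolate the offending values so the user
        # can see which setting to adjust.
        raise RuntimeError(
            f"TorchDistributor 'num_processes' ({num_processes}) must be a "
            f"multiple of 'spark.task.resource.gpu.amount' ({task_gpu_amount})"
        )
```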



##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -421,14 +427,19 @@ def _get_torchrun_args(local_mode: bool, num_processes: int) -> Tuple[List[Any],
 
         master_addr = os.environ["MASTER_ADDR"]
         master_port = os.environ["MASTER_PORT"]
+
+        if cuda_visible_devices := os.environ.get("CUDA_VISIBLE_DEVICES"):
+            processes_per_node = len(cuda_visible_devices.split(","))
+        else:
+            processes_per_node = 1
         node_rank = os.environ["RANK"]
+
         torchrun_args = [
-            f"--nnodes={num_processes}",
+            f"--nnodes={num_processes // processes_per_node}",

Review Comment:
   Using integer division without validation could lead to incorrect nnodes 
calculation if num_processes is not evenly divisible by processes_per_node. 
This could happen if CUDA_VISIBLE_DEVICES is set to a value that doesn't evenly 
divide num_processes. The calculation should validate that num_processes % 
processes_per_node == 0 and raise an error if not, similar to the validation 
added in _get_num_tasks.
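
   One way the suggested check could look, as a hedged sketch. compute_nnodes 
is a hypothetical helper, not a function in distributor.py, and the error 
wording is illustrative.

```python
def compute_nnodes(num_processes: int, processes_per_node: int) -> int:
    # Hypothetical helper: derive the torchrun --nnodes value safely.
    if processes_per_node < 1:
        raise ValueError("processes_per_node must be at least 1")
    if num_processes % processes_per_node != 0:
        # Plain floor division would silently drop the remainder here,
        # e.g. 6 // 4 == 1 leaves two processes unaccounted for.
        raise RuntimeError(
            f"num_processes ({num_processes}) must be a multiple of the "
            f"number of processes per node ({processes_per_node})"
        )
    return num_processes // processes_per_node
```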



##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -421,14 +427,19 @@ def _get_torchrun_args(local_mode: bool, num_processes: int) -> Tuple[List[Any],
 
         master_addr = os.environ["MASTER_ADDR"]
         master_port = os.environ["MASTER_PORT"]
+
+        if cuda_visible_devices := os.environ.get("CUDA_VISIBLE_DEVICES"):
+            processes_per_node = len(cuda_visible_devices.split(","))
+        else:
+            processes_per_node = 1

Review Comment:
   The new logic for determining processes_per_node from CUDA_VISIBLE_DEVICES 
is not covered by tests. The existing test_get_torchrun_args_distributed in 
DeepspeedTorchDistributor tests expects processes_per_node to always be 1, 
which would fail if CUDA_VISIBLE_DEVICES is set. Add test coverage for cases 
where CUDA_VISIBLE_DEVICES contains multiple GPU IDs to validate the new 
behavior.
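
   A rough sketch of what such coverage might look like. The helper 
processes_per_node_from_env is hypothetical and only mirrors the logic in the 
hunk; a real test would call _get_torchrun_args and inspect its output 
instead. The environment is patched with unittest.mock.patch.dict so the 
tests do not leak state.

```python
import os
import unittest
from unittest import mock


def processes_per_node_from_env() -> int:
    # Hypothetical helper mirroring the logic added in the hunk above.
    if cuda_visible_devices := os.environ.get("CUDA_VISIBLE_DEVICES"):
        return len(cuda_visible_devices.split(","))
    return 1


class ProcessesPerNodeTest(unittest.TestCase):
    def test_multiple_gpu_ids(self) -> None:
        # Four comma-separated IDs should yield four processes per node.
        with mock.patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0,1,2,3"}):
            self.assertEqual(processes_per_node_from_env(), 4)

    def test_unset_falls_back_to_one(self) -> None:
        # clear=True empties os.environ for the duration of the block.
        with mock.patch.dict(os.environ, {}, clear=True):
            self.assertEqual(processes_per_node_from_env(), 1)
```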



##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -421,14 +427,19 @@ def _get_torchrun_args(local_mode: bool, num_processes: int) -> Tuple[List[Any],
 
         master_addr = os.environ["MASTER_ADDR"]
         master_port = os.environ["MASTER_PORT"]
+
+        if cuda_visible_devices := os.environ.get("CUDA_VISIBLE_DEVICES"):
+            processes_per_node = len(cuda_visible_devices.split(","))

Review Comment:
   The CUDA_VISIBLE_DEVICES parsing doesn't handle malformed values. If the 
environment variable contains whitespace-only or empty entries between commas 
(e.g., "0, ,2" or "0,,2"), split(",") still returns those entries as list 
elements, so they are counted and processes_per_node comes out too high. 
Consider filtering out empty and whitespace-only entries after splitting.
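
   A minimal sketch of the filtering approach, assuming only that blank tokens 
should be ignored. count_visible_devices is a hypothetical name, and the 
sketch counts tokens only; it does not try to reproduce how the CUDA runtime 
itself interprets malformed values.

```python
def count_visible_devices(value: str) -> int:
    # Hypothetical helper: count non-blank, comma-separated device entries.
    # str.strip() removes surrounding whitespace, so " " becomes "" and is
    # dropped by the truthiness filter.
    devices = [token.strip() for token in value.split(",") if token.strip()]
    return len(devices)
```

   By contrast, len("0,,2".split(",")) evaluates to 3 because the empty middle 
entry is kept.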



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

