rithwik-db commented on code in PR #39267:
URL: https://github.com/apache/spark/pull/39267#discussion_r1068755416


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -428,6 +432,84 @@ def _run_local_training(
 
         return output
 
+    def _get_spark_task_program(
+        self, framework_wrapper_fn: Optional[Callable], train_fn: Union[Callable, str], *args: Any
+    ) -> Callable:
+        num_processes = self.num_processes
+        num_tasks = self.num_tasks
+        use_gpu = self.use_gpu
+        input_params = self.input_params
+
+        # Spark task program
+        def wrapped_train_fn(_):  # type: ignore[no-untyped-def]
+            import os
+            from pyspark import BarrierTaskContext
+
+            CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
+
+            # The idea of setting the random port to 0 doesn't seem to work?
+            def get_free_port(address: str) -> int:
+                import socket
+                import random
+
+                while True:
+                    port = random.randint(32768, 61000)
+                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                    if not (sock.connect_ex((address, port)) == 0):
+                        return port
+
+            def set_torch_config(context: "BarrierTaskContext") -> None:
+                addrs = [e.address.split(":")[0] for e in context.getTaskInfos()]
+
+                os.environ["MASTER_ADDR"] = str(addrs[0])
+                os.environ["MASTER_PORT"] = str(get_free_port(addrs[0]))
+                os.environ["WORLD_SIZE"] = str(num_processes)
+                os.environ["NODE_RANK"] = str(context.partitionId())
+                os.environ["RANK"] = str(context.partitionId())
+
+            def set_gpus(context: "BarrierTaskContext") -> None:
+                gpus_owned = get_gpus_owned(context)
+
+                my_num_gpus = (num_processes // num_tasks) + (
+                    context.partitionId() < (num_processes % num_tasks)
+                )
+                gpu_addresses = [str(e) for e in random.sample(gpus_owned, my_num_gpus)]
+                os.environ[CUDA_VISIBLE_DEVICES] = ",".join(gpu_addresses)
+
+            context = BarrierTaskContext.get()
+
+            if use_gpu:
+                set_gpus(context)
+            else:
+                os.environ[CUDA_VISIBLE_DEVICES] = ""
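
As an aside on the inline question above ("setting the random port to 0 doesn't seem to work?"): a minimal sketch of that port-0 approach, for reference only and not part of this PR, would look like the following. The helper name is hypothetical; like the random-port loop, it is still racy because the port could be taken again before torch.distributed binds it.

```python
import socket

def get_free_port_via_bind(address: str) -> int:
    # Binding to port 0 asks the OS for a free ephemeral port; read it back
    # with getsockname() before closing the socket.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((address, 0))       # port 0 -> OS assigns a free port
        return sock.getsockname()[1]  # the port the OS picked
```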

Review Comment:
   I think it should be added because, if the user runs training with `TorchDistributor(use_gpu=False, **kwargs).run(train_fn)` but accidentally has PyTorch Lightning code like `pl.Trainer(accelerator="gpu")` inside their `train_fn`, an error should be raised saying that no CUDA devices are available even though a GPU accelerator was specified.
   
   We already have a check in `get_num_tasks` for the case where `use_gpu=True` but no GPUs are available, and I think this code addresses the opposite case, where `use_gpu=False` but the user's code tries to use GPUs.
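   
   For illustration (a minimal sketch, not code from this PR): once `CUDA_VISIBLE_DEVICES` is set to the empty string, `torch.cuda.is_available()` returns `False`, so GPU usage inside `train_fn` fails fast instead of silently grabbing devices the Spark scheduler never assigned to the task:
   
   ```python
   import os
   
   # This is what the use_gpu=False branch above does before user code runs.
   os.environ["CUDA_VISIBLE_DEVICES"] = ""
   
   import torch
   
   print(torch.cuda.is_available())  # False: no CUDA devices are visible
   
   # So user code that accidentally requests GPUs, e.g.
   #   pl.Trainer(accelerator="gpu", devices=1)
   # raises a "no GPUs available" style error instead of running on hardware
   # the scheduler did not allocate to this Spark task.
   ```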



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

