rithwik-db commented on code in PR #39267: URL: https://github.com/apache/spark/pull/39267#discussion_r1069064636
########## python/pyspark/ml/torch/distributor.py: ########## @@ -428,6 +432,84 @@ def _run_local_training( return output + def _get_spark_task_program( + self, framework_wrapper_fn: Optional[Callable], train_fn: Union[Callable, str], *args: Any + ) -> Callable: + num_processes = self.num_processes + num_tasks = self.num_tasks + use_gpu = self.use_gpu + input_params = self.input_params + + # Spark task program + def wrapped_train_fn(_): # type: ignore[no-untyped-def] + import os + from pyspark import BarrierTaskContext + + CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES" + + # The idea of setting the random port to 0 doesn't seem to work? + def get_free_port(address: str) -> int: + import socket + import random + + while True: + port = random.randint(32768, 61000) + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + if not (sock.connect_ex((address, port)) == 0): + return port + + def set_torch_config(context: "BarrierTaskContext") -> None: + addrs = [e.address.split(":")[0] for e in context.getTaskInfos()] + + os.environ["MASTER_ADDR"] = str(addrs[0]) + os.environ["MASTER_PORT"] = str(get_free_port(addrs[0])) + os.environ["WORLD_SIZE"] = str(num_processes) + os.environ["NODE_RANK"] = str(context.partitionId()) + os.environ["RANK"] = str(context.partitionId()) + + def set_gpus(context: "BarrierTaskContext") -> None: + gpus_owned = get_gpus_owned(context) + + my_num_gpus = (num_processes // num_tasks) + ( + context.partitionId() < (num_processes % num_tasks) + ) + gpu_addresses = [str(e) for e in random.sample(gpus_owned, my_num_gpus)] + os.environ[CUDA_VISIBLE_DEVICES] = ",".join(gpu_addresses) + + context = BarrierTaskContext.get() + + if use_gpu: + set_gpus(context) + else: + os.environ[CUDA_VISIBLE_DEVICES] = "" Review Comment: PyTorch Lightning will raise a `MisconfigurationException: No supported gpu backend found!`. 
This is what we expect to see if the user sets `use_gpu=False` and calls `pl.Trainer(accelerator="gpu")`. My understanding is that if a user runs this code on a local cluster with GPUs on each node without `os.environ[CUDA_VISIBLE_DEVICES] = ""`, then the task will still be assigned a GPU even when `use_gpu=False`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org