WeichenXu123 commented on code in PR #39369:
URL: https://github.com/apache/spark/pull/39369#discussion_r1080791404


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -574,6 +582,68 @@ def _run_training_on_pytorch_file(
             training_command, log_streaming_client=log_streaming_client
         )
 
+    @staticmethod
+    def _run_training_on_pytorch_function(
+        input_params: dict[str, Any], train_fn: Callable, *args: Any
+    ) -> Any:
+        save_dir = TorchDistributor._create_save_dir()
+        pickle_file_path = TorchDistributor._save_pickled_function(save_dir, 
train_fn, *args)
+        output_file_path = os.path.join(save_dir, 
TorchDistributor.PICKLED_OUTPUT_FILE)
+        train_file_path = TorchDistributor._create_torchrun_train_file(
+            save_dir, pickle_file_path, output_file_path
+        )
+        args = []
+
+        TorchDistributor._run_training_on_pytorch_file(input_params, 
train_file_path, *args)
+
+        output = TorchDistributor._get_pickled_output(output_file_path)
+        TorchDistributor._cleanup_files(save_dir)
+        return output
+
+    @staticmethod
+    def _create_save_dir() -> str:
+        # TODO: need to do this in a safe way to avoid issues during 
concurrent runs
+        return tempfile.mkdtemp()
+
+    @staticmethod
+    def _cleanup_files(save_dir: str) -> None:

Review Comment:
   This method is not called when the training routine raises an error, so the temporary save directory is never cleaned up in that case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to