mathewjacob1002 commented on code in PR #41973:
URL: https://github.com/apache/spark/pull/41973#discussion_r1264193698
##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -514,11 +514,26 @@ def _execute_command(
f"Command {cmd} failed with return code {task.returncode}. "
f"The {last_n_msg} included below: {task_output}"
)
+ @staticmethod
+ def _get_output_from_framework_wrapper(framework_wrapper:
Optional[Callable], input_params: Dict, train_object: Union[Callable, str],
run_pytorch_file_fn: Optional[Callable], *args, **kwargs) -> Optional[Any]:
+ if not framework_wrapper:
+ raise RuntimeError("In the _get_output_from_framework_wrapper
function, found a framework wrapper that is none. I wonder why this is...")
+ # The object to train is a file path, so framework_wrapper is some
run_training_on_pytorch_file function.
+ if type(train_object) is str:
+ return framework_wrapper(input_params, train_object, *args,
**kwargs)
+ else:
+ # We are doing training with a function, will call
run_training_on_pytorch_function
+ if not run_pytorch_file_fn:
+ run_pytorch_file_fn =
TorchDistributor._run_training_on_pytorch_file
+ return framework_wrapper(input_params, train_object,
run_pytorch_file_fn, *args, **kwargs)
+
+
def _run_local_training(
self,
framework_wrapper_fn: Callable,
train_object: Union[Callable, str],
+ run_pytorch_file_fn: Optional[Callable],
Review Comment:
do you mind if I ask why?
##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -514,11 +515,26 @@ def _execute_command(
f"Command {cmd} failed with return code {task.returncode}. "
f"The {last_n_msg} included below: {task_output}"
)
+ @staticmethod
+ def _get_output_from_framework_wrapper(framework_wrapper:
Optional[Callable], input_params: Dict, train_object: Union[Callable, str],
run_pytorch_file_fn: Optional[Callable], *args, **kwargs) -> Optional[Any]:
+ if not framework_wrapper:
+ raise RuntimeError("In the _get_output_from_framework_wrapper
function, found a framework wrapper that is none. I wonder why this is...")
+ # The object to train is a file path, so framework_wrapper is some
run_training_on_pytorch_file function.
+ if type(train_object) is str:
+ return framework_wrapper(input_params, train_object, *args,
**kwargs)
+ else:
+ # We are doing training with a function, will call
run_training_on_pytorch_function
+ if not run_pytorch_file_fn:
Review Comment:
This won't work because of the `*args` and `**kwargs` that follow it. In my
experience, Python doesn't handle a default value for a parameter placed before them.
##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -910,17 +932,20 @@ def run(self, train_object: Union[Callable, str], *args:
Any, **kwargs: Any) ->
train_object is a Callable with an expected output. Returns None
if train_object is
a file.
"""
+ return self._run(train_object,
TorchDistributor._run_training_on_pytorch_file, *args, **kwargs)
Review Comment:
I'm going to resolve this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]