[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2023-01-06 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1063774097


##
python/pyspark/ml/torch/tests/test_distributor.py:
##
@@ -0,0 +1,193 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import stat
+import tempfile
+import unittest
+
+from pyspark import SparkConf, SparkContext
+from pyspark.ml.torch.distributor import TorchDistributor
+from pyspark.sql import SparkSession
+from pyspark.testing.utils import SPARK_HOME
+
+
+class TorchDistributorBaselineUnitTests(unittest.TestCase):
+def setUp(self) -> None:
+conf = SparkConf()
+self.sc = SparkContext("local[4]", conf=conf)
+self.spark = SparkSession(self.sc)
+
+def tearDown(self) -> None:
+self.spark.stop()
+
+def test_validate_correct_inputs(self) -> None:
+inputs = [
+("pytorch", 1, True, False),
+("pytorch", 100, True, False),
+("pytorch-lightning", 1, False, False),
+("pytorch-lightning", 100, False, False),
+]
+for framework, num_processes, local_mode, use_gpu in inputs:
+with self.subTest():
+TorchDistributor(framework, num_processes, local_mode, use_gpu)
+
+def test_validate_incorrect_inputs(self) -> None:
+inputs = [
+("tensorflow", 1, True, False, ValueError, "framework"),
+("pytroch", 100, True, False, ValueError, "framework"),
+("pytorchlightning", 1, False, False, ValueError, "framework"),
+("pytorch-lightning", 0, False, False, ValueError, "positive"),
+]
+for framework, num_processes, local_mode, use_gpu, error, message in 
inputs:
+with self.subTest():
+with self.assertRaisesRegex(error, message):
+TorchDistributor(framework, num_processes, local_mode, 
use_gpu)
+
+def test_encryption_passes(self) -> None:
+inputs = [
+("spark.ssl.enabled", "false", 
"pytorch.spark.distributor.ignoreSsl", "true"),
+("spark.ssl.enabled", "false", 
"pytorch.spark.distributor.ignoreSsl", "false"),
+("spark.ssl.enabled", "true", 
"pytorch.spark.distributor.ignoreSsl", "true"),
+]
+for ssl_conf_key, ssl_conf_value, pytorch_conf_key, pytorch_conf_value 
in inputs:
+with self.subTest():
+self.spark.sparkContext._conf.set(ssl_conf_key, ssl_conf_value)
+self.spark.sparkContext._conf.set(pytorch_conf_key, 
pytorch_conf_value)
+distributor = TorchDistributor("pytorch", 1, True, False)
+distributor._check_encryption()
+
+def test_encryption_fails(self) -> None:
+# this is the only combination that should fail
+inputs = [("spark.ssl.enabled", "true", 
"pytorch.spark.distributor.ignoreSsl", "false")]
+for ssl_conf_key, ssl_conf_value, pytorch_conf_key, pytorch_conf_value 
in inputs:
+with self.subTest():
+with self.assertRaisesRegex(Exception, "encryption"):
+self.spark.sparkContext._conf.set(ssl_conf_key, 
ssl_conf_value)
+self.spark.sparkContext._conf.set(pytorch_conf_key, 
pytorch_conf_value)
+distributor = TorchDistributor("pytorch", 1, True, False)
+distributor._check_encryption()
+
+def test_get_num_tasks_fails(self) -> None:
+inputs = [1, 5, 4]
+
+# This is when the conf isn't set and we request GPUs
+for num_processes in inputs:
+with self.subTest():
+with self.assertRaisesRegex(RuntimeError, "driver"):
+TorchDistributor("pytorch", num_processes, True, True)
+with self.assertRaisesRegex(RuntimeError, "unset"):
+TorchDistributor("pytorch", num_processes, False, True)
+
+
+class TorchDistributorLocalUnitTests(unittest.TestCase):
+def setUp(self) -> None:
+class_name = self.__class__.__name__
+self.tempFile = tempfile.NamedTemporaryFile(delete=False)
+self.tempFile.write(
+b'echo {\\"name\\": \\"gpu\\", \\"addresses\\": 
[\\"0\\",\\"1\\",\\"2\\"]}'
+)
+   

[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2023-01-05 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1063186150


##
python/pyspark/ml/torch/distributor.py:
##
@@ -0,0 +1,297 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import math
+from typing import Union, Callable, Optional, Any
+import warnings
+
+from pyspark.sql import SparkSession
+from pyspark.context import SparkContext
+
+
+# TODO(SPARK-41589): will move the functions and tests to an external file
+#   once we are in agreement about which functions should be in utils.py
+def get_conf_boolean(sc: SparkContext, key: str, default_value: str) -> bool:

Review Comment:
   Is this [deprecation](https://peps.python.org/pep-0632/) a relevant issue if 
we were to use `distutils`? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2023-01-05 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1063163530


##
python/pyspark/ml/torch/distributor.py:
##
@@ -0,0 +1,307 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import math
+from typing import Union, Callable, Optional, Any
+import warnings
+
+from pyspark.sql import SparkSession
+from pyspark.context import SparkContext
+
+
+# TODO(SPARK-41589): will move the functions and tests to an external file
+#   once we are in agreement about which functions should be in utils.py
+def get_conf_boolean(sc: SparkContext, key: str, default_value: str) -> bool:
+"""Get the conf "key" from the given spark context,
+or return the default value if the conf is not set.
+This expects the conf value to be a boolean or string;
+if the value is a string, this checks for all capitalization
+patterns of "true" and "false" to match Scala.
+
+Parameters
+--
+sc : SparkContext
+The SparkContext for the distributor.
+key : str
+string for conf name
+default_value : str
+default value for the conf value for the given key
+
+Returns
+---
+bool
+Returns the boolean value that corresponds to the conf
+
+Raises
+--
+RuntimeError
+Thrown when the conf value is not a valid boolean
+"""
+val = sc.getConf().get(key, default_value)
+lowercase_val = val.lower()
+if lowercase_val == "true":
+return True
+if lowercase_val == "false":
+return False
+raise RuntimeError(
+"get_conf_boolean expected a boolean conf "

Review Comment:
   Ok, thanks for the clarification! :) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2023-01-05 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1063163530


##
python/pyspark/ml/torch/distributor.py:
##
@@ -0,0 +1,307 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import math
+from typing import Union, Callable, Optional, Any
+import warnings
+
+from pyspark.sql import SparkSession
+from pyspark.context import SparkContext
+
+
+# TODO(SPARK-41589): will move the functions and tests to an external file
+#   once we are in agreement about which functions should be in utils.py
+def get_conf_boolean(sc: SparkContext, key: str, default_value: str) -> bool:
+"""Get the conf "key" from the given spark context,
+or return the default value if the conf is not set.
+This expects the conf value to be a boolean or string;
+if the value is a string, this checks for all capitalization
+patterns of "true" and "false" to match Scala.
+
+Parameters
+--
+sc : SparkContext
+The SparkContext for the distributor.
+key : str
+string for conf name
+default_value : str
+default value for the conf value for the given key
+
+Returns
+---
+bool
+Returns the boolean value that corresponds to the conf
+
+Raises
+--
+RuntimeError
+Thrown when the conf value is not a valid boolean
+"""
+val = sc.getConf().get(key, default_value)
+lowercase_val = val.lower()
+if lowercase_val == "true":
+return True
+if lowercase_val == "false":
+return False
+raise RuntimeError(
+"get_conf_boolean expected a boolean conf "

Review Comment:
   Ok will update other errors as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2023-01-05 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1063150272


##
python/pyspark/ml/torch/distributor.py:
##
@@ -0,0 +1,307 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import math
+from typing import Union, Callable, Optional, Any
+import warnings
+
+from pyspark.sql import SparkSession
+from pyspark.context import SparkContext
+
+
+# TODO(SPARK-41589): will move the functions and tests to an external file
+#   once we are in agreement about which functions should be in utils.py
+def get_conf_boolean(sc: SparkContext, key: str, default_value: str) -> bool:
+"""Get the conf "key" from the given spark context,
+or return the default value if the conf is not set.
+This expects the conf value to be a boolean or string;
+if the value is a string, this checks for all capitalization
+patterns of "true" and "false" to match Scala.
+
+Parameters
+--
+sc : SparkContext
+The SparkContext for the distributor.
+key : str
+string for conf name
+default_value : str
+default value for the conf value for the given key
+
+Returns
+---
+bool
+Returns the boolean value that corresponds to the conf
+
+Raises
+--
+RuntimeError
+Thrown when the conf value is not a valid boolean
+"""
+val = sc.getConf().get(key, default_value)
+lowercase_val = val.lower()
+if lowercase_val == "true":
+return True
+if lowercase_val == "false":
+return False
+raise RuntimeError(
+"get_conf_boolean expected a boolean conf "
+"value but found value of type {} "
+"with value: {}".format(type(val), val)
+)
+
+
+class Distributor:
+"""
+The parent class for TorchDistributor. This class shouldn't be 
instantiated directly.
+"""
+
+def __init__(
+self,
+num_processes: int = 1,
+local_mode: bool = True,
+use_gpu: bool = True,
+):
+self.num_processes = num_processes
+self.local_mode = local_mode
+self.use_gpu = use_gpu
+self.spark = SparkSession.getActiveSession()
+if not self.spark:
+raise RuntimeError("An active SparkSession is required for the 
distributor.")
+self.sc = self.spark.sparkContext
+self.num_tasks = self._get_num_tasks()
+self.ssl_conf = None
+
+def _get_num_tasks(self) -> int:
+"""
+Returns the number of Spark tasks to use for distributed training
+
+Returns
+---
+The number of Spark tasks to use for distributed training
+
+Raises
+--
+RuntimeError
+Raised when the SparkConf was misconfigured.
+"""
+
+if self.use_gpu:
+if not self.local_mode:
+key = "spark.task.resource.gpu.amount"
+task_gpu_amount = int(self.sc.getConf().get(key, "0"))
+if task_gpu_amount < 1:
+raise RuntimeError(f"'{key}' was unset, so gpu usage is 
unavailable.")
+# TODO(SPARK-41916): Address situation when 
spark.task.resource.gpu.amount > 1
+return math.ceil(self.num_processes / task_gpu_amount)
+else:
+key = "spark.driver.resource.gpu.amount"
+if "gpu" not in self.sc.resources:
+raise RuntimeError("GPUs were unable to be found on the 
driver.")
+num_available_gpus = int(self.sc.getConf().get(key, "0"))
+if num_available_gpus == 0:
+raise RuntimeError("GPU resources were not configured 
properly on the driver.")
+if self.num_processes > num_available_gpus:
+warnings.warn(
+f"'num_processes' cannot be set to a value greater 
than the number of "
+f"available GPUs on the driver, which is 
{num_available_gpus}. "
+f"'num_processes' was reset to be equal to the number 
of available GPUs.",
+RuntimeWarning,
+)
+self.num_processes = 

[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2023-01-05 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1063149995


##
python/pyspark/ml/torch/distributor.py:
##
@@ -0,0 +1,307 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import math
+from typing import Union, Callable, Optional, Any
+import warnings
+
+from pyspark.sql import SparkSession
+from pyspark.context import SparkContext
+
+
+# TODO(SPARK-41589): will move the functions and tests to an external file
+#   once we are in agreement about which functions should be in utils.py
+def get_conf_boolean(sc: SparkContext, key: str, default_value: str) -> bool:
+"""Get the conf "key" from the given spark context,
+or return the default value if the conf is not set.
+This expects the conf value to be a boolean or string;
+if the value is a string, this checks for all capitalization
+patterns of "true" and "false" to match Scala.
+
+Parameters
+--
+sc : SparkContext
+The SparkContext for the distributor.
+key : str
+string for conf name
+default_value : str
+default value for the conf value for the given key
+
+Returns
+---
+bool
+Returns the boolean value that corresponds to the conf
+
+Raises
+--
+RuntimeError
+Thrown when the conf value is not a valid boolean
+"""
+val = sc.getConf().get(key, default_value)
+lowercase_val = val.lower()
+if lowercase_val == "true":
+return True
+if lowercase_val == "false":
+return False
+raise RuntimeError(
+"get_conf_boolean expected a boolean conf "

Review Comment:
   Do we use `ValueError` for conf errors or `RuntimeError`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2023-01-05 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1063124224


##
python/pyspark/ml/torch/distributor.py:
##
@@ -0,0 +1,297 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import math
+from typing import Union, Callable, Optional, Any
+import warnings
+
+from pyspark.sql import SparkSession
+from pyspark.context import SparkContext
+
+
+# TODO(SPARK-41589): will move the functions and tests to an external file
+#   once we are in agreement about which functions should be in utils.py
+def get_conf_boolean(sc: SparkContext, key: str, default_value: str) -> bool:
+"""Get the conf "key" from the given spark context,
+or return the default value if the conf is not set.
+This expects the conf value to be a boolean or string;
+if the value is a string, this checks for all capitalization
+patterns of "true" and "false" to match Scala.
+
+Parameters
+--
+sc : SparkContext
+The SparkContext for the distributor.
+key : str
+string for conf name
+default_value : str
+default value for the conf value for the given key
+
+Returns
+---
+bool
+Returns the boolean value that corresponds to the conf
+
+Raises
+--
+Exception
+Thrown when the conf value is not a boolean
+"""
+val = sc.getConf().get(key, default_value)
+lowercase_val = val.lower()
+if lowercase_val == "true":
+return True
+if lowercase_val == "false":
+return False
+raise Exception(
+"get_conf_boolean expected a boolean conf "
+"value but found value of type {} "
+"with value: {}".format(type(val), val)
+)
+
+
+class Distributor:
+"""
+The parent class for TorchDistributor. This class shouldn't be 
instantiated directly.
+"""
+
+def __init__(
+self,
+num_processes: int = 1,
+local_mode: bool = True,
+use_gpu: bool = True,
+spark: Optional[SparkSession] = None,
+):
+self.num_processes = num_processes
+self.local_mode = local_mode
+self.use_gpu = use_gpu
+if spark:
+self.spark = spark
+else:
+self.spark = SparkSession.builder.getOrCreate()
+self.sc = self.spark.sparkContext
+self.num_tasks = self._get_num_tasks()
+self.ssl_conf = None
+
+def _get_num_tasks(self) -> int:
+"""
+Returns the number of Spark tasks to use for distributed training
+
+Returns
+---
+The number of Spark tasks to use for distributed training
+"""
+
+if self.use_gpu:
+if not self.local_mode:
+key = "spark.task.resource.gpu.amount"
+task_gpu_amount = int(self.sc.getConf().get(key, "0"))
+if task_gpu_amount < 1:
+raise RuntimeError(f"'{key}' was unset, so gpu usage is 
unavailable.")
+# TODO(SPARK-41916): Address situation when 
spark.task.resource.gpu.amount > 1
+return math.ceil(self.num_processes / task_gpu_amount)
+else:
+key = "spark.driver.resource.gpu.amount"
+if "gpu" not in self.sc.resources:
+raise RuntimeError("GPUs were unable to be found on the 
driver.")
+num_available_gpus = int(self.sc.getConf().get(key, "0"))

Review Comment:
   Sure, will add that check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2023-01-05 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1063123902


##
python/pyspark/ml/torch/distributor.py:
##
@@ -0,0 +1,297 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import math
+from typing import Union, Callable, Optional, Any
+import warnings
+
+from pyspark.sql import SparkSession
+from pyspark.context import SparkContext
+
+
+# TODO(SPARK-41589): will move the functions and tests to an external file
+#   once we are in agreement about which functions should be in utils.py
+def get_conf_boolean(sc: SparkContext, key: str, default_value: str) -> bool:

Review Comment:
   I believe `bool("false")` would return `True` though since bool doesn't 
actually attempt to "comprehend" the string value, right? I guess we could do 
`return sc.getConf().get(key, default_value) == "true"` but that would treat 
all possible other values here as "false." I guess let's wait for @HyukjinKwon 
to give his thoughts on the matter.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2023-01-05 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1063118380


##
python/pyspark/ml/torch/distributor.py:
##
@@ -0,0 +1,297 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import math
+from typing import Union, Callable, Optional, Any
+import warnings
+
+from pyspark.sql import SparkSession
+from pyspark.context import SparkContext
+
+
+# TODO(SPARK-41589): will move the functions and tests to an external file
+#   once we are in agreement about which functions should be in utils.py
+def get_conf_boolean(sc: SparkContext, key: str, default_value: str) -> bool:
+"""Get the conf "key" from the given spark context,
+or return the default value if the conf is not set.
+This expects the conf value to be a boolean or string;
+if the value is a string, this checks for all capitalization
+patterns of "true" and "false" to match Scala.
+
+Parameters
+--
+sc : SparkContext
+The SparkContext for the distributor.
+key : str
+string for conf name
+default_value : str
+default value for the conf value for the given key
+
+Returns
+---
+bool
+Returns the boolean value that corresponds to the conf
+
+Raises
+--
+Exception
+Thrown when the conf value is not a boolean
+"""
+val = sc.getConf().get(key, default_value)
+lowercase_val = val.lower()
+if lowercase_val == "true":
+return True
+if lowercase_val == "false":
+return False
+raise Exception(
+"get_conf_boolean expected a boolean conf "
+"value but found value of type {} "
+"with value: {}".format(type(val), val)
+)
+
+
+class Distributor:
+"""
+The parent class for TorchDistributor. This class shouldn't be 
instantiated directly.
+"""
+
+def __init__(
+self,
+num_processes: int = 1,
+local_mode: bool = True,
+use_gpu: bool = True,
+spark: Optional[SparkSession] = None,
+):
+self.num_processes = num_processes
+self.local_mode = local_mode
+self.use_gpu = use_gpu
+if spark:
+self.spark = spark
+else:
+self.spark = SparkSession.builder.getOrCreate()
+self.sc = self.spark.sparkContext
+self.num_tasks = self._get_num_tasks()
+self.ssl_conf = None
+
+def _get_num_tasks(self) -> int:
+"""
+Returns the number of Spark tasks to use for distributed training
+
+Returns
+---
+The number of Spark tasks to use for distributed training
+"""
+
+if self.use_gpu:
+if not self.local_mode:
+key = "spark.task.resource.gpu.amount"
+task_gpu_amount = int(self.sc.getConf().get(key, "0"))
+if task_gpu_amount < 1:
+raise RuntimeError(f"'{key}' was unset, so gpu usage is 
unavailable.")
+# TODO(SPARK-41916): Address situation when 
spark.task.resource.gpu.amount > 1
+return math.ceil(self.num_processes / task_gpu_amount)
+else:
+key = "spark.driver.resource.gpu.amount"
+if "gpu" not in self.sc.resources:
+raise RuntimeError("GPUs were unable to be found on the 
driver.")
+num_available_gpus = int(self.sc.getConf().get(key, "0"))
+if self.num_processes > num_available_gpus:
+raise ValueError(
+f"For local training, {self.num_processes} can be at 
most"
+f"equal to the amount of GPUs available,"
+f"which is {num_available_gpus}."
+)
+return self.num_processes
+
+def _validate_input_params(self) -> None:
+if self.num_processes <= 0:
+raise ValueError("num_proccesses has to be a positive integer")
+
+def _check_encryption(self) -> None:
+"""Checks to see if the user requires encrpytion of data.
+If required, throw an exception since we don't support that.
+
+Raises
+--
+NotImplementedError
+   

[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2023-01-05 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1063117818


##
python/pyspark/ml/torch/distributor.py:
##
@@ -0,0 +1,297 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import math
+from typing import Union, Callable, Optional, Any
+import warnings
+
+from pyspark.sql import SparkSession
+from pyspark.context import SparkContext
+
+
+# TODO(SPARK-41589): will move the functions and tests to an external file
+#   once we are in agreement about which functions should be in utils.py
+def get_conf_boolean(sc: SparkContext, key: str, default_value: str) -> bool:
+"""Get the conf "key" from the given spark context,
+or return the default value if the conf is not set.
+This expects the conf value to be a boolean or string;
+if the value is a string, this checks for all capitalization
+patterns of "true" and "false" to match Scala.
+
+Parameters
+--
+sc : SparkContext
+The SparkContext for the distributor.
+key : str
+string for conf name
+default_value : str
+default value for the conf value for the given key
+
+Returns
+---
+bool
+Returns the boolean value that corresponds to the conf
+
+Raises
+--
+Exception
+Thrown when the conf value is not a boolean
+"""
+val = sc.getConf().get(key, default_value)
+lowercase_val = val.lower()
+if lowercase_val == "true":
+return True
+if lowercase_val == "false":
+return False
+raise Exception(
+"get_conf_boolean expected a boolean conf "
+"value but found value of type {} "
+"with value: {}".format(type(val), val)
+)
+
+
+class Distributor:
+"""
+The parent class for TorchDistributor. This class shouldn't be 
instantiated directly.
+"""
+
+def __init__(
+self,
+num_processes: int = 1,
+local_mode: bool = True,
+use_gpu: bool = True,
+spark: Optional[SparkSession] = None,
+):
+self.num_processes = num_processes
+self.local_mode = local_mode
+self.use_gpu = use_gpu
+if spark:
+self.spark = spark
+else:
+self.spark = SparkSession.builder.getOrCreate()
+self.sc = self.spark.sparkContext
+self.num_tasks = self._get_num_tasks()
+self.ssl_conf = None
+
+def _get_num_tasks(self) -> int:
+"""
+Returns the number of Spark tasks to use for distributed training
+
+Returns
+---
+The number of Spark tasks to use for distributed training
+"""
+
+if self.use_gpu:
+if not self.local_mode:
+key = "spark.task.resource.gpu.amount"
+task_gpu_amount = int(self.sc.getConf().get(key, "0"))
+if task_gpu_amount < 1:
+raise RuntimeError(f"'{key}' was unset, so gpu usage is 
unavailable.")
+# TODO(SPARK-41916): Address situation when 
spark.task.resource.gpu.amount > 1
+return math.ceil(self.num_processes / task_gpu_amount)
+else:
+key = "spark.driver.resource.gpu.amount"
+if "gpu" not in self.sc.resources:
+raise RuntimeError("GPUs were unable to be found on the 
driver.")
+num_available_gpus = int(self.sc.getConf().get(key, "0"))
+if self.num_processes > num_available_gpus:
+raise ValueError(

Review Comment:
   Sure, that seems reasonable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2023-01-05 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1063017746


##
python/pyspark/ml/torch/tests/test_distributor.py:
##
@@ -0,0 +1,195 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# from pyspark.testing.sqlutils import ReusedSQLTestCase
+import os
+from pyspark import SparkConf, SparkContext
+from pyspark.ml.torch.distributor import TorchDistributor
+from pyspark.sql import SparkSession
+from pyspark.testing.utils import SPARK_HOME
+import stat
+import tempfile
+import unittest
+
+
+class TorchDistributorBaselineUnitTests(unittest.TestCase):

Review Comment:
   No, all tests will pass regardless of whether `pytorch` is installed until 
we get create a PR for https://issues.apache.org/jira/browse/SPARK-41777 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2023-01-05 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1063014909


##
python/pyspark/ml/torch/distributor.py:
##
@@ -0,0 +1,297 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import math
+from typing import Union, Callable, Optional, Any
+import warnings
+
+from pyspark.sql import SparkSession
+from pyspark.context import SparkContext
+
+
+# TODO(SPARK-41589): will move the functions and tests to an external file
+#   once we are in agreement about which functions should be in utils.py
+def get_conf_boolean(sc: SparkContext, key: str, default_value: str) -> bool:
+"""Get the conf "key" from the given spark context,
+or return the default value if the conf is not set.
+This expects the conf value to be a boolean or string;
+if the value is a string, this checks for all capitalization
+patterns of "true" and "false" to match Scala.
+
+Parameters
+--
+sc : SparkContext
+The SparkContext for the distributor.
+key : str
+string for conf name
+default_value : str
+default value for the conf value for the given key
+
+Returns
+---
+bool
+Returns the boolean value that corresponds to the conf
+
+Raises
+--
+Exception
+Thrown when the conf value is not a boolean
+"""
+val = sc.getConf().get(key, default_value)
+lowercase_val = val.lower()
+if lowercase_val == "true":
+return True
+if lowercase_val == "false":
+return False
+raise Exception(

Review Comment:
   Will need to be `RuntimeError` I believe



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2023-01-05 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1063014563


##
python/pyspark/ml/torch/distributor.py:
##
@@ -0,0 +1,297 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import math
+from typing import Union, Callable, Optional, Any
+import warnings
+
+from pyspark.sql import SparkSession
+from pyspark.context import SparkContext
+
+
+# TODO(SPARK-41589): will move the functions and tests to an external file
+#   once we are in agreement about which functions should be in utils.py
+def get_conf_boolean(sc: SparkContext, key: str, default_value: str) -> bool:
+"""Get the conf "key" from the given spark context,
+or return the default value if the conf is not set.
+This expects the conf value to be a boolean or string;
+if the value is a string, this checks for all capitalization
+patterns of "true" and "false" to match Scala.
+
+Parameters
+--
+sc : SparkContext
+The SparkContext for the distributor.
+key : str
+string for conf name
+default_value : str
+default value for the conf value for the given key
+
+Returns
+---
+bool
+Returns the boolean value that corresponds to the conf
+
+Raises
+--
+Exception
+Thrown when the conf value is not a boolean
+"""
+val = sc.getConf().get(key, default_value)
+lowercase_val = val.lower()
+if lowercase_val == "true":
+return True
+if lowercase_val == "false":
+return False
+raise Exception(
+"get_conf_boolean expected a boolean conf "
+"value but found value of type {} "
+"with value: {}".format(type(val), val)
+)
+
+
+class Distributor:
+"""
+The parent class for TorchDistributor. This class shouldn't be 
instantiated directly.
+"""
+
+def __init__(
+self,
+num_processes: int = 1,
+local_mode: bool = True,
+use_gpu: bool = True,
+spark: Optional[SparkSession] = None,
+):
+self.num_processes = num_processes
+self.local_mode = local_mode
+self.use_gpu = use_gpu
+if spark:
+self.spark = spark
+else:
+self.spark = SparkSession.builder.getOrCreate()
+self.sc = self.spark.sparkContext
+self.num_tasks = self._get_num_tasks()
+self.ssl_conf = None
+
+def _get_num_tasks(self) -> int:
+"""
+Returns the number of Spark tasks to use for distributed training
+
+Returns
+---
+The number of Spark tasks to use for distributed training
+"""
+
+if self.use_gpu:
+if not self.local_mode:
+key = "spark.task.resource.gpu.amount"
+task_gpu_amount = int(self.sc.getConf().get(key, "0"))
+if task_gpu_amount < 1:
+raise RuntimeError(f"'{key}' was unset, so gpu usage is 
unavailable.")
+# TODO(SPARK-41916): Address situation when 
spark.task.resource.gpu.amount > 1
+return math.ceil(self.num_processes / task_gpu_amount)
+else:
+key = "spark.driver.resource.gpu.amount"
+if "gpu" not in self.sc.resources:
+raise RuntimeError("GPUs were unable to be found on the 
driver.")
+num_available_gpus = int(self.sc.getConf().get(key, "0"))
+if self.num_processes > num_available_gpus:
+raise ValueError(
+f"For local training, {self.num_processes} can be at 
most"
+f"equal to the amount of GPUs available,"
+f"which is {num_available_gpus}."
+)
+return self.num_processes
+
+def _validate_input_params(self) -> None:
+if self.num_processes <= 0:
+raise ValueError("num_proccesses has to be a positive integer")
+
+def _check_encryption(self) -> None:
+"""Checks to see if the user requires encrpytion of data.
+If required, throw an exception since we don't support that.
+
+Raises
+--
+NotImplementedError
+   

[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2023-01-05 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1063014363


##
python/pyspark/ml/torch/distributor.py:
##
@@ -0,0 +1,297 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import math
+from typing import Union, Callable, Optional, Any
+import warnings
+
+from pyspark.sql import SparkSession
+from pyspark.context import SparkContext
+
+
+# TODO(SPARK-41589): will move the functions and tests to an external file
+#   once we are in agreement about which functions should be in utils.py
+def get_conf_boolean(sc: SparkContext, key: str, default_value: str) -> bool:
+"""Get the conf "key" from the given spark context,
+or return the default value if the conf is not set.
+This expects the conf value to be a boolean or string;
+if the value is a string, this checks for all capitalization
+patterns of "true" and "false" to match Scala.
+
+Parameters
+--
+sc : SparkContext
+The SparkContext for the distributor.
+key : str
+string for conf name
+default_value : str
+default value for the conf value for the given key
+
+Returns
+---
+bool
+Returns the boolean value that corresponds to the conf
+
+Raises
+--
+Exception
+Thrown when the conf value is not a boolean
+"""
+val = sc.getConf().get(key, default_value)
+lowercase_val = val.lower()
+if lowercase_val == "true":
+return True
+if lowercase_val == "false":
+return False
+raise Exception(
+"get_conf_boolean expected a boolean conf "
+"value but found value of type {} "
+"with value: {}".format(type(val), val)
+)
+
+
+class Distributor:
+"""
+The parent class for TorchDistributor. This class shouldn't be 
instantiated directly.
+"""
+
+def __init__(
+self,
+num_processes: int = 1,
+local_mode: bool = True,
+use_gpu: bool = True,
+spark: Optional[SparkSession] = None,
+):
+self.num_processes = num_processes
+self.local_mode = local_mode
+self.use_gpu = use_gpu
+if spark:
+self.spark = spark
+else:
+self.spark = SparkSession.builder.getOrCreate()
+self.sc = self.spark.sparkContext
+self.num_tasks = self._get_num_tasks()
+self.ssl_conf = None
+
+def _get_num_tasks(self) -> int:
+"""
+Returns the number of Spark tasks to use for distributed training
+
+Returns
+---
+The number of Spark tasks to use for distributed training
+"""
+
+if self.use_gpu:
+if not self.local_mode:
+key = "spark.task.resource.gpu.amount"
+task_gpu_amount = int(self.sc.getConf().get(key, "0"))
+if task_gpu_amount < 1:
+raise RuntimeError(f"'{key}' was unset, so gpu usage is 
unavailable.")
+# TODO(SPARK-41916): Address situation when 
spark.task.resource.gpu.amount > 1
+return math.ceil(self.num_processes / task_gpu_amount)
+else:
+key = "spark.driver.resource.gpu.amount"
+if "gpu" not in self.sc.resources:
+raise RuntimeError("GPUs were unable to be found on the 
driver.")
+num_available_gpus = int(self.sc.getConf().get(key, "0"))
+if self.num_processes > num_available_gpus:
+raise ValueError(
+f"For local training, {self.num_processes} can be at 
most"
+f"equal to the amount of GPUs available,"
+f"which is {num_available_gpus}."
+)
+return self.num_processes
+
+def _validate_input_params(self) -> None:
+if self.num_processes <= 0:
+raise ValueError("num_proccesses has to be a positive integer")
+
+def _check_encryption(self) -> None:
+"""Checks to see if the user requires encrpytion of data.
+If required, throw an exception since we don't support that.
+
+Raises
+--
+NotImplementedError
+   

[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2023-01-05 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1063008896


##
python/pyspark/ml/torch/tests/test_distributor.py:
##
@@ -0,0 +1,195 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# from pyspark.testing.sqlutils import ReusedSQLTestCase
+import os
+from pyspark import SparkConf, SparkContext
+from pyspark.ml.torch.distributor import TorchDistributor
+from pyspark.sql import SparkSession
+from pyspark.testing.utils import SPARK_HOME
+import stat
+import tempfile
+import unittest
+
+
+class TorchDistributorBaselineUnitTests(unittest.TestCase):
+def setUp(self) -> None:
+conf = SparkConf()
+self.sc = SparkContext("local[4]", conf=conf)
+self.spark = SparkSession(self.sc)
+
+def tearDown(self) -> None:
+self.spark.stop()
+self.sc.stop()
+
+def test_validate_correct_inputs(self) -> None:
+inputs = [
+("pytorch", 1, True, False),
+("pytorch", 100, True, False),
+("pytorch-lightning", 1, False, False),
+("pytorch-lightning", 100, False, False),
+]
+for framework, num_processes, local_mode, use_gpu in inputs:
+with self.subTest():
+TorchDistributor(framework, num_processes, local_mode, use_gpu)
+
+def test_validate_incorrect_inputs(self) -> None:
+inputs = [
+("tensorflow", 1, True, False, ValueError, "framework"),
+("pytroch", 100, True, False, ValueError, "framework"),
+("pytorchlightning", 1, False, False, ValueError, "framework"),
+("pytorch-lightning", 0, False, False, ValueError, "positive"),
+]
+for framework, num_processes, local_mode, use_gpu, error, message in 
inputs:
+with self.subTest():
+with self.assertRaisesRegex(error, message):
+TorchDistributor(framework, num_processes, local_mode, 
use_gpu)
+
+def test_encryption_passes(self) -> None:
+inputs = [
+("spark.ssl.enabled", "false", 
"pytorch.spark.distributor.ignoreSsl", "true"),
+("spark.ssl.enabled", "false", 
"pytorch.spark.distributor.ignoreSsl", "false"),
+("spark.ssl.enabled", "true", 
"pytorch.spark.distributor.ignoreSsl", "true"),
+]
+for ssl_conf_key, ssl_conf_value, pytorch_conf_key, pytorch_conf_value 
in inputs:
+with self.subTest():
+self.spark.sparkContext._conf.set(ssl_conf_key, ssl_conf_value)
+self.spark.sparkContext._conf.set(pytorch_conf_key, 
pytorch_conf_value)
+distributor = TorchDistributor("pytorch", 1, True, False)
+distributor._check_encryption()
+
+def test_encryption_fails(self) -> None:
+# this is the only combination that should fail
+inputs = [("spark.ssl.enabled", "true", 
"pytorch.spark.distributor.ignoreSsl", "false")]
+for ssl_conf_key, ssl_conf_value, pytorch_conf_key, pytorch_conf_value 
in inputs:
+with self.subTest():
+with self.assertRaisesRegex(Exception, "encryption"):
+self.spark.sparkContext._conf.set(ssl_conf_key, 
ssl_conf_value)
+self.spark.sparkContext._conf.set(pytorch_conf_key, 
pytorch_conf_value)
+distributor = TorchDistributor("pytorch", 1, True, False)
+distributor._check_encryption()
+
+def test_get_num_tasks_fails(self) -> None:
+inputs = [1, 5, 4]
+
+# This is when the conf isn't set and we request GPUs
+for num_processes in inputs:
+with self.subTest():
+with self.assertRaisesRegex(RuntimeError, "driver"):
+TorchDistributor("pytorch", num_processes, True, True)
+with self.assertRaisesRegex(RuntimeError, "unset"):
+TorchDistributor("pytorch", num_processes, False, True)
+
+
+class TorchDistributorLocalUnitTests(unittest.TestCase):

Review Comment:
   More tests will be added to this class and the following class in later PRs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please 

[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2023-01-05 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1063007162


##
python/pyspark/ml/torch/distributor.py:
##
@@ -0,0 +1,287 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import math
+from typing import Union, Callable, Optional, Any
+import warnings
+
+from pyspark.sql import SparkSession
+from pyspark.context import SparkContext
+
+
+# Moved the util functions to this file for now
+# TODO(SPARK-41589): will move the functions and tests to an external file
+#   once we are in agreement about which functions should be in utils.py
+def get_conf_boolean(sc: SparkContext, key: str, default_value: str) -> bool:
+"""Get the conf "key" from the given spark context,
+or return the default value if the conf is not set.
+This expects the conf value to be a boolean or string;
+if the value is a string, this checks for all capitalization
+patterns of "true" and "false" to match Scala.
+
+Parameters
+--
+sc : SparkContext
+The SparkContext for the distributor.
+key : str
+string for conf name
+default_value : str
+default value for the conf value for the given key
+
+Returns
+---
+bool
+Returns the boolean value that corresponds to the conf
+
+Raises
+--
+Exception
+Thrown when the conf value is not a boolean
+"""
+val = sc.getConf().get(key, default_value)
+lowercase_val = val.lower()
+if lowercase_val == "true":
+return True
+if lowercase_val == "false":
+return False
+raise Exception(
+"get_conf_boolean expected a boolean conf "
+"value but found value of type {} "
+"with value: {}".format(type(val), val)
+)
+
+
+class Distributor:
+def __init__(
+self,
+num_processes: int = 1,
+local_mode: bool = True,
+use_gpu: bool = True,
+spark: Optional[SparkSession] = None,
+):
+self.num_processes = num_processes
+self.local_mode = local_mode
+self.use_gpu = use_gpu
+if spark:
+self.spark = spark
+else:
+self.spark = SparkSession.builder.getOrCreate()
+self.sc = self.spark.sparkContext
+self.num_tasks = self._get_num_tasks()
+self.ssl_conf = None
+
+def _get_num_tasks(self) -> int:
+"""
+Returns the number of Spark tasks to use for distributed training
+
+Returns
+---
+The number of Spark tasks to use for distributed training
+"""
+if self.use_gpu:
+key = "spark.task.resource.gpu.amount"
+if self.sc.getConf().contains(key):
+if gpu_amount_raw := self.sc.getConf().get(key):  # mypy 
error??
+task_gpu_amount = int(gpu_amount_raw)
+else:
+task_gpu_amount = 1  # for single node clusters

Review Comment:
   @WeichenXu123 @lu-wang-dl, I added an updated check. Can you please let me 
know if this looks more reasonable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2023-01-05 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1062956832


##
python/pyspark/ml/torch/distributor.py:
##
@@ -0,0 +1,287 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import math
+from typing import Union, Callable, Optional, Any
+import warnings
+
+from pyspark.sql import SparkSession
+from pyspark.context import SparkContext
+
+
+# Moved the util functions to this file for now
+# TODO(SPARK-41589): will move the functions and tests to an external file
+#   once we are in agreement about which functions should be in utils.py
+def get_conf_boolean(sc: SparkContext, key: str, default_value: str) -> bool:
+"""Get the conf "key" from the given spark context,
+or return the default value if the conf is not set.
+This expects the conf value to be a boolean or string;
+if the value is a string, this checks for all capitalization
+patterns of "true" and "false" to match Scala.
+
+Parameters
+--
+sc : SparkContext
+The SparkContext for the distributor.
+key : str
+string for conf name
+default_value : str
+default value for the conf value for the given key
+
+Returns
+---
+bool
+Returns the boolean value that corresponds to the conf
+
+Raises
+--
+Exception
+Thrown when the conf value is not a boolean
+"""
+val = sc.getConf().get(key, default_value)
+lowercase_val = val.lower()
+if lowercase_val == "true":
+return True
+if lowercase_val == "false":
+return False
+raise Exception(
+"get_conf_boolean expected a boolean conf "
+"value but found value of type {} "
+"with value: {}".format(type(val), val)
+)
+
+
+class Distributor:
+def __init__(
+self,
+num_processes: int = 1,
+local_mode: bool = True,
+use_gpu: bool = True,
+spark: Optional[SparkSession] = None,
+):
+self.num_processes = num_processes
+self.local_mode = local_mode
+self.use_gpu = use_gpu
+if spark:
+self.spark = spark
+else:
+self.spark = SparkSession.builder.getOrCreate()
+self.sc = self.spark.sparkContext
+self.num_tasks = self._get_num_tasks()
+self.ssl_conf = None
+
+def _get_num_tasks(self) -> int:
+"""
+Returns the number of Spark tasks to use for distributed training
+
+Returns
+---
+The number of Spark tasks to use for distributed training
+"""
+if self.use_gpu:
+key = "spark.task.resource.gpu.amount"
+if self.sc.getConf().contains(key):
+if gpu_amount_raw := self.sc.getConf().get(key):  # mypy 
error??
+task_gpu_amount = int(gpu_amount_raw)
+else:
+task_gpu_amount = 1  # for single node clusters
+if task_gpu_amount < 1:
+raise ValueError(
+f"The Spark conf `{key}` has a value "
+f"of {task_gpu_amount} but it "
+"should not have a value less than 1."
+)
+return math.ceil(self.num_processes / task_gpu_amount)

Review Comment:
   https://issues.apache.org/jira/browse/SPARK-41916



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2023-01-04 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1061990731


##
python/pyspark/ml/torch/distributor.py:
##
@@ -0,0 +1,287 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import math
+from typing import Union, Callable, Optional, Any
+import warnings
+
+from pyspark.sql import SparkSession
+from pyspark.context import SparkContext
+
+
+# Moved the util functions to this file for now
+# TODO(SPARK-41589): will move the functions and tests to an external file
+#   once we are in agreement about which functions should be in utils.py
+def get_conf_boolean(sc: SparkContext, key: str, default_value: str) -> bool:
+"""Get the conf "key" from the given spark context,
+or return the default value if the conf is not set.
+This expects the conf value to be a boolean or string;
+if the value is a string, this checks for all capitalization
+patterns of "true" and "false" to match Scala.
+
+Parameters
+--
+sc : SparkContext
+The SparkContext for the distributor.
+key : str
+string for conf name
+default_value : str
+default value for the conf value for the given key
+
+Returns
+---
+bool
+Returns the boolean value that corresponds to the conf
+
+Raises
+--
+Exception
+Thrown when the conf value is not a boolean
+"""
+val = sc.getConf().get(key, default_value)
+lowercase_val = val.lower()
+if lowercase_val == "true":
+return True
+if lowercase_val == "false":
+return False
+raise Exception(
+"get_conf_boolean expected a boolean conf "
+"value but found value of type {} "
+"with value: {}".format(type(val), val)
+)
+
+
+class Distributor:
+def __init__(
+self,
+num_processes: int = 1,
+local_mode: bool = True,
+use_gpu: bool = True,
+spark: Optional[SparkSession] = None,
+):
+self.num_processes = num_processes
+self.local_mode = local_mode
+self.use_gpu = use_gpu
+if spark:
+self.spark = spark
+else:
+self.spark = SparkSession.builder.getOrCreate()
+self.sc = self.spark.sparkContext
+self.num_tasks = self._get_num_tasks()
+self.ssl_conf = None
+
+def _get_num_tasks(self) -> int:
+"""
+Returns the number of Spark tasks to use for distributed training
+
+Returns
+---
+The number of Spark tasks to use for distributed training
+"""
+if self.use_gpu:
+key = "spark.task.resource.gpu.amount"
+if self.sc.getConf().contains(key):
+if gpu_amount_raw := self.sc.getConf().get(key):  # mypy 
error??
+task_gpu_amount = int(gpu_amount_raw)
+else:
+task_gpu_amount = 1  # for single node clusters
+if task_gpu_amount < 1:
+raise ValueError(
+f"The Spark conf `{key}` has a value "
+f"of {task_gpu_amount} but it "
+"should not have a value less than 1."
+)
+return math.ceil(self.num_processes / task_gpu_amount)
+return self.num_processes
+
+def _validate_input_params(self) -> None:
+if self.num_processes <= 0:
+raise ValueError("num_proccesses has to be a positive integer")
+
+def _check_encryption(self) -> None:
+"""Checks to see if the user requires encrpytion of data.
+If required, throw an exception since we don't support that.
+
+Raises
+--
+NotImplementedError
+Thrown when the user doesn't use PyTorchDistributor
+Exception
+Thrown when the user requires ssl encryption
+"""
+if not "ssl_conf":
+raise Exception(
+"Distributor doesn't have this functionality. Use 
PyTorchDistributor instead."
+)
+is_ssl_enabled = get_conf_boolean(self.sc, "spark.ssl.enabled", 
"false")
+ignore_ssl = get_conf_boolean(self.sc, self.ssl_conf, "false")  # 
type: ignore
+  

[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2022-12-29 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1059095881


##
python/pyspark/ml/torch/distributor.py:
##
@@ -0,0 +1,287 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import math
+from typing import Union, Callable, Optional, Any
+import warnings
+
+from pyspark.sql import SparkSession
+from pyspark.context import SparkContext
+
+
+# Moved the util functions to this file for now
+# TODO(SPARK-41589): will move the functions and tests to an external file
+#   once we are in agreement about which functions should be in utils.py
+def get_conf_boolean(sc: SparkContext, key: str, default_value: str) -> bool:
+"""Get the conf "key" from the given spark context,
+or return the default value if the conf is not set.
+This expects the conf value to be a boolean or string;
+if the value is a string, this checks for all capitalization
+patterns of "true" and "false" to match Scala.
+
+Parameters
+--
+sc : SparkContext
+The SparkContext for the distributor.
+key : str
+string for conf name
+default_value : str
+default value for the conf value for the given key
+
+Returns
+---
+bool
+Returns the boolean value that corresponds to the conf
+
+Raises
+--
+Exception
+Thrown when the conf value is not a boolean
+"""
+val = sc.getConf().get(key, default_value)
+lowercase_val = val.lower()
+if lowercase_val == "true":
+return True
+if lowercase_val == "false":
+return False
+raise Exception(
+"get_conf_boolean expected a boolean conf "
+"value but found value of type {} "
+"with value: {}".format(type(val), val)
+)
+
+
+class Distributor:
+def __init__(
+self,
+num_processes: int = 1,
+local_mode: bool = True,
+use_gpu: bool = True,
+spark: Optional[SparkSession] = None,
+):
+self.num_processes = num_processes
+self.local_mode = local_mode
+self.use_gpu = use_gpu
+if spark:
+self.spark = spark
+else:
+self.spark = SparkSession.builder.getOrCreate()
+self.sc = self.spark.sparkContext
+self.num_tasks = self._get_num_tasks()
+self.ssl_conf = None
+
+def _get_num_tasks(self) -> int:
+"""
+Returns the number of Spark tasks to use for distributed training
+
+Returns
+---
+The number of Spark tasks to use for distributed training
+"""
+if self.use_gpu:
+key = "spark.task.resource.gpu.amount"
+if self.sc.getConf().contains(key):
+if gpu_amount_raw := self.sc.getConf().get(key):  # mypy 
error??
+task_gpu_amount = int(gpu_amount_raw)
+else:
+task_gpu_amount = 1  # for single node clusters
+if task_gpu_amount < 1:
+raise ValueError(
+f"The Spark conf `{key}` has a value "
+f"of {task_gpu_amount} but it "
+"should not have a value less than 1."
+)
+return math.ceil(self.num_processes / task_gpu_amount)

Review Comment:
   The few situations:
   1. spark.task.resource.gpu.amount > 1 + user code can use 1 gpu core: This 
is fine, since for torchrun, we could do `torchrun 
nproc_per_node=task_gpu_amount ...` as the command to be executed. This will 
launch multiple training processes per task. This will need some additional 
work toward the end, so I will create this as a backlog ticket.
   2. spark.task.resource.gpu.amount = 1 + user code can use > 1 gpu core: This 
means using 
[model-parallel](https://pytorch.org/tutorials/intermediate/model_parallel_tutorial.html)
 instead of 
[distributed-data-parallel](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html).
 It seems that currently, distributed training only supports data parallel and 
not model parallel so users would hopefully know not to use model parallel. We 
could log this as well though.



-- 
This is an automated message from the Apache Git Service.
To respond 

[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2022-12-29 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1059095881


##
python/pyspark/ml/torch/distributor.py:
##
@@ -0,0 +1,287 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import math
+from typing import Union, Callable, Optional, Any
+import warnings
+
+from pyspark.sql import SparkSession
+from pyspark.context import SparkContext
+
+
+# Moved the util functions to this file for now
+# TODO(SPARK-41589): will move the functions and tests to an external file
+#   once we are in agreement about which functions should be in utils.py
+def get_conf_boolean(sc: SparkContext, key: str, default_value: str) -> bool:
+"""Get the conf "key" from the given spark context,
+or return the default value if the conf is not set.
+This expects the conf value to be a boolean or string;
+if the value is a string, this checks for all capitalization
+patterns of "true" and "false" to match Scala.
+
+Parameters
+--
+sc : SparkContext
+The SparkContext for the distributor.
+key : str
+string for conf name
+default_value : str
+default value for the conf value for the given key
+
+Returns
+---
+bool
+Returns the boolean value that corresponds to the conf
+
+Raises
+--
+Exception
+Thrown when the conf value is not a boolean
+"""
+val = sc.getConf().get(key, default_value)
+lowercase_val = val.lower()
+if lowercase_val == "true":
+return True
+if lowercase_val == "false":
+return False
+raise Exception(
+"get_conf_boolean expected a boolean conf "
+"value but found value of type {} "
+"with value: {}".format(type(val), val)
+)
+
+
+class Distributor:
+def __init__(
+self,
+num_processes: int = 1,
+local_mode: bool = True,
+use_gpu: bool = True,
+spark: Optional[SparkSession] = None,
+):
+self.num_processes = num_processes
+self.local_mode = local_mode
+self.use_gpu = use_gpu
+if spark:
+self.spark = spark
+else:
+self.spark = SparkSession.builder.getOrCreate()
+self.sc = self.spark.sparkContext
+self.num_tasks = self._get_num_tasks()
+self.ssl_conf = None
+
+def _get_num_tasks(self) -> int:
+"""
+Returns the number of Spark tasks to use for distributed training
+
+Returns
+---
+The number of Spark tasks to use for distributed training
+"""
+if self.use_gpu:
+key = "spark.task.resource.gpu.amount"
+if self.sc.getConf().contains(key):
+if gpu_amount_raw := self.sc.getConf().get(key):  # mypy 
error??
+task_gpu_amount = int(gpu_amount_raw)
+else:
+task_gpu_amount = 1  # for single node clusters
+if task_gpu_amount < 1:
+raise ValueError(
+f"The Spark conf `{key}` has a value "
+f"of {task_gpu_amount} but it "
+"should not have a value less than 1."
+)
+return math.ceil(self.num_processes / task_gpu_amount)

Review Comment:
   The few situations:
   1. spark.task.resource.gpu.amount > 1 + user code can use 1 gpu core: This 
is fine, since for torchrun, we could do `torchrun 
nproc_per_node=task_gpu_amount ...` as the command to be executed. This will 
launch multiple training processes per task.
   2. spark.task.resource.gpu.amount = 1 + user code can use > 1 gpu core: This 
means using 
[model-parallel](https://pytorch.org/tutorials/intermediate/model_parallel_tutorial.html)
 instead of 
[distributed-data-parallel](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html).
 It seems that currently, distributed training only supports data parallel and 
not model parallel so users would hopefully know not to use model parallel. We 
could log this as well though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To 

[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2022-12-29 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1059095881


##
python/pyspark/ml/torch/distributor.py:
##
@@ -0,0 +1,287 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import math
+from typing import Union, Callable, Optional, Any
+import warnings
+
+from pyspark.sql import SparkSession
+from pyspark.context import SparkContext
+
+
+# Moved the util functions to this file for now
+# TODO(SPARK-41589): will move the functions and tests to an external file
+#   once we are in agreement about which functions should be in utils.py
+def get_conf_boolean(sc: SparkContext, key: str, default_value: str) -> bool:
+"""Get the conf "key" from the given spark context,
+or return the default value if the conf is not set.
+This expects the conf value to be a boolean or string;
+if the value is a string, this checks for all capitalization
+patterns of "true" and "false" to match Scala.
+
+Parameters
+--
+sc : SparkContext
+The SparkContext for the distributor.
+key : str
+string for conf name
+default_value : str
+default value for the conf value for the given key
+
+Returns
+---
+bool
+Returns the boolean value that corresponds to the conf
+
+Raises
+--
+Exception
+Thrown when the conf value is not a boolean
+"""
+val = sc.getConf().get(key, default_value)
+lowercase_val = val.lower()
+if lowercase_val == "true":
+return True
+if lowercase_val == "false":
+return False
+raise Exception(
+"get_conf_boolean expected a boolean conf "
+"value but found value of type {} "
+"with value: {}".format(type(val), val)
+)
+
+
+class Distributor:
+def __init__(
+self,
+num_processes: int = 1,
+local_mode: bool = True,
+use_gpu: bool = True,
+spark: Optional[SparkSession] = None,
+):
+self.num_processes = num_processes
+self.local_mode = local_mode
+self.use_gpu = use_gpu
+if spark:
+self.spark = spark
+else:
+self.spark = SparkSession.builder.getOrCreate()
+self.sc = self.spark.sparkContext
+self.num_tasks = self._get_num_tasks()
+self.ssl_conf = None
+
+def _get_num_tasks(self) -> int:
+"""
+Returns the number of Spark tasks to use for distributed training
+
+Returns
+---
+The number of Spark tasks to use for distributed training
+"""
+if self.use_gpu:
+key = "spark.task.resource.gpu.amount"
+if self.sc.getConf().contains(key):
+if gpu_amount_raw := self.sc.getConf().get(key):  # mypy 
error??
+task_gpu_amount = int(gpu_amount_raw)
+else:
+task_gpu_amount = 1  # for single node clusters
+if task_gpu_amount < 1:
+raise ValueError(
+f"The Spark conf `{key}` has a value "
+f"of {task_gpu_amount} but it "
+"should not have a value less than 1."
+)
+return math.ceil(self.num_processes / task_gpu_amount)

Review Comment:
   The few situations:
   1. spark.task.resource.gpu.amount > 1 + user code can use 1 gpu core: This 
is fine, since for torchrun, we could do `torchrun 
nproc_per_node=task_gpu_amount ...` as the command to be executed. This will 
launch multiple training processes per executor.
   2. spark.task.resource.gpu.amount = 1 + user code can use > 1 gpu core: This 
means using 
[model-parallel](https://pytorch.org/tutorials/intermediate/model_parallel_tutorial.html)
 instead of 
[distributed-data-parallel](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html).
 It seems that currently, distributed training only supports data parallel and 
not model parallel so users would hopefully know not to use model parallel. We 
could log this as well though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2022-12-29 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1059095881


##
python/pyspark/ml/torch/distributor.py:
##
@@ -0,0 +1,287 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import math
+from typing import Union, Callable, Optional, Any
+import warnings
+
+from pyspark.sql import SparkSession
+from pyspark.context import SparkContext
+
+
+# Moved the util functions to this file for now
+# TODO(SPARK-41589): will move the functions and tests to an external file
+#   once we are in agreement about which functions should be in utils.py
+def get_conf_boolean(sc: SparkContext, key: str, default_value: str) -> bool:
+"""Get the conf "key" from the given spark context,
+or return the default value if the conf is not set.
+This expects the conf value to be a boolean or string;
+if the value is a string, this checks for all capitalization
+patterns of "true" and "false" to match Scala.
+
+Parameters
+--
+sc : SparkContext
+The SparkContext for the distributor.
+key : str
+string for conf name
+default_value : str
+default value for the conf value for the given key
+
+Returns
+---
+bool
+Returns the boolean value that corresponds to the conf
+
+Raises
+--
+Exception
+Thrown when the conf value is not a boolean
+"""
+val = sc.getConf().get(key, default_value)
+lowercase_val = val.lower()
+if lowercase_val == "true":
+return True
+if lowercase_val == "false":
+return False
+raise Exception(
+"get_conf_boolean expected a boolean conf "
+"value but found value of type {} "
+"with value: {}".format(type(val), val)
+)
+
+
+class Distributor:
+def __init__(
+self,
+num_processes: int = 1,
+local_mode: bool = True,
+use_gpu: bool = True,
+spark: Optional[SparkSession] = None,
+):
+self.num_processes = num_processes
+self.local_mode = local_mode
+self.use_gpu = use_gpu
+if spark:
+self.spark = spark
+else:
+self.spark = SparkSession.builder.getOrCreate()
+self.sc = self.spark.sparkContext
+self.num_tasks = self._get_num_tasks()
+self.ssl_conf = None
+
+def _get_num_tasks(self) -> int:
+"""
+Returns the number of Spark tasks to use for distributed training
+
+Returns
+---
+The number of Spark tasks to use for distributed training
+"""
+if self.use_gpu:
+key = "spark.task.resource.gpu.amount"
+if self.sc.getConf().contains(key):
+if gpu_amount_raw := self.sc.getConf().get(key):  # mypy 
error??
+task_gpu_amount = int(gpu_amount_raw)
+else:
+task_gpu_amount = 1  # for single node clusters
+if task_gpu_amount < 1:
+raise ValueError(
+f"The Spark conf `{key}` has a value "
+f"of {task_gpu_amount} but it "
+"should not have a value less than 1."
+)
+return math.ceil(self.num_processes / task_gpu_amount)

Review Comment:
   The few situations:
   1. spark.task.resource.gpu.amount > 1 + user code can use 1 gpu core: 1 gpu 
core is used for training so this will waste resources. We could log this so 
the user will be aware of that fact.
   2. spark.task.resource.gpu.amount = 1 + user code can use > 1 gpu core: This 
means using 
[model-parallel](https://pytorch.org/tutorials/intermediate/model_parallel_tutorial.html)
 instead of 
[distributed-data-parallel](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html).
 It seems that currently, distributed training only supports data parallel and 
not model parallel so users would hopefully know not to use model parallel. We 
could log this as well though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org


[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2022-12-23 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1056653132


##
python/pyspark/ml/torch/distributor.py:
##
@@ -0,0 +1,179 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import math
+from typing import Union, Callable, Optional
+import warnings
+
+from pyspark.sql import SparkSession
+from pyspark.context import SparkContext
+
+# Moved the util functions to this file for now
+# TODO: will move the functions and tests to an external file
+#   once we are in agreement about which functions should be in utils.py
+def get_conf_boolean(sc: SparkContext, key: str, default_value: str):
+"""
+Get the conf "key" from the given spark context,
+or return the default value if the conf is not set.
+This expects the conf value to be a boolean or string;
+if the value is a string, this checks for all capitalization
+patterns of "true" and "false" to match Scala.
+Args:
+key: string for conf name
+default_value: default value for the conf value for the given key
+"""
+val = sc.getConf().get(key, default_value)
+lowercase_val = val.lower()
+if lowercase_val == "true":
+return True
+if lowercase_val == "false":
+return False
+raise Exception(
+"get_conf_boolean expected a boolean conf "
+"value but found value of type {} "
+"with value: {}".format(type(val), val)
+)
+
+# might need to move into its own file as we look forward.
+class Distributor:
+def __init__(self, num_processes: int = 1, local_mode: bool = True, 
use_gpu: bool = True, spark: Optional[SparkSession] = None):
+self.num_processes = num_processes
+self.local_mode = local_mode
+self.use_gpu = use_gpu
+if spark:
+self.spark = spark
+else:
+self.spark = SparkSession.builder.getOrCreate()
+self.sc = self.spark.sparkContext
+self.num_tasks = self._get_num_tasks()
+
+def _get_num_tasks(self):
+"""
+Returns:
+The number of Spark tasks to use for distributed training
+"""
+if self.use_gpu:
+key = "spark.task.resource.gpu.amount"
+if self.sc.getConf().contains(key):
+task_gpu_amount = int(self.sc.getConf().get(key))
+else:
+task_gpu_amount = 1 # for single node clusters
+if task_gpu_amount < 1:
+raise ValueError(
+f"The Spark conf `{key}` has a value "
+f"of {task_gpu_amount} but it "
+"should not have a value less than 1."
+)
+return math.ceil(self.num_processes / task_gpu_amount)
+return self.num_processes
+
+def _validate_input_params(self):
+if self.num_processes <= 0:
+raise ValueError(f"num_proccesses has to be a positive integer")
+
+def _check_encryption(self):
+if "ssl_conf" not in self.__dict__:
+raise NotImplementedError()
+is_ssl_enabled = get_conf_boolean(
+self.sc,
+"spark.ssl.enabled",
+"false"
+)
+ignore_ssl = get_conf_boolean(
+self.sc,
+self.ssl_conf,
+"false"
+)
+if is_ssl_enabled:
+name = self.__class__.__name__
+if ignore_ssl:
+warnings.warn(
+f"""
+This cluster has TLS encryption enabled;
+however, {name} does not
+support data encryption in transit.
+The Spark configuration
+'{self.ssl_conf}' has been set to
+'true' to override this
+configuration and use {name} anyway. Please
+note this will cause model
+parameters and possibly training data to
+be sent between nodes unencrypted.
+""",
+RuntimeWarning
+)
+return
+ 

[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2022-12-23 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1056647526


##
python/pyspark/ml/torch/tests/test_distributor.py:
##
@@ -0,0 +1,82 @@
+from pyspark.testing.utils import PySparkTestCase
+from pyspark.ml.torch.distributor import PyTorchDistributor
+from pyspark.sql import SparkSession
+
+# Q: Do you recommend that I use pytest.mark.parametrize? It doesn't seem to 
be used elsewhere in this code...
+class TestPyTorchDistributor(PySparkTestCase):

Review Comment:
   I want to start and stop a SparkSession instance after each test, not when 
the class starts and ends. Correct me if I am wrong, but `ReusedSQLTestCase` 
seems to do the setup and teardown only when a class starts and ends, not 
during each individual test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2022-12-22 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1055806728


##
python/pyspark/ml/torch/utils.py:
##
@@ -0,0 +1,24 @@
+from pyspark.context import SparkContext

Review Comment:
   I will actually delete this file for now since I don't expect a lot of utils 
just yet and if we do, we can move it out to `utils.py` at a later time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2022-12-22 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1055794843


##
python/pyspark/ml/torch/distributor.py:
##
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from .utils import get_conf_boolean
+import math
+from typing import Union, Callable
+from pyspark.sql import SparkSession
+
+
+# might need to move into its own file as we look forward.
+class Distributor:
+def __init__(self, num_processes: int = 1, local_mode: bool = True, 
use_gpu: bool = True):
+self.num_processes = num_processes
+self.local_mode = local_mode
+self.use_gpu = use_gpu
+self.spark = SparkSession.builder.getOrCreate()

Review Comment:
   There actually might be (specificially when users want to set 
`spark.task.resource.gpu.amount > 1`), let me fix this by adding a new 
parameter as input.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2022-12-22 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1055794843


##
python/pyspark/ml/torch/distributor.py:
##
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from .utils import get_conf_boolean
+import math
+from typing import Union, Callable
+from pyspark.sql import SparkSession
+
+
+# might need to move into its own file as we look forward.
+class Distributor:
+def __init__(self, num_processes: int = 1, local_mode: bool = True, 
use_gpu: bool = True):
+self.num_processes = num_processes
+self.local_mode = local_mode
+self.use_gpu = use_gpu
+self.spark = SparkSession.builder.getOrCreate()

Review Comment:
   There actually might be, let me fix this by adding a new parameter as input



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2022-12-20 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1053894446


##
python/pyspark/ml/torch/tests/test_distributor.py:
##
@@ -0,0 +1,82 @@
+from pyspark.testing.utils import PySparkTestCase
+from pyspark.ml.torch.distributor import PyTorchDistributor
+from pyspark.sql import SparkSession
+
+# Q: Do you recommend that I use pytest.mark.parametrize? It doesn't seem to 
be used elsewhere in this code...
+class TestPyTorchDistributor(PySparkTestCase):
+
+def setUp(self):
+super().setUp()
+self.spark = SparkSession(self.sc)
+
+def tearDown(self):
+super().tearDown()
+
+def test_validate_correct_inputs(self):

Review Comment:
   What is the standard for testing multiple inputs on one function. I didn't 
find any uses of `parametrize` so is there an alternative approach? I saw that 
`self.subTest()` has been used in the past so started off with that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2022-12-20 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1053893899


##
python/pyspark/ml/torch/distributor.py:
##
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from .utils import get_conf_boolean
+import math
+from typing import Union, Callable
+from pyspark.sql import SparkSession
+
+
+# might need to move into its own file as we look forward.
+class Distributor:
+def __init__(self, num_processes: int = 1, local_mode: bool = True, 
use_gpu: bool = True):
+self.num_processes = num_processes
+self.local_mode = local_mode
+self.use_gpu = use_gpu
+self.spark = SparkSession.builder.getOrCreate()
+self.sc = self.spark.sparkContext
+self.num_tasks = self._get_num_tasks()
+
+def _get_num_tasks(self):
+"""
+Returns:
+The number of Spark tasks to use for distributed training
+"""
+if self.use_gpu:
+key = "spark.task.resource.gpu.amount"
+if self.sc.getConf().contains(key):
+task_gpu_amount = int(self.sc.getConf().get(key))
+else:
+task_gpu_amount = 1 # for single node clusters
+if task_gpu_amount < 1:
+raise ValueError(
+f"The Spark conf `{key}` has a value "
+f"of {task_gpu_amount} but it "
+"should not have a value less than 1."
+)
+return math.ceil(self.num_processes / task_gpu_amount)
+return self.num_processes
+
+def _validate_input_params(self):
+if self.num_processes <= 0:
+raise ValueError(f"num_proccesses has to be a positive integer")
+
+# can be required for TF distributor in the future as well
+def _check_encryption(self):
+if "ssl_conf" not in self.__dict__:
+raise NotImplementedError()
+is_ssl_enabled = get_conf_boolean(
+self.sc,
+"spark.ssl.enabled",
+"false"
+)
+ignore_ssl = get_conf_boolean(
+self.sc,
+self.ssl_conf,
+"false"
+)
+if is_ssl_enabled:
+name = self.__class__.__name__
+if ignore_ssl:
+print(

Review Comment:
   There will be a logger in the next PR. For now, just leaving it as a print() 
function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] rithwik-db commented on a diff in pull request #39146: [WIP][SPARK-41589][PYTHON][ML] PyTorch Distributor Baseline API Changes

2022-12-20 Thread GitBox


rithwik-db commented on code in PR #39146:
URL: https://github.com/apache/spark/pull/39146#discussion_r1053893575


##
python/pyspark/ml/torch/distributor.py:
##
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from .utils import get_conf_boolean
+import math
+from typing import Union, Callable
+from pyspark.sql import SparkSession
+
+
+# might need to move into its own file as we look forward.
+class Distributor:

Review Comment:
   This can be the parent class for TensorflowDistributor down the line. Just 
to clarify, for this specific project, the TensorflowDistributor is out of 
scope, but that is something that we ideally want to add in the future so just 
doing some preliminary work to make life easier down the line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org