[GitHub] spark pull request #21092: [SPARK-23984][K8S] Initial Python Bindings for Py...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21092
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r194117385

--- Diff: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala ---
@@ -88,15 +90,42 @@ class KubernetesConfSuite extends SparkFunSuite {
       APP_NAME,
       RESOURCE_NAME_PREFIX,
       APP_ID,
-      None,
+      mainAppResource = None,
       MAIN_CLASS,
-      APP_ARGS)
+      APP_ARGS,
+      maybePyFiles = None)
     assert(kubernetesConfWithoutMainJar.sparkConf.get("spark.jars").split(",")
       === Array("local:///opt/spark/jar1.jar"))
+    assert(kubernetesConfWithoutMainJar.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.1)
   }

-  test("Resolve driver labels, annotations, secret mount paths, and envs.") {
+  test("Creating driver conf with a python primary file") {
+    val mainResourceFile = "local:///opt/spark/main.py"
+    val inputPyFiles = Array("local:///opt/spark/example2.py", "local:///example3.py")
     val sparkConf = new SparkConf(false)
+      .setJars(Seq("local:///opt/spark/jar1.jar"))
+      .set("spark.files", "local:///opt/spark/example4.py")
+    val mainAppResource = Some(PythonMainAppResource(mainResourceFile))
+    val kubernetesConfWithMainResource = KubernetesConf.createDriverConf(
+      sparkConf,
+      APP_NAME,
+      RESOURCE_NAME_PREFIX,
+      APP_ID,
+      mainAppResource,
+      MAIN_CLASS,
+      APP_ARGS,
+      Some(inputPyFiles.mkString(",")))
+    assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",")
+      === Array("local:///opt/spark/jar1.jar"))
+    assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.4)
--- End diff --

Done
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r194109752

--- Diff: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala ---
@@ -87,11 +89,37 @@ class KubernetesConfSuite extends SparkFunSuite {
       APP_ID,
       None,
       MAIN_CLASS,
-      APP_ARGS)
+      APP_ARGS,
+      None)
     assert(kubernetesConfWithoutMainJar.sparkConf.get("spark.jars").split(",")
       === Array("local:///opt/spark/jar1.jar"))
+    assert(kubernetesConfWithoutMainJar.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.1)
   }

+  test("Creating driver conf with a python primary file") {
--- End diff --

Just a follow-up: we should have a test with Python that overrides MEMORY_OVERHEAD_FACTOR (e.g. a test to make sure that setIfMissing keeps an explicitly set value, since we had it the other way earlier in the PR).
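A minimal sketch of such a test, mirroring the suite quoted above (the test name is hypothetical, and it assumes MEMORY_OVERHEAD_FACTOR can be pre-set on the SparkConf the same way the suite sets other entries):

    test("Explicit MEMORY_OVERHEAD_FACTOR is not overridden by the python default") {
      // The user explicitly configures the factor before submission.
      val sparkConf = new SparkConf(false)
        .set(MEMORY_OVERHEAD_FACTOR, 0.2)
      val conf = KubernetesConf.createDriverConf(
        sparkConf,
        APP_NAME,
        RESOURCE_NAME_PREFIX,
        APP_ID,
        Some(PythonMainAppResource("local:///opt/spark/main.py")),
        MAIN_CLASS,
        APP_ARGS,
        maybePyFiles = None)
      // setIfMissing must preserve 0.2 rather than forcing the non-JVM 0.4 default.
      assert(conf.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.2)
    }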
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r194112328

--- Diff: docs/running-on-kubernetes.md ---
@@ -624,4 +624,20 @@ specific to Spark on Kubernetes.
     spark.kubernetes.executor.secrets.ENV_VAR=spark-secret:key.
+
+  spark.kubernetes.memoryOverheadFactor
+  0.1
+
+    This sets the Memory Overhead Factor that will allocate memory to non-JVM jobs
+    which in the case of JVM tasks will default to 0.10 and 0.40 for non-JVM jobs.
--- End diff --

I think we can maybe improve this documentation a little bit. It's not so much how much memory is set aside for non-JVM jobs; it's how much memory is set aside for non-JVM memory, including off-heap allocations, non-JVM jobs (like Python or R), and system processes.
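For a sense of scale, the factor feeds the overhead formula quoted later in this thread, max(factor * memoryMiB, MEMORY_OVERHEAD_MIN_MIB); a quick sketch of the arithmetic (the 384 MiB floor is an assumption based on Spark's usual minimum):

    // Sketch: how the overhead factor translates into reserved pod memory.
    val driverMemoryMiB = 4096          // e.g. --driver-memory 4g
    val minOverheadMiB  = 384           // assumed MEMORY_OVERHEAD_MIN_MIB

    def overhead(factor: Double): Int =
      math.max((factor * driverMemoryMiB).toInt, minOverheadMiB)

    println(overhead(0.1))              // 409 MiB reserved for a JVM job
    println(overhead(0.4))              // 1638 MiB reserved for a PySpark job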
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r194113403

--- Diff: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala ---
@@ -88,15 +90,42 @@ class KubernetesConfSuite extends SparkFunSuite {
       APP_NAME,
       RESOURCE_NAME_PREFIX,
       APP_ID,
-      None,
+      mainAppResource = None,
       MAIN_CLASS,
-      APP_ARGS)
+      APP_ARGS,
+      maybePyFiles = None)
     assert(kubernetesConfWithoutMainJar.sparkConf.get("spark.jars").split(",")
       === Array("local:///opt/spark/jar1.jar"))
+    assert(kubernetesConfWithoutMainJar.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.1)
   }

-  test("Resolve driver labels, annotations, secret mount paths, and envs.") {
+  test("Creating driver conf with a python primary file") {
+    val mainResourceFile = "local:///opt/spark/main.py"
+    val inputPyFiles = Array("local:///opt/spark/example2.py", "local:///example3.py")
     val sparkConf = new SparkConf(false)
+      .setJars(Seq("local:///opt/spark/jar1.jar"))
+      .set("spark.files", "local:///opt/spark/example4.py")
+    val mainAppResource = Some(PythonMainAppResource(mainResourceFile))
+    val kubernetesConfWithMainResource = KubernetesConf.createDriverConf(
+      sparkConf,
+      APP_NAME,
+      RESOURCE_NAME_PREFIX,
+      APP_ID,
+      mainAppResource,
+      MAIN_CLASS,
+      APP_ARGS,
+      Some(inputPyFiles.mkString(",")))
+    assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",")
+      === Array("local:///opt/spark/jar1.jar"))
+    assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.4)
--- End diff --

Just as we discussed earlier, testing this value explicitly configured with Python would be good to have as well.
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r194107584

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala ---
@@ -154,6 +176,24 @@ private[spark] object Config extends Logging {
       .checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
       .createWithDefaultString("1s")

+  val MEMORY_OVERHEAD_FACTOR =
+    ConfigBuilder("spark.kubernetes.memoryOverheadFactor")
+      .doc("This sets the Memory Overhead Factor that will allocate memory to non-JVM jobs " +
+        "which in the case of JVM tasks will default to 0.10 and 0.40 for non-JVM jobs")
+      .doubleConf
+      .checkValue(mem_overhead => mem_overhead >= 0 && mem_overhead < 1,
+        "Ensure that memory overhead is a double between 0 --> 1.0")
+      .createWithDefault(0.1)
+
+  val PYSPARK_MAJOR_PYTHON_VERSION =
+    ConfigBuilder("spark.kubernetes.pyspark.pythonversion")
+      .doc("This sets the python version. Either 2 or 3. (Python2 or Python3)")
+      .stringConf
+      .checkValue(pv => List("2", "3").contains(pv),
+        "Ensure that Python Version is either Python2 or Python3")
+      .createWithDefault("2")
--- End diff --

I'm fine with either as the default. While Py2 is officially EOL, I think we'll still see PySpark Py2 apps for a while after.
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r193913624

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala ---
@@ -154,6 +176,24 @@ private[spark] object Config extends Logging {
       .checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
       .createWithDefaultString("1s")

+  val MEMORY_OVERHEAD_FACTOR =
+    ConfigBuilder("spark.kubernetes.memoryOverheadFactor")
+      .doc("This sets the Memory Overhead Factor that will allocate memory to non-JVM jobs " +
+        "which in the case of JVM tasks will default to 0.10 and 0.40 for non-JVM jobs")
+      .doubleConf
+      .checkValue(mem_overhead => mem_overhead >= 0 && mem_overhead < 1,
+        "Ensure that memory overhead is a double between 0 --> 1.0")
+      .createWithDefault(0.1)
+
+  val PYSPARK_MAJOR_PYTHON_VERSION =
+    ConfigBuilder("spark.kubernetes.pyspark.pythonversion")
+      .doc("This sets the python version. Either 2 or 3. (Python2 or Python3)")
+      .stringConf
+      .checkValue(pv => List("2", "3").contains(pv),
+        "Ensure that Python Version is either Python2 or Python3")
+      .createWithDefault("2")
--- End diff --

I am willing to do that: thoughts, @holdenk?
Github user kokes commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r193805391

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala ---
@@ -154,6 +176,24 @@ private[spark] object Config extends Logging {
       .checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
       .createWithDefaultString("1s")

+  val MEMORY_OVERHEAD_FACTOR =
+    ConfigBuilder("spark.kubernetes.memoryOverheadFactor")
+      .doc("This sets the Memory Overhead Factor that will allocate memory to non-JVM jobs " +
+        "which in the case of JVM tasks will default to 0.10 and 0.40 for non-JVM jobs")
+      .doubleConf
+      .checkValue(mem_overhead => mem_overhead >= 0 && mem_overhead < 1,
+        "Ensure that memory overhead is a double between 0 --> 1.0")
+      .createWithDefault(0.1)
+
+  val PYSPARK_MAJOR_PYTHON_VERSION =
+    ConfigBuilder("spark.kubernetes.pyspark.pythonversion")
+      .doc("This sets the python version. Either 2 or 3. (Python2 or Python3)")
+      .stringConf
+      .checkValue(pv => List("2", "3").contains(pv),
+        "Ensure that Python Version is either Python2 or Python3")
+      .createWithDefault("2")
--- End diff --

There is only ~18 months of support left for Python 2. Python 3 has been around for 10 years and unless there's a good reason, I think it should be the default.
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r193801841

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala ---
@@ -102,17 +110,30 @@ private[spark] object KubernetesConf {
       appId: String,
       mainAppResource: Option[MainAppResource],
       mainClass: String,
-      appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
+      appArgs: Array[String],
+      maybePyFiles: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
     val sparkConfWithMainAppJar = sparkConf.clone()
+    val additionalFiles = mutable.ArrayBuffer.empty[String]
     mainAppResource.foreach {
-      case JavaMainAppResource(res) =>
-        val previousJars = sparkConf
-          .getOption("spark.jars")
-          .map(_.split(","))
-          .getOrElse(Array.empty)
-        if (!previousJars.contains(res)) {
-          sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
-        }
+      case JavaMainAppResource(res) =>
+        val previousJars = sparkConf
+          .getOption("spark.jars")
+          .map(_.split(","))
+          .getOrElse(Array.empty)
+        if (!previousJars.contains(res)) {
+          sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
+        }
+      // The function of this outer match is to account for multiple nonJVM
+      // bindings that will all have increased MEMORY_OVERHEAD_FACTOR to 0.4
+      case nonJVM: NonJVMResource =>
+        nonJVM match {
+          case PythonMainAppResource(res) =>
+            additionalFiles += res
+            maybePyFiles.foreach { maybePyFiles =>
+              additionalFiles.appendAll(maybePyFiles.split(",")) }
+            sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res)
+        }
+        sparkConfWithMainAppJar.set(MEMORY_OVERHEAD_FACTOR, 0.4)
--- End diff --

Users have the ability to override it; hence the change to `setIfMissing()`.
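A hedged sketch of the override semantics in question (SparkConf.setIfMissing writes a key only when it is absent):

    import org.apache.spark.SparkConf

    // A user-supplied factor survives; the non-JVM default does not clobber it.
    val userConf = new SparkConf(false)
      .set("spark.kubernetes.memoryOverheadFactor", "0.2")                  // user choice
    userConf.setIfMissing("spark.kubernetes.memoryOverheadFactor", "0.4")   // no-op here
    assert(userConf.get("spark.kubernetes.memoryOverheadFactor") == "0.2")

    val defaultConf = new SparkConf(false)
    defaultConf.setIfMissing("spark.kubernetes.memoryOverheadFactor", "0.4") // applied
    assert(defaultConf.get("spark.kubernetes.memoryOverheadFactor") == "0.4")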
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r193796798

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala ---
@@ -154,6 +176,24 @@ private[spark] object Config extends Logging {
       .checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
       .createWithDefaultString("1s")

+  val MEMORY_OVERHEAD_FACTOR =
+    ConfigBuilder("spark.kubernetes.memoryOverheadFactor")
+      .doc("This sets the Memory Overhead Factor that will allocate memory to non-JVM jobs " +
+        "which in the case of JVM tasks will default to 0.10 and 0.40 for non-JVM jobs")
+      .doubleConf
+      .checkValue(mem_overhead => mem_overhead >= 0 && mem_overhead < 1,
+        "Ensure that memory overhead is a double between 0 --> 1.0")
+      .createWithDefault(0.1)
+
+  val PYSPARK_MAJOR_PYTHON_VERSION =
+    ConfigBuilder("spark.kubernetes.pyspark.pythonversion")
+      .doc("This sets the python version. Either 2 or 3. (Python2 or Python3)")
+      .stringConf
+      .checkValue(pv => List("2", "3").contains(pv),
+        "Ensure that Python Version is either Python2 or Python3")
+      .createWithDefault("2")
--- End diff --

No particular reason. I just thought that the major version should default to 2.
Github user kokes commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r193639554

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala ---
@@ -154,6 +176,24 @@ private[spark] object Config extends Logging {
       .checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
       .createWithDefaultString("1s")

+  val MEMORY_OVERHEAD_FACTOR =
+    ConfigBuilder("spark.kubernetes.memoryOverheadFactor")
+      .doc("This sets the Memory Overhead Factor that will allocate memory to non-JVM jobs " +
+        "which in the case of JVM tasks will default to 0.10 and 0.40 for non-JVM jobs")
+      .doubleConf
+      .checkValue(mem_overhead => mem_overhead >= 0 && mem_overhead < 1,
+        "Ensure that memory overhead is a double between 0 --> 1.0")
+      .createWithDefault(0.1)
+
+  val PYSPARK_MAJOR_PYTHON_VERSION =
+    ConfigBuilder("spark.kubernetes.pyspark.pythonversion")
+      .doc("This sets the python version. Either 2 or 3. (Python2 or Python3)")
+      .stringConf
+      .checkValue(pv => List("2", "3").contains(pv),
+        "Ensure that Python Version is either Python2 or Python3")
+      .createWithDefault("2")
--- End diff --

Am I reading this right that the default is Python 2? Is there a reason for that? Thanks!
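For context, under the proposed config a user would have to opt into Python 3 explicitly. A sketch of the submission-side setting (only the config key comes from the diff above; everything else is illustrative):

    import org.apache.spark.SparkConf

    // Request Python 3 for a PySpark-on-K8s job; "2" is the proposed default.
    val sparkConf = new SparkConf()
      .set("spark.kubernetes.pyspark.pythonversion", "3")
    // Any value other than "2" or "3" fails the checkValue guard above.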
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r192457908

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala ---
@@ -102,17 +110,30 @@ private[spark] object KubernetesConf {
       appId: String,
       mainAppResource: Option[MainAppResource],
       mainClass: String,
-      appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
+      appArgs: Array[String],
+      maybePyFiles: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
     val sparkConfWithMainAppJar = sparkConf.clone()
+    val additionalFiles = mutable.ArrayBuffer.empty[String]
     mainAppResource.foreach {
-      case JavaMainAppResource(res) =>
-        val previousJars = sparkConf
-          .getOption("spark.jars")
-          .map(_.split(","))
-          .getOrElse(Array.empty)
-        if (!previousJars.contains(res)) {
-          sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
-        }
+      case JavaMainAppResource(res) =>
+        val previousJars = sparkConf
+          .getOption("spark.jars")
+          .map(_.split(","))
+          .getOrElse(Array.empty)
+        if (!previousJars.contains(res)) {
+          sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
+        }
+      // The function of this outer match is to account for multiple nonJVM
+      // bindings that will all have increased MEMORY_OVERHEAD_FACTOR to 0.4
+      case nonJVM: NonJVMResource =>
+        nonJVM match {
+          case PythonMainAppResource(res) =>
+            additionalFiles += res
+            maybePyFiles.foreach { maybePyFiles =>
+              additionalFiles.appendAll(maybePyFiles.split(",")) }
+            sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res)
+        }
+        sparkConfWithMainAppJar.set(MEMORY_OVERHEAD_FACTOR, 0.4)
--- End diff --

I think that power users would want the ability to overwrite this if they have a specific amount of memory overhead that they want and know that they need. Configurations should always be configurable, with defaults that are sane. I agree that we can afford to set a better default for Kubernetes, but there should always be a way to override default settings if the user knows the characteristics of their job. For example, if the user does memory profiling of their container and sees that it's not using the full amount of memory, they can afford to drop this value and leave more resources for other applications.
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r192448946

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala ---
@@ -102,17 +110,30 @@ private[spark] object KubernetesConf {
       appId: String,
       mainAppResource: Option[MainAppResource],
       mainClass: String,
-      appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
+      appArgs: Array[String],
+      maybePyFiles: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
     val sparkConfWithMainAppJar = sparkConf.clone()
+    val additionalFiles = mutable.ArrayBuffer.empty[String]
     mainAppResource.foreach {
-      case JavaMainAppResource(res) =>
-        val previousJars = sparkConf
-          .getOption("spark.jars")
-          .map(_.split(","))
-          .getOrElse(Array.empty)
-        if (!previousJars.contains(res)) {
-          sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
-        }
+      case JavaMainAppResource(res) =>
+        val previousJars = sparkConf
+          .getOption("spark.jars")
+          .map(_.split(","))
+          .getOrElse(Array.empty)
+        if (!previousJars.contains(res)) {
+          sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
+        }
+      // The function of this outer match is to account for multiple nonJVM
+      // bindings that will all have increased MEMORY_OVERHEAD_FACTOR to 0.4
+      case nonJVM: NonJVMResource =>
+        nonJVM match {
+          case PythonMainAppResource(res) =>
+            additionalFiles += res
+            maybePyFiles.foreach { maybePyFiles =>
+              additionalFiles.appendAll(maybePyFiles.split(",")) }
+            sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res)
+        }
+        sparkConfWithMainAppJar.set(MEMORY_OVERHEAD_FACTOR, 0.4)
--- End diff --

Yup, you can see my statement about not overriding the explicitly user-provided value in my comment on the 20th ("if the user has specified a different value I don't think we should override it"). So this logic, as it stands, is K8s-specific, and I don't think we can change how YARN chooses its memory overhead in a minor release, so I'd expect this to remain K8s-specific until at least 3.0, when we can evaluate whether we want to change this in YARN as well. The memory overhead configuration is documented on the YARN page right now (see `spark.yarn.am.memoryOverhead` on http://spark.apache.org/docs/latest/running-on-yarn.html), so I would document this in http://spark.apache.org/docs/latest/running-on-kubernetes.html#spark-properties (e.g. `./docs/running-on-kubernetes.md`). As for intuitiveness, I'd argue that this is actually more intuitive than what we do in YARN: we know that users who run R & Python need more non-JVM heap space, and many users don't know to think about this until their job fails. We can take advantage of that knowledge to handle this setting for the user more often. You can see how often this confuses folks on the list, in the docs, and on Stack Overflow by searching for "memory overhead exceeded", "Container killed by YARN for exceeding memory limits", and similar.
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r192270208

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala ---
@@ -102,17 +110,30 @@ private[spark] object KubernetesConf {
       appId: String,
       mainAppResource: Option[MainAppResource],
       mainClass: String,
-      appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
+      appArgs: Array[String],
+      maybePyFiles: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
     val sparkConfWithMainAppJar = sparkConf.clone()
+    val additionalFiles = mutable.ArrayBuffer.empty[String]
     mainAppResource.foreach {
-      case JavaMainAppResource(res) =>
-        val previousJars = sparkConf
-          .getOption("spark.jars")
-          .map(_.split(","))
-          .getOrElse(Array.empty)
-        if (!previousJars.contains(res)) {
-          sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
-        }
+      case JavaMainAppResource(res) =>
+        val previousJars = sparkConf
+          .getOption("spark.jars")
+          .map(_.split(","))
+          .getOrElse(Array.empty)
+        if (!previousJars.contains(res)) {
+          sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
+        }
+      // The function of this outer match is to account for multiple nonJVM
+      // bindings that will all have increased MEMORY_OVERHEAD_FACTOR to 0.4
+      case nonJVM: NonJVMResource =>
+        nonJVM match {
+          case PythonMainAppResource(res) =>
+            additionalFiles += res
+            maybePyFiles.foreach { maybePyFiles =>
+              additionalFiles.appendAll(maybePyFiles.split(",")) }
+            sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res)
+        }
+        sparkConfWithMainAppJar.set(MEMORY_OVERHEAD_FACTOR, 0.4)
--- End diff --

Also you probably only want to `setIfMissing`?
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r192270082

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala ---
@@ -102,17 +110,30 @@ private[spark] object KubernetesConf {
       appId: String,
       mainAppResource: Option[MainAppResource],
       mainClass: String,
-      appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
+      appArgs: Array[String],
+      maybePyFiles: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
     val sparkConfWithMainAppJar = sparkConf.clone()
+    val additionalFiles = mutable.ArrayBuffer.empty[String]
     mainAppResource.foreach {
-      case JavaMainAppResource(res) =>
-        val previousJars = sparkConf
-          .getOption("spark.jars")
-          .map(_.split(","))
-          .getOrElse(Array.empty)
-        if (!previousJars.contains(res)) {
-          sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
-        }
+      case JavaMainAppResource(res) =>
+        val previousJars = sparkConf
+          .getOption("spark.jars")
+          .map(_.split(","))
+          .getOrElse(Array.empty)
+        if (!previousJars.contains(res)) {
+          sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
+        }
+      // The function of this outer match is to account for multiple nonJVM
+      // bindings that will all have increased MEMORY_OVERHEAD_FACTOR to 0.4
+      case nonJVM: NonJVMResource =>
+        nonJVM match {
+          case PythonMainAppResource(res) =>
+            additionalFiles += res
+            maybePyFiles.foreach { maybePyFiles =>
+              additionalFiles.appendAll(maybePyFiles.split(",")) }
+            sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res)
+        }
+        sparkConfWithMainAppJar.set(MEMORY_OVERHEAD_FACTOR, 0.4)
--- End diff --

@holdenk this behavior isn't intuitive: the memory overhead factor default is calculated differently depending on which language binding the job runs with. Is there a good page in Spark's configuration documentation on https://spark.apache.org/docs/latest/ where this should be documented? Is this logic Kubernetes-specific?
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r192245644

--- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile ---
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+ARG base_img
+FROM $base_img
+WORKDIR /
+COPY python /opt/spark/python
+RUN apk add --no-cache python && \
+    python -m ensurepip && \
+    rm -r /usr/lib/python*/ensurepip && \
+    pip install --upgrade pip setuptools && \
+    rm -r /root/.cache
--- End diff --

Yes
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r192245626

--- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile ---
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+ARG base_img
+FROM $base_img
+WORKDIR /
+RUN mkdir ${SPARK_HOME}/python
+COPY python/lib ${SPARK_HOME}/python/lib
+RUN apk add --no-cache python && \
+    apk add --no-cache python3 && \
+    python -m ensurepip && \
+    python3 -m ensurepip && \
+    rm -r /usr/lib/python*/ensurepip && \
+    pip install --upgrade pip setuptools && \
--- End diff --

I will include pip3.6 installation for now until we figure out a long-term venv solution in the next PR.
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r192241796

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala ---
@@ -48,7 +48,8 @@ private[spark] class BasicDriverFeatureStep(
   private val driverMemoryMiB = conf.get(DRIVER_MEMORY)
   private val memoryOverheadMiB = conf
     .get(DRIVER_MEMORY_OVERHEAD)
-    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB))
+    .getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR).getOrElse(0.1) * driverMemoryMiB).toInt,
--- End diff --

Isn't it easier to set it here as such, since it should default to `0.1` unless the non-JVM check modifies it to `0.4`?
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r192241158

--- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh ---
@@ -53,6 +53,28 @@ if [ -n "$SPARK_MOUNTED_FILES_DIR" ]; then
   cp -R "$SPARK_MOUNTED_FILES_DIR/." .
 fi

+if [ -n "$PYSPARK_FILES" ]; then
+    PYTHONPATH="$PYTHONPATH:$PYSPARK_FILES"
+fi
+
+PYSPARK_ARGS=""
+if [ -n "$PYSPARK_APP_ARGS" ]; then
+    PYSPARK_ARGS="$PYSPARK_APP_ARGS"
+fi
+
+
+if [ "$PYSPARK_PYTHON_VERSION" == "2" ]; then
--- End diff --

We set the PySpark version to default to 2 in the configs.
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r190980689

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala ---
@@ -48,7 +48,8 @@ private[spark] class BasicDriverFeatureStep(
   private val driverMemoryMiB = conf.get(DRIVER_MEMORY)
   private val memoryOverheadMiB = conf
     .get(DRIVER_MEMORY_OVERHEAD)
-    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB))
+    .getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR).getOrElse(0.1) * driverMemoryMiB).toInt,
--- End diff --

Seems a bit strange we set the default to 0.1 here while in `KubernetesConf#createDriverConf` we're setting it to 0.4 if it's missing.
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r190981394

--- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh ---
@@ -53,6 +53,28 @@ if [ -n "$SPARK_MOUNTED_FILES_DIR" ]; then
   cp -R "$SPARK_MOUNTED_FILES_DIR/." .
 fi

+if [ -n "$PYSPARK_FILES" ]; then
+    PYTHONPATH="$PYTHONPATH:$PYSPARK_FILES"
+fi
+
+PYSPARK_ARGS=""
+if [ -n "$PYSPARK_APP_ARGS" ]; then
+    PYSPARK_ARGS="$PYSPARK_APP_ARGS"
+fi
+
+
+if [ "$PYSPARK_PYTHON_VERSION" == "2" ]; then
--- End diff --

Should be fine if we don't set this at all, yeah?
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r190981207

--- Diff: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features.bindings
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.PythonMainAppResource
+
+class PythonDriverFeatureStepSuite extends SparkFunSuite {
+
--- End diff --

Nit: there's an extra newline here.
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r190981001

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features.bindings
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata}
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
+
+private[spark] class PythonDriverFeatureStep(
+    kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf])
+  extends KubernetesFeatureConfigStep {
+  override def configurePod(pod: SparkPod): SparkPod = {
+    val roleConf = kubernetesConf.roleSpecificConf
+    require(roleConf.mainAppResource.isDefined, "PySpark Main Resource must be defined")
+    val maybePythonArgs = Option(roleConf.appArgs).filter(_.nonEmpty).map(
+      s =>
--- End diff --

Use something more descriptive than `s`.
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r190980379

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala ---
@@ -63,10 +67,16 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
     .map(str => str.split(",").toSeq)
     .getOrElse(Seq.empty[String])

-  def sparkFiles(): Seq[String] = sparkConf
-    .getOption("spark.files")
-    .map(str => str.split(",").toSeq)
-    .getOrElse(Seq.empty[String])
+  def getRoleConf: T = roleSpecificConf
--- End diff --

Don't think we use this anywhere?
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r189982571

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala ---
@@ -54,7 +54,8 @@ private[spark] class BasicExecutorFeatureStep(
   private val memoryOverheadMiB = kubernetesConf
     .get(EXECUTOR_MEMORY_OVERHEAD)
-    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+    .getOrElse(math.max(
+      (kubernetesConf.get(MEMORY_OVERHEAD_FACTOR).getOrElse(0.1) * executorMemoryMiB).toInt,
--- End diff --

Yeah, so multiple configs, yes, but since we have two instances of this magic 0.1 constant I'd rather have it shared somewhere, in case we update it in the future and miss one of the places. It could also be a shared constant rather than a getter for the memory overhead factor; either way keeps us honest.
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r189986135

--- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile ---
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+ARG base_img
+FROM $base_img
+WORKDIR /
+RUN mkdir ${SPARK_HOME}/python
+COPY python/lib ${SPARK_HOME}/python/lib
+RUN apk add --no-cache python && \
+    apk add --no-cache python3 && \
+    python -m ensurepip && \
+    python3 -m ensurepip && \
+    rm -r /usr/lib/python*/ensurepip && \
+    pip install --upgrade pip setuptools && \
--- End diff --

ping
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r189981496

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala ---
@@ -154,6 +176,24 @@ private[spark] object Config extends Logging {
       .checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
       .createWithDefaultString("1s")

+  val MEMORY_OVERHEAD_FACTOR =
+    ConfigBuilder("spark.kubernetes.memoryOverheadFactor")
+      .doc("This sets the Memory Overhead Factor that will allocate memory to non-JVM jobs " +
+        "which in the case of JVM tasks will default to 0.10 and 0.40 for non-JVM jobs")
+      .doubleConf
+      .checkValue(mem_overhead => mem_overhead >= 0 && mem_overhead < 1,
+        "Ensure that memory overhead is a double between 0 --> 1.0")
+      .createOptional
+
+  val PYSPARK_PYTHON_VERSION =
--- End diff --

This is minor, but I have a few questions about this element of the config. First off, if this is going to be major-version only, let's call it something like majorPythonVersion (e.g. many python2 and python3 minor versions exist).
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r187811483

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala ---
@@ -54,7 +54,8 @@ private[spark] class BasicExecutorFeatureStep(
   private val memoryOverheadMiB = kubernetesConf
     .get(EXECUTOR_MEMORY_OVERHEAD)
-    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+    .getOrElse(math.max(
+      (kubernetesConf.get(MEMORY_OVERHEAD_FACTOR).getOrElse(0.1) * executorMemoryMiB).toInt,
--- End diff --

The logic is different since it takes in driver vs. executor configs to determine memory size. The only similar logic is `kubernetesConf.get(MEMORY_OVERHEAD_FACTOR).getOrElse(0.1)`, which seems unnecessary to put into a utils class.
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r187647601

--- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile ---
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+ARG base_img
+FROM $base_img
+WORKDIR /
+RUN mkdir ${SPARK_HOME}/python
+COPY python/lib ${SPARK_HOME}/python/lib
+RUN apk add --no-cache python && \
+    apk add --no-cache python3 && \
+    python -m ensurepip && \
+    python3 -m ensurepip && \
+    rm -r /usr/lib/python*/ensurepip && \
+    pip install --upgrade pip setuptools && \
--- End diff --

So you can run both pip and pip3 with the same packages and they'll get installed in different directories and shouldn't stomp on top of each other. That being said, long term venvs are probably the way we want to go, but as we've discussed those are probably non-trivial and should go in a second PR.
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r187645297

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala ---
@@ -101,17 +112,29 @@ private[spark] object KubernetesConf {
       appId: String,
       mainAppResource: Option[MainAppResource],
       mainClass: String,
-      appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
+      appArgs: Array[String],
+      maybePyFiles: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
     val sparkConfWithMainAppJar = sparkConf.clone()
+    val additionalFiles = mutable.ArrayBuffer.empty[String]
     mainAppResource.foreach {
-      case JavaMainAppResource(res) =>
-        val previousJars = sparkConf
-          .getOption("spark.jars")
-          .map(_.split(","))
-          .getOrElse(Array.empty)
-        if (!previousJars.contains(res)) {
-          sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
-        }
+      case JavaMainAppResource(res) =>
+        val previousJars = sparkConf
+          .getOption("spark.jars")
+          .map(_.split(","))
+          .getOrElse(Array.empty)
+        if (!previousJars.contains(res)) {
+          sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
+        }
+      case nonJVM: NonJVMResource =>
--- End diff --

Maybe add a comment? Since R isn't currently integrated, it could be a bit difficult to infer why this is being done.
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r187646183

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala ---
@@ -54,7 +54,8 @@ private[spark] class BasicExecutorFeatureStep(
   private val memoryOverheadMiB = kubernetesConf
     .get(EXECUTOR_MEMORY_OVERHEAD)
-    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+    .getOrElse(math.max(
+      (kubernetesConf.get(MEMORY_OVERHEAD_FACTOR).getOrElse(0.1) * executorMemoryMiB).toInt,
--- End diff --

This logic happens twice, and I'd rather see it in a utils or config class where if we update it in one place it will take effect everywhere.
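One way to share the magic 0.1, as suggested here (a sketch; the constant's name and placement are assumptions, not the PR's code):

    // Sketch: hoist the fallback so the driver and executor steps cannot drift apart.
    object ConfigDefaults {
      val DefaultMemoryOverheadFactor: Double = 0.1
    }
    // Both feature steps would then read:
    //   conf.get(MEMORY_OVERHEAD_FACTOR).getOrElse(ConfigDefaults.DefaultMemoryOverheadFactor)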
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r187646801

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features.bindings
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.ContainerBuilder
+import io.fabric8.kubernetes.api.model.EnvVarBuilder
+import io.fabric8.kubernetes.api.model.HasMetadata
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesDriverSpecificConf
+import org.apache.spark.deploy.k8s.KubernetesUtils
+import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
+
+private[spark] class PythonDriverFeatureStep(
+    kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf])
+  extends KubernetesFeatureConfigStep {
+  override def configurePod(pod: SparkPod): SparkPod = {
+    val roleConf = kubernetesConf.roleSpecificConf
+    require(roleConf.mainAppResource.isDefined, "PySpark Main Resource must be defined")
+    val maybePythonArgs = Option(roleConf.appArgs).filter(_.nonEmpty).map(
+      s =>
+        new EnvVarBuilder()
+          .withName(ENV_PYSPARK_ARGS)
+          .withValue(s.mkString(","))
+          .build())
+    val maybePythonFiles = kubernetesConf.pyFiles().map(
+      pyFiles =>
+        new EnvVarBuilder()
+          .withName(ENV_PYSPARK_FILES)
+          .withValue(KubernetesUtils.resolveFileUrisAndPath(pyFiles.split(","))
--- End diff --

Maybe add a comment on why we need to switch from ","s to ":"s.
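The switch is needed because PYTHONPATH, like PATH, is colon-delimited, while Spark file lists are comma-delimited. A sketch of the conversion (the stripPrefix stands in for KubernetesUtils.resolveFileUrisAndPath, which is more involved):

    // Comma-separated Spark file list -> colon-separated PYTHONPATH entries.
    val pyFiles  = "local:///opt/spark/example2.py,local:///example3.py"
    val resolved = pyFiles.split(",").map(_.stripPrefix("local://"))
    println(resolved.mkString(":"))  // /opt/spark/example2.py:/example3.py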
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r187644556

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala ---
@@ -101,17 +112,29 @@ private[spark] object KubernetesConf {
       appId: String,
       mainAppResource: Option[MainAppResource],
       mainClass: String,
-      appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
+      appArgs: Array[String],
+      maybePyFiles: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
     val sparkConfWithMainAppJar = sparkConf.clone()
+    val additionalFiles = mutable.ArrayBuffer.empty[String]
     mainAppResource.foreach {
-      case JavaMainAppResource(res) =>
-        val previousJars = sparkConf
-          .getOption("spark.jars")
-          .map(_.split(","))
-          .getOrElse(Array.empty)
-        if (!previousJars.contains(res)) {
-          sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
-        }
+      case JavaMainAppResource(res) =>
+        val previousJars = sparkConf
+          .getOption("spark.jars")
+          .map(_.split(","))
+          .getOrElse(Array.empty)
+        if (!previousJars.contains(res)) {
+          sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
+        }
+      case nonJVM: NonJVMResource =>
--- End diff --

Maybe worth a comment then? Especially since R support isn't integrated right now, it's perhaps not super clear to folks why this is being done.
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r186793604

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala ---
@@ -44,11 +44,16 @@ private[spark] class BasicDriverFeatureStep(
   private val driverCpuCores = conf.get("spark.driver.cores", "1")
   private val driverLimitCores = conf.get(KUBERNETES_DRIVER_LIMIT_CORES)

+  private val driverDockerContainer = conf.roleSpecificConf.mainAppResource.map {
+    case JavaMainAppResource(_) => "driver"
+    case PythonMainAppResource(_) => "driver-py"
+  }.getOrElse(throw new SparkException("Must specify a JVM or Python Resource"))
--- End diff --

Should I therefore not throw an error here, @mccheah, and move this logic into the steps?
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r186591946

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala ---
@@ -29,18 +31,36 @@ private[spark] class KubernetesDriverBuilder(
       new DriverServiceFeatureStep(_),
     provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
       => MountSecretsFeatureStep) =
-      new MountSecretsFeatureStep(_)) {
+      new MountSecretsFeatureStep(_),
+    provideJavaStep: (
+      KubernetesConf[KubernetesDriverSpecificConf]
+        => JavaDriverFeatureStep) =
+      new JavaDriverFeatureStep(_),
+    providePythonStep: (
+      KubernetesConf[KubernetesDriverSpecificConf]
+        => PythonDriverFeatureStep) =
+      new PythonDriverFeatureStep(_)) {

   def buildFromFeatures(
       kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = {
     val baseFeatures = Seq(
       provideBasicStep(kubernetesConf),
       provideCredentialsStep(kubernetesConf),
       provideServiceStep(kubernetesConf))
-    val allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
-      baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
-    } else baseFeatures
-
+    val maybeRoleSecretNamesStep = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
+      Some(provideSecretsStep(kubernetesConf)) } else None
+    val bindingsStep = kubernetesConf.roleSpecificConf.mainAppResource.getOrElse(None)
--- End diff --

In particular if you don't apply any binding step then I think you'll lose the user's application arguments for Java applications.
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r186591608

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala ---
@@ -29,18 +31,36 @@ private[spark] class KubernetesDriverBuilder(
       new DriverServiceFeatureStep(_),
     provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
      => MountSecretsFeatureStep) =
-      new MountSecretsFeatureStep(_)) {
+      new MountSecretsFeatureStep(_),
+    provideJavaStep: (
+      KubernetesConf[KubernetesDriverSpecificConf]
+        => JavaDriverFeatureStep) =
+      new JavaDriverFeatureStep(_),
--- End diff --

Indentation is off.
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r186591836

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala ---
@@ -29,18 +31,36 @@ private[spark] class KubernetesDriverBuilder(
       new DriverServiceFeatureStep(_),
     provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
       => MountSecretsFeatureStep) =
-      new MountSecretsFeatureStep(_)) {
+      new MountSecretsFeatureStep(_),
+    provideJavaStep: (
+      KubernetesConf[KubernetesDriverSpecificConf]
+        => JavaDriverFeatureStep) =
+      new JavaDriverFeatureStep(_),
+    providePythonStep: (
+      KubernetesConf[KubernetesDriverSpecificConf]
+        => PythonDriverFeatureStep) =
+      new PythonDriverFeatureStep(_)) {

   def buildFromFeatures(
       kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = {
     val baseFeatures = Seq(
       provideBasicStep(kubernetesConf),
       provideCredentialsStep(kubernetesConf),
       provideServiceStep(kubernetesConf))
-    val allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
-      baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
-    } else baseFeatures
-
+    val maybeRoleSecretNamesStep = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
+      Some(provideSecretsStep(kubernetesConf)) } else None
+    val bindingsStep = kubernetesConf.roleSpecificConf.mainAppResource.getOrElse(None)
--- End diff --

`getOrElse(None)` is an anti-pattern, I think. We should be able to match directly on the Option itself. Also, if you don't have a main app resource, should the application be treated as a Java application? I would think that's what `SparkLauncher.NO_RESOURCE` would imply.
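To spell out the idiom being suggested (a generic sketch, not the PR's actual code):

    // `opt.getOrElse(None)` widens the result type to Any and hides the intent;
    // matching on the Option directly keeps both branches explicit and typed.
    val mainAppResource: Option[String] = Some("local:///opt/spark/main.py")
    val bindingStep = mainAppResource match {
      case Some(res) => s"apply the language-specific step for $res"
      case None      => "treat it as a Java app (what SparkLauncher.NO_RESOURCE implies)"
    }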
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r186591534

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features.bindings
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.ContainerBuilder
+import io.fabric8.kubernetes.api.model.EnvVar
+import io.fabric8.kubernetes.api.model.EnvVarBuilder
+import io.fabric8.kubernetes.api.model.HasMetadata
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesDriverSpecificConf
+import org.apache.spark.deploy.k8s.KubernetesUtils
+import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
+
+private[spark] class PythonDriverFeatureStep(
+    kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf])
+  extends KubernetesFeatureConfigStep {
+  override def configurePod(pod: SparkPod): SparkPod = {
+    val roleConf = kubernetesConf.roleSpecificConf
+    require(roleConf.mainAppResource.isDefined, "PySpark Main Resource must be defined")
+    val maybePythonArgs: Option[EnvVar] = Option(roleConf.appArgs).filter(_.nonEmpty).map(
--- End diff --

I don't think you should have to declare these types, both for this line and the few others below.
[GitHub] spark pull request #21092: [SPARK-23984][K8S] Initial Python Bindings for Py...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r186329528 --- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile --- @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +ARG base_img +FROM $base_img +WORKDIR / +RUN mkdir ${SPARK_HOME}/python +COPY python/lib ${SPARK_HOME}/python/lib +RUN apk add --no-cache python && \ +apk add --no-cache python3 && \ +python -m ensurepip && \ +python3 -m ensurepip && \ +rm -r /usr/lib/python*/ensurepip && \ +pip install --upgrade pip setuptools && \ --- End diff -- Correct. Would love recommendations on dependency management with regard to `pip`, as it's tricky to allow for both pip and pip3 installation, unless I use two separate virtual environments for dependency management. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21092: [SPARK-23984][K8S] Initial Python Bindings for Py...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r186326478 --- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile --- @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +ARG base_img +FROM $base_img +WORKDIR / +RUN mkdir ${SPARK_HOME}/python +COPY python/lib ${SPARK_HOME}/python/lib +RUN apk add --no-cache python && \ +apk add --no-cache python3 && \ +python -m ensurepip && \ +python3 -m ensurepip && \ +rm -r /usr/lib/python*/ensurepip && \ +pip install --upgrade pip setuptools && \ --- End diff -- this goes to python2 only, I think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21092: [SPARK-23984][K8S] Initial Python Bindings for Py...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r186321782 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala --- @@ -63,10 +67,17 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf]( .map(str => str.split(",").toSeq) .getOrElse(Seq.empty[String]) - def sparkFiles(): Seq[String] = sparkConf -.getOption("spark.files") -.map(str => str.split(",").toSeq) -.getOrElse(Seq.empty[String]) + def pyFiles(): Option[String] = sparkConf +.get(KUBERNETES_PYSPARK_PY_FILES) + + def pySparkMainResource(): Option[String] = sparkConf --- End diff -- I need to parse out the MainAppResource (which I thought we should be doing only once); as such, I thought it would be cleaner to do this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21092: [SPARK-23984][K8S] Initial Python Bindings for Py...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r186298350 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala --- @@ -88,15 +94,22 @@ private[spark] class BasicDriverFeatureStep( .addToRequests("memory", driverMemoryQuantity) .addToLimits("memory", driverMemoryQuantity) .endResources() - .addToArgs("driver") + .addToArgs(driverDockerContainer) .addToArgs("--properties-file", SPARK_CONF_PATH) .addToArgs("--class", conf.roleSpecificConf.mainClass) - // The user application jar is merged into the spark.jars list and managed through that - // property, so there is no need to reference it explicitly here. - .addToArgs(SparkLauncher.NO_RESOURCE) - .addToArgs(conf.roleSpecificConf.appArgs: _*) - .build() +val driverContainer = + if (driverDockerContainer == "driver-py") { --- End diff -- Agreed. I didn't know if we wanted to include a JavaDriverFeatureStep. I will do so then. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21092: [SPARK-23984][K8S] Initial Python Bindings for Py...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r186266469 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala --- @@ -101,17 +112,29 @@ private[spark] object KubernetesConf { appId: String, mainAppResource: Option[MainAppResource], mainClass: String, - appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = { + appArgs: Array[String], + maybePyFiles: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = { val sparkConfWithMainAppJar = sparkConf.clone() +val additionalFiles = mutable.ArrayBuffer.empty[String] mainAppResource.foreach { - case JavaMainAppResource(res) => -val previousJars = sparkConf - .getOption("spark.jars") - .map(_.split(",")) - .getOrElse(Array.empty) -if (!previousJars.contains(res)) { - sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res)) -} +case JavaMainAppResource(res) => + val previousJars = sparkConf +.getOption("spark.jars") +.map(_.split(",")) +.getOrElse(Array.empty) + if (!previousJars.contains(res)) { +sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res)) + } +case nonJVM: NonJVMResource => --- End diff -- Because the R step should get the same default memory overhead, as should all NonJVMResources. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
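For context, a sketch of the resource hierarchy this rationale relies on, matching the shape visible in the diffs (the exact field names are assumptions):

```scala
// NonJVMResource is a marker trait, so Python now (and R later) can share
// non-JVM defaults such as the 0.4 memory overhead factor.
sealed trait MainAppResource
trait NonJVMResource
case class JavaMainAppResource(primaryResource: String) extends MainAppResource
case class PythonMainAppResource(primaryResource: String)
  extends MainAppResource with NonJVMResource
```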
[GitHub] spark pull request #21092: [SPARK-23984][K8S] Initial Python Bindings for Py...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r186254991 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala --- @@ -101,17 +112,29 @@ private[spark] object KubernetesConf { appId: String, mainAppResource: Option[MainAppResource], mainClass: String, - appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = { + appArgs: Array[String], + maybePyFiles: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = { val sparkConfWithMainAppJar = sparkConf.clone() +val additionalFiles = mutable.ArrayBuffer.empty[String] mainAppResource.foreach { - case JavaMainAppResource(res) => -val previousJars = sparkConf - .getOption("spark.jars") - .map(_.split(",")) - .getOrElse(Array.empty) -if (!previousJars.contains(res)) { - sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res)) -} +case JavaMainAppResource(res) => + val previousJars = sparkConf +.getOption("spark.jars") +.map(_.split(",")) +.getOrElse(Array.empty) + if (!previousJars.contains(res)) { +sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res)) + } +case nonJVM: NonJVMResource => + nonJVM match { +case PythonMainAppResource(res) => + additionalFiles += res + maybePyFiles.foreach{maybePyFiles => +additionalFiles.appendAll(maybePyFiles.split(","))} + sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res) + sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_APP_ARGS, appArgs.mkString(" ")) + } + sparkConfWithMainAppJar.setIfMissing(MEMORY_OVERHEAD_FACTOR, 0.4) --- End diff -- This is set later in BaseDriverStep --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21092: [SPARK-23984][K8S] Initial Python Bindings for Py...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r186244105 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala --- @@ -63,10 +67,17 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf]( .map(str => str.split(",").toSeq) .getOrElse(Seq.empty[String]) - def sparkFiles(): Seq[String] = sparkConf -.getOption("spark.files") -.map(str => str.split(",").toSeq) -.getOrElse(Seq.empty[String]) + def pyFiles(): Option[String] = sparkConf +.get(KUBERNETES_PYSPARK_PY_FILES) + + def pySparkMainResource(): Option[String] = sparkConf --- End diff -- This seems redundant with the driver specific spark conf's MainAppResource. Perhaps remove the need to specify this thing twice? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21092: [SPARK-23984][K8S] Initial Python Bindings for Py...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r186244157 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala --- @@ -101,17 +112,29 @@ private[spark] object KubernetesConf { appId: String, mainAppResource: Option[MainAppResource], mainClass: String, - appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = { + appArgs: Array[String], + maybePyFiles: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = { val sparkConfWithMainAppJar = sparkConf.clone() +val additionalFiles = mutable.ArrayBuffer.empty[String] mainAppResource.foreach { - case JavaMainAppResource(res) => -val previousJars = sparkConf - .getOption("spark.jars") - .map(_.split(",")) - .getOrElse(Array.empty) -if (!previousJars.contains(res)) { - sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res)) -} +case JavaMainAppResource(res) => + val previousJars = sparkConf +.getOption("spark.jars") +.map(_.split(",")) +.getOrElse(Array.empty) + if (!previousJars.contains(res)) { +sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res)) + } +case nonJVM: NonJVMResource => --- End diff -- Why can't we just match `PythonMainAppResource` immediately here - why the two layers of matching? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
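A sketch of the flattened match being suggested, reusing the resource types sketched above (illustrative only):

```scala
import scala.collection.mutable

// Sketch: match the concrete PythonMainAppResource case directly rather
// than matching the NonJVMResource marker first and matching again inside.
def collectAdditionalFiles(
    mainAppResource: Option[MainAppResource],
    maybePyFiles: Option[String]): Seq[String] = {
  val additionalFiles = mutable.ArrayBuffer.empty[String]
  mainAppResource.foreach {
    case PythonMainAppResource(res) =>
      additionalFiles += res
      maybePyFiles.foreach(pyFiles => additionalFiles ++= pyFiles.split(","))
    case _ => // JavaMainAppResource: merged into spark.jars elsewhere
  }
  additionalFiles.toSeq
}
```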
[GitHub] spark pull request #21092: [SPARK-23984][K8S] Initial Python Bindings for Py...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r186244580 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala --- @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features.bindings + +import io.fabric8.kubernetes.api.model.ContainerBuilder +import io.fabric8.kubernetes.api.model.HasMetadata + +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesUtils +import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep + +private[spark] class PythonDriverFeatureStep( + kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf]) + extends KubernetesFeatureConfigStep { + override def configurePod(pod: SparkPod): SparkPod = { +val mainResource = kubernetesConf.pySparkMainResource() +require(mainResource.isDefined, "PySpark Main Resource must be defined") +val otherPyFiles = kubernetesConf.pyFiles().map(pyFile => + KubernetesUtils.resolveFileUrisAndPath(pyFile.split(",")) +.mkString(":")).getOrElse("") +val withPythonPrimaryFileContainer = new ContainerBuilder(pod.container) + .addNewEnv() +.withName(ENV_PYSPARK_ARGS) +.withValue(kubernetesConf.pySparkAppArgs().getOrElse("")) --- End diff -- Avoid adding empty env vars. The entrypoint should be able to detect whether the environment variable is set at all, so only attach these environment variables when they're present. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
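A sketch of conditional attachment using the fabric8 builder API; the helper name is hypothetical:

```scala
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder}

// Sketch: attach an env var only when a non-empty value exists, so the
// container entrypoint can test for the variable's mere presence.
def withOptionalEnv(container: Container, name: String, value: Option[String]): Container =
  value.filter(_.nonEmpty) match {
    case Some(v) =>
      new ContainerBuilder(container)
        .addNewEnv().withName(name).withValue(v).endEnv()
        .build()
    case None => container
  }
```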
[GitHub] spark pull request #21092: [SPARK-23984][K8S] Initial Python Bindings for Py...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r18629 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala --- @@ -88,15 +94,22 @@ private[spark] class BasicDriverFeatureStep( .addToRequests("memory", driverMemoryQuantity) .addToLimits("memory", driverMemoryQuantity) .endResources() - .addToArgs("driver") + .addToArgs(driverDockerContainer) .addToArgs("--properties-file", SPARK_CONF_PATH) .addToArgs("--class", conf.roleSpecificConf.mainClass) - // The user application jar is merged into the spark.jars list and managed through that - // property, so there is no need to reference it explicitly here. - .addToArgs(SparkLauncher.NO_RESOURCE) - .addToArgs(conf.roleSpecificConf.appArgs: _*) - .build() +val driverContainer = + if (driverDockerContainer == "driver-py") { --- End diff -- @ifilonenko I think this still needs some work to clean up. What I expect to happen is to have three step types: 1. `BasicDriverFeatureStep`, which is what's here except we don't provide the args to the container in this step anymore. 2. `PythonDriverFeatureStep` which does both what the `PythonDriverFeatureStep` does currently plus adds the `driver-py` argument 3. `JavaDriverFeatureStep` which only adds the argument `SparkLauncher.NO_RESOURCE`, `conf.roleSpecificConf.appArgs`, etc. Then in the `KubernetesDriverBuilder`, always apply the first step, and select which of 2 or 3 to apply based on the app resource type. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
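One possible shape for step (3), assuming the `KubernetesFeatureConfigStep` trait members shown elsewhere in this thread; a sketch, not the final implementation:

```scala
import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
import org.apache.spark.launcher.SparkLauncher

// Sketch: a bindings step that only contributes the Java driver arguments;
// pod resources, properties file, and main class stay in the basic step.
private[spark] class JavaDriverFeatureStep(
    conf: KubernetesConf[KubernetesDriverSpecificConf])
  extends KubernetesFeatureConfigStep {

  override def configurePod(pod: SparkPod): SparkPod = {
    val container = new ContainerBuilder(pod.container)
      // The user jar is merged into spark.jars, so no explicit resource.
      .addToArgs(SparkLauncher.NO_RESOURCE)
      .addToArgs(conf.roleSpecificConf.appArgs: _*)
      .build()
    SparkPod(pod.pod, container)
  }

  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty

  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
}
```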
[GitHub] spark pull request #21092: [SPARK-23984][K8S] Initial Python Bindings for Py...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r186244541 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala --- @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features.bindings + +import io.fabric8.kubernetes.api.model.ContainerBuilder +import io.fabric8.kubernetes.api.model.HasMetadata + +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesUtils +import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep + +private[spark] class PythonDriverFeatureStep( + kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf]) + extends KubernetesFeatureConfigStep { + override def configurePod(pod: SparkPod): SparkPod = { +val mainResource = kubernetesConf.pySparkMainResource() +require(mainResource.isDefined, "PySpark Main Resource must be defined") +val otherPyFiles = kubernetesConf.pyFiles().map(pyFile => + KubernetesUtils.resolveFileUrisAndPath(pyFile.split(",")) +.mkString(":")).getOrElse("") +val withPythonPrimaryFileContainer = new ContainerBuilder(pod.container) + .addNewEnv() +.withName(ENV_PYSPARK_ARGS) +.withValue(kubernetesConf.pySparkAppArgs().getOrElse("")) +.endEnv() + .addNewEnv() +.withName(ENV_PYSPARK_PRIMARY) +.withValue(KubernetesUtils.resolveFileUri(mainResource.get)) +.endEnv() + .addNewEnv() +.withName(ENV_PYSPARK_FILES) +.withValue(if (otherPyFiles == "") {""} else otherPyFiles) --- End diff -- Don't add empty env vars - see above. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21092: [SPARK-23984][K8S] Initial Python Bindings for Py...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r186240772 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala --- @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features.bindings + +import io.fabric8.kubernetes.api.model.ContainerBuilder +import io.fabric8.kubernetes.api.model.HasMetadata + +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesUtils +import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep + +private[spark] class PythonDriverFeatureStep( + kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf]) + extends KubernetesFeatureConfigStep { + override def configurePod(pod: SparkPod): SparkPod = { +val mainResource = kubernetesConf.pySparkMainResource() +require(mainResource.isDefined, "PySpark Main Resource must be defined") +val otherPyFiles = kubernetesConf.pyFiles().map(pyFile => + KubernetesUtils.resolveFileUrisAndPath(pyFile.split(",")) +.mkString(":")).getOrElse("") +val withPythonPrimaryFileContainer = new ContainerBuilder(pod.container) + .addNewEnv() +.withName(ENV_PYSPARK_ARGS) +.withValue(kubernetesConf.pySparkAppArgs().getOrElse("")) +.endEnv() + .addNewEnv() +.withName(ENV_PYSPARK_PRIMARY) +.withValue(KubernetesUtils.resolveFileUri(mainResource.get)) +.endEnv() + .addNewEnv() +.withName(ENV_PYSPARK_FILES) +.withValue(if (otherPyFiles == "") {""} else otherPyFiles) --- End diff -- wait, what is this logic? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
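For the record, both branches of the quoted conditional evaluate to the same string, so it reduces to `otherPyFiles` on its own:

```scala
// Equivalent to just returning otherPyFiles: when the string is empty, the
// "then" branch returns "", which is the same empty string.
def resolvedPyFiles(otherPyFiles: String): String =
  if (otherPyFiles == "") "" else otherPyFiles // == otherPyFiles
```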
[GitHub] spark pull request #21092: [SPARK-23984][K8S] Initial Python Bindings for Py...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r186240449 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala --- @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features.bindings + +import io.fabric8.kubernetes.api.model.ContainerBuilder +import io.fabric8.kubernetes.api.model.HasMetadata + +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesUtils +import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep + +private[spark] class PythonDriverFeatureStep( + kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf]) + extends KubernetesFeatureConfigStep { + override def configurePod(pod: SparkPod): SparkPod = { +val mainResource = kubernetesConf.pySparkMainResource() +require(mainResource.isDefined, "PySpark Main Resource must be defined") +val otherPyFiles = kubernetesConf.pyFiles().map(pyFile => + KubernetesUtils.resolveFileUrisAndPath(pyFile.split(",")) +.mkString(":")).getOrElse("") --- End diff -- Leave a comment that we are switching from "," to ":" to match the format expected by the PYTHONPATH environment variable. ( http://xkcd.com/1987 ) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
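A tiny sketch of the separator swap, with illustrative paths:

```scala
// py-files arrive comma-separated (Spark's list convention) but are joined
// with ":" because that is the separator PYTHONPATH expects.
val pyFiles = "/opt/spark/example2.py,/opt/spark/example3.py"
val pythonPathEntry = pyFiles.split(",").mkString(":")
// pythonPathEntry == "/opt/spark/example2.py:/opt/spark/example3.py"
```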
[GitHub] spark pull request #21092: [SPARK-23984][K8S] Initial Python Bindings for Py...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r186239751 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala --- @@ -101,17 +112,29 @@ private[spark] object KubernetesConf { appId: String, mainAppResource: Option[MainAppResource], mainClass: String, - appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = { + appArgs: Array[String], + maybePyFiles: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = { val sparkConfWithMainAppJar = sparkConf.clone() +val additionalFiles = mutable.ArrayBuffer.empty[String] mainAppResource.foreach { - case JavaMainAppResource(res) => -val previousJars = sparkConf - .getOption("spark.jars") - .map(_.split(",")) - .getOrElse(Array.empty) -if (!previousJars.contains(res)) { - sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res)) -} +case JavaMainAppResource(res) => + val previousJars = sparkConf +.getOption("spark.jars") +.map(_.split(",")) +.getOrElse(Array.empty) + if (!previousJars.contains(res)) { +sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res)) + } +case nonJVM: NonJVMResource => + nonJVM match { +case PythonMainAppResource(res) => + additionalFiles += res + maybePyFiles.foreach{maybePyFiles => +additionalFiles.appendAll(maybePyFiles.split(","))} + sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res) + sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_APP_ARGS, appArgs.mkString(" ")) + } + sparkConfWithMainAppJar.setIfMissing(MEMORY_OVERHEAD_FACTOR, 0.4) --- End diff -- Do we want to set this in the JVM case? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
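A sketch of why `setIfMissing` keeps the user's override (string-keyed for brevity; the PR uses the typed `MEMORY_OVERHEAD_FACTOR` config entry):

```scala
import org.apache.spark.SparkConf

// setIfMissing only writes the 0.4 non-JVM default when the user has not
// already configured a value, so an explicit override survives.
val conf = new SparkConf(false)
  .set("spark.kubernetes.memoryOverheadFactor", "0.3") // user override
conf.setIfMissing("spark.kubernetes.memoryOverheadFactor", "0.4")
assert(conf.get("spark.kubernetes.memoryOverheadFactor") == "0.3")
```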
[GitHub] spark pull request #21092: [SPARK-23984][K8S] Initial Python Bindings for Py...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r186240961 --- Diff: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala --- @@ -55,7 +55,8 @@ class KubernetesConfSuite extends SparkFunSuite { APP_ID, None, MAIN_CLASS, - APP_ARGS) + APP_ARGS, + None) --- End diff -- Still want names. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21092: [SPARK-23984][K8S] Initial Python Bindings for Py...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r186238816 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala --- @@ -101,17 +112,29 @@ private[spark] object KubernetesConf { appId: String, mainAppResource: Option[MainAppResource], mainClass: String, - appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = { + appArgs: Array[String], + maybePyFiles: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = { val sparkConfWithMainAppJar = sparkConf.clone() +val additionalFiles = mutable.ArrayBuffer.empty[String] mainAppResource.foreach { - case JavaMainAppResource(res) => -val previousJars = sparkConf - .getOption("spark.jars") - .map(_.split(",")) - .getOrElse(Array.empty) -if (!previousJars.contains(res)) { - sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res)) -} +case JavaMainAppResource(res) => + val previousJars = sparkConf +.getOption("spark.jars") +.map(_.split(",")) +.getOrElse(Array.empty) + if (!previousJars.contains(res)) { +sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res)) + } +case nonJVM: NonJVMResource => + nonJVM match { +case PythonMainAppResource(res) => + additionalFiles += res + maybePyFiles.foreach{maybePyFiles => +additionalFiles.appendAll(maybePyFiles.split(","))} --- End diff -- Not for this PR or JIRA, but for later maybe we should normalize our parsing of input files in a way which allows escape characters and share the logic between Yarn/K8s/Mesos/standalone. What do y'all think? Possible follow up JIRA: https://issues.apache.org/jira/browse/SPARK-24184 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
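A hypothetical sketch of what escape-aware splitting could look like; nothing like this exists in the PR, and the regex only handles a single escape level:

```scala
// Split on commas that are not preceded by a backslash, then unescape,
// so a file name containing "\," survives the parse.
def splitEscaped(input: String): Seq[String] =
  input.split("""(?<!\\),""").map(_.replace("\\,", ",")).toSeq

// splitEscaped("""a.py,weird\,name.py""") == Seq("a.py", "weird,name.py")
```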
[GitHub] spark pull request #21092: [SPARK-23984][K8S] Initial Python Bindings for Py...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r186237367 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -695,9 +693,17 @@ private[spark] class SparkSubmit extends Logging { if (isKubernetesCluster) { childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS if (args.primaryResource != SparkLauncher.NO_RESOURCE) { -childArgs ++= Array("--primary-java-resource", args.primaryResource) +if (args.isPython) { --- End diff -- We chatted about this off-line and while it's close it's not exactly the same, so we can deal with minor parts of duplication for now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21092: [SPARK-23984][K8S] Initial Python Bindings for Py...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r186240207 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala --- @@ -88,15 +94,22 @@ private[spark] class BasicDriverFeatureStep( .addToRequests("memory", driverMemoryQuantity) .addToLimits("memory", driverMemoryQuantity) .endResources() - .addToArgs("driver") + .addToArgs(driverDockerContainer) .addToArgs("--properties-file", SPARK_CONF_PATH) .addToArgs("--class", conf.roleSpecificConf.mainClass) - // The user application jar is merged into the spark.jars list and managed through that - // property, so there is no need to reference it explicitly here. - .addToArgs(SparkLauncher.NO_RESOURCE) - .addToArgs(conf.roleSpecificConf.appArgs: _*) - .build() +val driverContainer = + if (driverDockerContainer == "driver-py") { --- End diff -- Sorry, I forgot that folks could specify the driver container separately from the worker container; nvm. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21092: [SPARK-23984][K8S] Initial Python Bindings for Py...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r186239895 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala --- @@ -71,7 +77,7 @@ private[spark] class BasicDriverFeatureStep( ("cpu", new QuantityBuilder(false).withAmount(limitCores).build()) } -val driverContainer = new ContainerBuilder(pod.container) +val withoutArgsDriverContainer: ContainerBuilder = new ContainerBuilder(pod.container) --- End diff -- But we _do_ set arguments on this one, right? If not, please insert some whitespace so I can see the difference visually. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org