[GitHub] spark pull request #21511: [SPARK-24491][Kubernetes] Configuration support f...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/21511#discussion_r198653002 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -104,6 +104,20 @@ private[spark] object Config extends Logging { .stringConf .createOptional + val KUBERNETES_EXECUTOR_LIMIT_GPUS = --- End diff -- Sometimes the driver does need GPUs. For example, to reduce data across multiple executors, you would ideally use ring all-reduce among your executors, but you cannot really do that right now because executors are scheduled independently. The best you can do right now is to gather all of your data to the driver and do the reduction there. You can learn more in the SPIP for Project Hydrogen (barrier execution mode). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
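For context on the ring all-reduce mentioned above, here is a single-process Python simulation. This is an illustration of the general algorithm only. It assumes nothing about how Spark's barrier execution mode would actually connect executors, and the function name is hypothetical.

The algorithm works as follows with N participants:

- Each vector is split into N chunks.
- In N-1 reduce-scatter steps, every participant passes one chunk to its right neighbor, which adds it into its own copy.
- In N-1 all-gather steps, the fully summed chunks circulate around the ring.

```python
from typing import List


def ring_allreduce(inputs: List[List[float]]) -> List[List[float]]:
    """Simulate a ring all-reduce (elementwise sum) over len(inputs) participants.

    Simplification: the vector length must be divisible by the participant count.
    """
    n = len(inputs)
    length = len(inputs[0])
    assert all(len(v) == length for v in inputs), "all vectors must have the same length"
    assert length % n == 0, "sketch assumes length divisible by participant count"
    size = length // n
    # buffers[r][c] is participant r's current copy of chunk c.
    buffers = [[list(v[c * size:(c + 1) * size]) for c in range(n)] for v in inputs]

    # Phase 1: reduce-scatter. In step s, participant r sends chunk (r - s) % n
    # to its right neighbor, which adds it into its own copy of that chunk.
    for s in range(n - 1):
        sends = [(r, (r - s) % n, list(buffers[r][(r - s) % n])) for r in range(n)]
        for r, c, data in sends:
            dst = (r + 1) % n
            buffers[dst][c] = [a + b for a, b in zip(buffers[dst][c], data)]

    # Now participant r holds the fully reduced chunk (r + 1) % n.
    # Phase 2: all-gather. In step s, participant r forwards chunk (r + 1 - s) % n
    # and the receiver overwrites its copy with the reduced values.
    for s in range(n - 1):
        sends = [(r, (r + 1 - s) % n, list(buffers[r][(r + 1 - s) % n])) for r in range(n)]
        for r, c, data in sends:
            buffers[(r + 1) % n][c] = data

    # Flatten the chunks back into one vector per participant.
    return [[x for chunk in b for x in chunk] for b in buffers]
```

Each participant sends 2(N-1) chunks of 1/N of the vector. Per-participant traffic therefore stays roughly constant as N grows, unlike funneling everything through the driver.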
[GitHub] spark issue #20761: [SPARK-20327][CORE][YARN] Add CLI support for YARN custo...
Github user galv commented on the issue: https://github.com/apache/spark/pull/20761 I agree that there is apparently unnecessary complexity in the validator. I'll try to take another look at the code today. On Fri, Jun 15, 2018 at 9:15 AM, Marcelo Vanzin wrote: > I don't have issues with the design - I think the main two things I was concerned about were: > - not adding another way to set existing Spark options like mem and cores, which has been addressed > - the seemingly unnecessary complexity in certain parts of the code like the validator -- Daniel Galvez http://danielgalvez.me https://github.com/galv
[GitHub] spark issue #20761: [SPARK-20327][CORE][YARN] Add CLI support for YARN custo...
Github user galv commented on the issue: https://github.com/apache/spark/pull/20761 I would like to see this merged, though I got derailed by Spark Summit last week and other things. I will look this patch over again soon. @szyszy, if you've been busy lately, perhaps I can take over the rest of the code changes suggested by @vanzin, if necessary (I get the impression that this PR is just about ready to merge). @vanzin, I appreciate your detailed responses. I'm curious whether you have any serious overarching concerns about this patch, e.g., about its design. I think the interface is fairly appropriate, but I wanted to check whether you think this PR will be ready to merge soon.
[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/21494#discussion_r193953345 --- Diff: core/src/main/scala/org/apache/spark/util/RpcUtils.scala --- @@ -44,7 +44,7 @@ private[spark] object RpcUtils { /** Returns the default Spark timeout to use for RPC ask operations. */ def askRpcTimeout(conf: SparkConf): RpcTimeout = { -RpcTimeout(conf, Seq("spark.rpc.askTimeout", "spark.network.timeout"), "120s") +RpcTimeout(conf, Seq("spark.rpc.askTimeout", "spark.network.timeout"), "900s") --- End diff -- Why hard-code this change? Couldn't you have set this at runtime if you needed it increased? I'm concerned about it breaking backwards compatibility with jobs that, for whatever reason, depend on the 120-second timeout.
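To illustrate the runtime alternative: in Spark 2.x, `RpcTimeout(conf, Seq(...), default)` uses the first key in the list that is set in the configuration. It falls back to the default only when none of them is set. As a result, passing `--conf spark.rpc.askTimeout=900s` to `spark-submit` raises the timeout without touching the 120s default.

The Python sketch below models that lookup. Two simplifications apply: a plain dict stands in for `SparkConf`, and the time-string parser handles only a few units. It is not Spark's code.

```python
import re
from typing import Dict, Sequence, Tuple

# Simplified time parser: bare numbers are seconds; only ms, s and min are accepted.
_UNITS = {"ms": 0.001, "s": 1, "min": 60, "": 1}


def _to_seconds(value: str) -> float:
    match = re.fullmatch(r"\s*(\d+)\s*(ms|s|min)?\s*", value)
    if match is None:
        raise ValueError(f"unsupported time string: {value!r}")
    return int(match.group(1)) * _UNITS[match.group(2) or ""]


def resolve_timeout(conf: Dict[str, str], keys: Sequence[str], default: str) -> Tuple[str, float]:
    """Return (key that supplied the value, timeout in seconds).

    The first key present in conf wins; if none is set, the default is
    reported under the first key in the list.
    """
    for key in keys:
        if key in conf:
            return key, _to_seconds(conf[key])
    return keys[0], _to_seconds(default)


ASK_KEYS = ["spark.rpc.askTimeout", "spark.network.timeout"]
```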
[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/21494#discussion_r193953432 --- Diff: core/src/main/scala/org/apache/spark/barrier/BarrierRDD.scala --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.barrier + +import scala.reflect.ClassTag + +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.rdd.RDD + + +/** + * An RDD that supports running MPI programme. --- End diff -- `programme` -> `program`
[GitHub] spark pull request #21511: [SPARK-24491][Kubernetes] Configuration support f...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/21511#discussion_r193945410 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala --- @@ -150,6 +152,16 @@ private[spark] class BasicExecutorFeatureStep( .endResources() .build() }.getOrElse(executorContainer) +val containerWithLimitGpus = executorLimitGpus.map { limitGpus => + val executorGpuLimitQuantity = new QuantityBuilder(false) +.withAmount(limitGpus) +.build() + new ContainerBuilder(containerWithLimitCores) +.editResources() + .addToLimits(gpuProvider+"/gpu", executorGpuLimitQuantity) --- End diff -- Style: add whitespace around the "+". More importantly, you're assuming that the name of the resource is always going to be "gpu". I'm not sure this is a great idea.
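Kubernetes extended resources are named `<vendor-domain>/<resource-name>`, for example `nvidia.com/gpu` or `amd.com/gpu`. Nothing forces the suffix to be `gpu`.

Below is a minimal sketch that builds the key from two parts and validates each one, rather than appending a hard-coded `"/gpu"`. It assumes a hypothetical configurable resource name (something the PR does not define) that defaults to `gpu`. The regexes are only a rough approximation of Kubernetes' real naming rules.

```python
import re

# Rough approximations of a DNS-subdomain vendor prefix and a resource name;
# Kubernetes' actual validation is stricter than this.
_DOMAIN = re.compile(r"[a-z0-9]([-a-z0-9.]*[a-z0-9])?")
_NAME = re.compile(r"[A-Za-z0-9]([-A-Za-z0-9_.]*[A-Za-z0-9])?")


def extended_resource_key(provider: str, resource_name: str = "gpu") -> str:
    """Compose '<provider>/<resource_name>', e.g. 'nvidia.com/gpu'.

    `resource_name` is a hypothetical setting; defaulting it to 'gpu' keeps the
    PR's current behavior while allowing other names.
    """
    if not _DOMAIN.fullmatch(provider):
        raise ValueError(f"invalid resource provider: {provider!r}")
    if not _NAME.fullmatch(resource_name):
        raise ValueError(f"invalid resource name: {resource_name!r}")
    return f"{provider}/{resource_name}"
```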
[GitHub] spark pull request #21511: [SPARK-24491][Kubernetes] Configuration support f...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/21511#discussion_r193944223 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -104,6 +104,20 @@ private[spark] object Config extends Logging { .stringConf .createOptional + val KUBERNETES_EXECUTOR_LIMIT_GPUS = +ConfigBuilder("spark.kubernetes.executor.limit.gpus") + .doc("Specify the gpu request for each executor pod") + .stringConf + .createOptional + + val KUBERNETES_EXECUTOR_GPU_PROVIDER = +ConfigBuilder("spark.kubernetes.executor.gpu.provider") + .doc("Specify the gpu provider for each executor pod") + .stringConf + .createWithDefault("nvidia.com") + + + --- End diff -- Style: remove two of these blank lines.
[GitHub] spark pull request #21511: [SPARK-24491][Kubernetes] Configuration support f...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/21511#discussion_r193945389 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala --- @@ -172,7 +184,7 @@ private[spark] class BasicExecutorFeatureStep( .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*) .endSpec() .build() -SparkPod(executorPod, containerWithLimitCores) +SparkPod(executorPod, containerWithLimitGpus) --- End diff -- What if you want to request both GPUs and another resource (e.g., FPGAs) for your executors? You'd have to hard-code another chained optional map like the one you added in this pull request. Your current code does not pave a good path forward for requesting other types of resources.
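One way to avoid chaining another optional map per resource type is to collect every requested extended resource into a single mapping, then apply it in one pass. The Python sketch below shows that idea under two assumptions:

- It uses a hypothetical `spark.kubernetes.executor.limit.<vendor>/<name>` key pattern, which is not an existing Spark setting.
- Plain dicts stand in for the fabric8 `ContainerBuilder`.

```python
from typing import Dict

# Hypothetical prefix; Spark does not define this key pattern.
LIMIT_PREFIX = "spark.kubernetes.executor.limit."


def extended_resource_limits(conf: Dict[str, str]) -> Dict[str, str]:
    """Collect every '<prefix><vendor>/<name>' entry into {resource key: quantity}."""
    limits = {}
    for key, value in conf.items():
        if not key.startswith(LIMIT_PREFIX):
            continue
        resource = key[len(LIMIT_PREFIX):]
        # Extended resource names always contain a '/', which also skips
        # the existing spark.kubernetes.executor.limit.cores setting.
        if "/" in resource:
            limits[resource] = value
    return limits


def apply_limits(container: Dict, limits: Dict[str, str]) -> Dict:
    """Return a copy of `container` with all limits merged in a single step."""
    resources = dict(container.get("resources", {}))
    merged = dict(resources.get("limits", {}))
    merged.update(limits)
    resources["limits"] = merged
    return {**container, "resources": resources}
```

Adding FPGAs, or any other device, then becomes a configuration change rather than another hard-coded `map` in the feature step.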
[GitHub] spark pull request #21511: [SPARK-24491][Kubernetes] Configuration support f...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/21511#discussion_r193945237 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -104,6 +104,20 @@ private[spark] object Config extends Logging { .stringConf .createOptional + val KUBERNETES_EXECUTOR_LIMIT_GPUS = --- End diff -- Why not allow the driver to request GPUs as well?
[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/21494#discussion_r193290266 --- Diff: python/pyspark/worker.py --- @@ -232,6 +236,13 @@ def main(infile, outfile): shuffle.DiskBytesSpilled = 0 _accumulatorRegistry.clear() +if (isBarrier): +port = 25333 + 2 + 2 * taskContext._partitionId +paras = GatewayParameters(port=port) --- End diff -- paras -> params
[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/21494#discussion_r193269255 --- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala --- @@ -627,6 +627,52 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu assert(exc.getCause() != null) stream.close() } + + test("support barrier sync under local mode") { +val conf = new SparkConf().setAppName("test").setMaster("local[2]") +sc = new SparkContext(conf) +val rdd = sc.makeRDD(Seq(1, 2, 3, 4), 2).barrier() +val rdd2 = rdd.mapPartitions { it => + val tc = TaskContext.get.asInstanceOf[org.apache.spark.barrier.BarrierTaskContext] + // If we don't get the expected taskInfos, the job shall abort due to stage failure. + if (tc.hosts().length != 2) { +throw new SparkException("Expected taksInfos length is 2, actual length is " + --- End diff -- `taksInfos` -> `taskInfos`
[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/21494#discussion_r193289530 --- Diff: python/pyspark/worker.py --- @@ -232,6 +236,13 @@ def main(infile, outfile): shuffle.DiskBytesSpilled = 0 _accumulatorRegistry.clear() +if (isBarrier): --- End diff -- Style: `if (isBarrier):` -> `if isBarrier:`
[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/21494#discussion_r193291076 --- Diff: python/pyspark/worker.py --- @@ -232,6 +236,13 @@ def main(infile, outfile): shuffle.DiskBytesSpilled = 0 _accumulatorRegistry.clear() +if (isBarrier): +port = 25333 + 2 + 2 * taskContext._partitionId --- End diff -- I recommend using `DEFAULT_PORT` and `DEFAULT_PYTHON_PORT` instead of the hard-coded `25333`. They are exposed as part of the public API of py4j: https://github.com/bartdag/py4j/blob/216432d859de41441f0d1a0d55b31b5d8d09dd28/py4j-python/src/py4j/java_gateway.py#L54 By the way, acquiring ports like this is a little hacky and may require more thought.
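Here is a sketch of what the suggestion could look like. py4j's `java_gateway` module defines `DEFAULT_PORT` (25333) and `DEFAULT_PYTHON_PORT` (25334), so the diff's offset can be written relative to them.

For the "hacky" part, one common alternative is to bind to port 0 and let the OS choose a free ephemeral port. Both helper names below are hypothetical. The literal fallback values only apply when py4j is not installed.

```python
import socket

try:
    from py4j.java_gateway import DEFAULT_PORT, DEFAULT_PYTHON_PORT
except ImportError:  # fall back to py4j's default values
    DEFAULT_PORT, DEFAULT_PYTHON_PORT = 25333, 25334


def barrier_gateway_port(partition_id: int) -> int:
    """Same arithmetic as the diff (25333 + 2 + 2 * partitionId), without the magic number.

    Ports start just past py4j's two defaults, spaced two apart per partition.
    """
    return DEFAULT_PYTHON_PORT + 1 + 2 * partition_id


def find_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for an unused port by binding to port 0.

    The port is released when the socket closes, so another process could
    still claim it before it is reused; handing over the bound socket avoids that race.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((host, 0))
        return s.getsockname()[1]
```

The fixed-offset scheme can collide with other processes, or with two executors on one host that compute the same partition-based port. OS-assigned ports avoid that, at the cost of having to communicate the chosen port to the JVM side.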
[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/21494#discussion_r193555968 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -123,6 +124,21 @@ private[spark] class TaskSetManager( // TODO: We should kill any running task attempts when the task set manager becomes a zombie. private[scheduler] var isZombie = false + private[scheduler] lazy val barrierCoordinator = { --- End diff -- I recommend adding a return type here for readability.
[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/21494#discussion_r193269297 --- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala --- @@ -627,6 +627,52 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu assert(exc.getCause() != null) stream.close() } + + test("support barrier sync under local mode") { +val conf = new SparkConf().setAppName("test").setMaster("local[2]") +sc = new SparkContext(conf) +val rdd = sc.makeRDD(Seq(1, 2, 3, 4), 2).barrier() +val rdd2 = rdd.mapPartitions { it => + val tc = TaskContext.get.asInstanceOf[org.apache.spark.barrier.BarrierTaskContext] + // If we don't get the expected taskInfos, the job shall abort due to stage failure. + if (tc.hosts().length != 2) { +throw new SparkException("Expected taksInfos length is 2, actual length is " + + s"${tc.hosts().length}.") + } + // println(tc.getTaskInfos().toList) --- End diff -- Remove this commented-out line.
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r190107244 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala --- @@ -306,9 +318,14 @@ private[yarn] class YarnAllocator( s"executorsStarting: ${numExecutorsStarting.get}") if (missing > 0) { - logInfo(s"Will request $missing executor container(s), each with " + -s"${resource.getVirtualCores} core(s) and " + -s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)") + var requestContainerMessage = s"Will request $missing executor container(s), each with " + + s"${resource.getVirtualCores} core(s) and " + + s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead) " --- End diff -- Nitpick: I would remove the trailing space here (so the log message is exactly the same as before for users) and put it at the beginning of s"and with custom resources:" instead.
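The suggested layout can be sketched in Python as follows. The base message keeps exactly its old text, and the custom-resource clause carries its own leading space, so users without custom resources see no change. The function name, its parameters and the `name=value` listing format are hypothetical.

```python
from typing import Dict


def container_request_message(missing: int, cores: int, memory_mb: int,
                              overhead_mb: int, custom: Dict[str, str]) -> str:
    # Base text is identical to the pre-PR log line, with no trailing space.
    message = (f"Will request {missing} executor container(s), each with "
               f"{cores} core(s) and "
               f"{memory_mb} MB memory (including {overhead_mb} MB of overhead)")
    if custom:
        # The separating space lives at the start of the optional clause.
        details = ", ".join(f"{name}={value}" for name, value in sorted(custom.items()))
        message += f" and with custom resources: {details}"
    return message
```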
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r190106824 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeValidator.scala --- @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.yarn + +import scala.collection.mutable + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.yarn.ProcessType.{am, driver, executor, ProcessType} +import org.apache.spark.deploy.yarn.ResourceType.{cores, memory, ResourceType} +import org.apache.spark.deploy.yarn.RunMode.{client, cluster, RunMode} +import org.apache.spark.deploy.yarn.config._ +import org.apache.spark.internal.config._ + +private[spark] object ProcessType extends Enumeration { + type ProcessType = Value + val driver, executor, am = Value +} + +private[spark] object RunMode extends Enumeration { + type RunMode = Value + val client, cluster = Value +} + +private[spark] object ResourceType extends Enumeration { + type ResourceType = Value + val cores, memory = Value +} + +private object ResourceTypeValidator { + private val ERROR_PREFIX: String = "Error: " + private val POSSIBLE_RESOURCE_DEFINITIONS = Seq[ResourceConfigProperties]( +new ResourceConfigProperties(am, client, memory), +new ResourceConfigProperties(am, client, cores), +new ResourceConfigProperties(driver, cluster, memory), +new ResourceConfigProperties(driver, cluster, cores), +new ResourceConfigProperties(processType = executor, resourceType = memory), +new ResourceConfigProperties(processType = executor, resourceType = cores)) + + /** + * Validates sparkConf and throws a SparkException if a standard resource (memory or cores) + * is defined with the property spark.yarn.x.resource.y + * + * Example of an invalid config: + * - spark.yarn.driver.resource.memory=2g + * + * Please note that if multiple resources are defined like described above, + * the error messages will be concatenated. 
+ * Example of such a config: + * - spark.yarn.driver.resource.memory=2g + * - spark.yarn.executor.resource.cores=2 + * Then the following two error messages will be printed: + * - "memory cannot be requested with config spark.yarn.driver.resource.memory, + * please use config spark.driver.memory instead! + * - "cores cannot be requested with config spark.yarn.executor.resource.cores, + * please use config spark.executor.cores instead! + * + * @param sparkConf + */ + def validateResources(sparkConf: SparkConf): Unit = { +val requestedResources = new RequestedResources(sparkConf) +val sb = new mutable.StringBuilder() +POSSIBLE_RESOURCE_DEFINITIONS.foreach { rcp => + val customResources: Map[String, String] = getCustomResourceValue(requestedResources, rcp) + val (standardResourceConfigKey: String, customResourceConfigKey: String) = +getResourceConfigKeys(rcp) + + val errorMessage = +if (customResources.contains(customResourceConfigKey)) { + s"${rcp.resourceType} cannot be requested with config $customResourceConfigKey, " + + s"please use config $standardResourceConfigKey instead!" +} else { + "" +} + if (errorMessage.nonEmpty) { +printErrorMessageToBuffer(sb, errorMessage) + } +} + +if (sb.nonEmpty) { + throw new SparkException(sb.toString()) +} + } + + /** + * Returns the requested map of custom resources, + * based on the ResourceConfigProperties argument. + * @return + */ + private def getCustomReso
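Since the validator's complexity came up in review, here is one possible flatter shape for the check described in the doc comment above, as a Python sketch. It uses a small table to map each disallowed `spark.yarn.<process>.resource.<memory|cores>` key to the standard key that should be used instead, and it joins all violations into one error.

The table mirrors `POSSIBLE_RESOURCE_DEFINITIONS` in the diff:

- The AM applies in client mode.
- The driver applies in cluster mode.
- The executor applies in both modes.

The standard-key prefixes (`spark.yarn.am`, `spark.driver`, `spark.executor`) are this sketch's assumption. None of this is the PR's code.

```python
from typing import Dict

# (process, run modes where the check applies, standard-key prefix); hypothetical table.
_PROCESSES = [
    ("am", {"client"}, "spark.yarn.am"),
    ("driver", {"cluster"}, "spark.driver"),
    ("executor", {"client", "cluster"}, "spark.executor"),
]
_STANDARD_RESOURCES = ("memory", "cores")


class ResourceConfigError(Exception):
    """Stand-in for SparkException in this sketch."""


def validate_resources(conf: Dict[str, str], run_mode: str) -> None:
    errors = []
    for process, modes, standard_prefix in _PROCESSES:
        if run_mode not in modes:
            continue
        for resource in _STANDARD_RESOURCES:
            custom_key = f"spark.yarn.{process}.resource.{resource}"
            if custom_key in conf:
                errors.append(
                    f"Error: {resource} cannot be requested with config {custom_key}, "
                    f"please use config {standard_prefix}.{resource} instead!")
    if errors:
        # Report every violation at once, one per line.
        raise ResourceConfigError("\n".join(errors))
```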
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r190105256 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeHelper.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.lang.reflect.InvocationTargetException + +import scala.util.control.NonFatal + +import org.apache.hadoop.yarn.api.records.Resource + +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * This helper class uses some of Hadoop 3 methods from the yarn API, + * so we need to use reflection to avoid compile error when building against Hadoop 2.x + */ +object ResourceTypeHelper extends Logging { + private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r + private val resourceTypesNotAvailableErrorMessage = +"Ignoring updating resource with resource types because " + +"the version of YARN does not support it!" 
+ + def setResourceInfoFromResourceTypes( + resourceTypesParam: Map[String, String], + resource: Resource): Resource = { +if (resource == null) { + throw new IllegalArgumentException("Resource parameter should not be null!") +} + +if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) { + logWarning(resourceTypesNotAvailableErrorMessage) + return resource +} + +val resourceTypes = resourceTypesParam.map { case (k, v) => ( + if (k.equals("memory")) { +logWarning("Trying to use 'memory' as a custom resource, converted it to 'memory-mb'") +"memory-mb" + } else k, v) +} + +logDebug(s"Custom resource types: $resourceTypes") +resourceTypes.foreach { rt => + val resourceName: String = rt._1 + val (amount, unit) = getAmountAndUnit(rt._2) + logDebug(s"Registering resource with name: $resourceName, amount: $amount, unit: $unit") + + try { +val resInfoClass = Utils.classForName( + "org.apache.hadoop.yarn.api.records.ResourceInformation") +val setResourceInformationMethod = + resource.getClass.getMethod("setResourceInformation", classOf[String], +resInfoClass) + +val resourceInformation = + createResourceInformation(resourceName, amount, unit, resInfoClass) +setResourceInformationMethod.invoke(resource, resourceName, resourceInformation) + } catch { +case e: InvocationTargetException => + if (e.getCause != null) { +throw e.getCause + } else { +throw e + } +case NonFatal(e) => + logWarning(resourceTypesNotAvailableErrorMessage, e) + } +} +resource + } + + def getCustomResourcesAsStrings(resource: Resource): String = { +if (resource == null) { + throw new IllegalArgumentException("Resource parameter should not be null!") +} + +if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) { + logWarning(resourceTypesNotAvailableErrorMessage) + return "" +} + +var res: String = "" +try { + val resUtilsClass = Utils.classForName( +"org.apache.hadoop.yarn.util.resource.ResourceUtils") + val getNumberOfResourceTypesMethod = resUtilsClass.getMethod("getNumberOfKnownResourceTypes") + val 
numberOfResourceTypes: Int = getNumberOfResourceTypesMethod.invoke(null).asInstanceOf[Int] + val resourceClass = Utils.classForName( +"org.apache.hadoop.yarn.api.records.Resource") + +
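The helper above splits each custom resource value with the regex `([0-9]+)([A-Za-z]*)` and renames `memory` to `memory-mb`. Below is a Python sketch of that parsing step in isolation. The function names are hypothetical. Raising an error on malformed input is also an assumption, because the body of `getAmountAndUnit` is cut off in this diff.

```python
import re
from typing import Dict, Tuple

# Same pattern as AMOUNT_AND_UNIT_REGEX in the diff: digits, then optional letters.
AMOUNT_AND_UNIT = re.compile(r"([0-9]+)([A-Za-z]*)")


def amount_and_unit(value: str) -> Tuple[int, str]:
    """Split e.g. '2g' into (2, 'g'); a bare number yields an empty unit."""
    match = AMOUNT_AND_UNIT.fullmatch(value.strip())
    if match is None:
        raise ValueError(f"value must be <digits><optional unit>, got {value!r}")
    return int(match.group(1)), match.group(2)


def normalize_resource_names(resources: Dict[str, str]) -> Dict[str, Tuple[int, str]]:
    """Parse every value and rename 'memory' to YARN's 'memory-mb' resource name."""
    return {("memory-mb" if name == "memory" else name): amount_and_unit(value)
            for name, value in resources.items()}
```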
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r190105917 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeValidator.scala --- @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.yarn + +import scala.collection.mutable + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.yarn.ProcessType.{am, driver, executor, ProcessType} +import org.apache.spark.deploy.yarn.ResourceType.{cores, memory, ResourceType} +import org.apache.spark.deploy.yarn.RunMode.{client, cluster, RunMode} +import org.apache.spark.deploy.yarn.config._ +import org.apache.spark.internal.config._ + +private[spark] object ProcessType extends Enumeration { + type ProcessType = Value + val driver, executor, am = Value +} + +private[spark] object RunMode extends Enumeration { + type RunMode = Value + val client, cluster = Value +} + +private[spark] object ResourceType extends Enumeration { + type ResourceType = Value + val cores, memory = Value +} + +private object ResourceTypeValidator { + private val ERROR_PREFIX: String = "Error: " + private val POSSIBLE_RESOURCE_DEFINITIONS = Seq[ResourceConfigProperties]( +new ResourceConfigProperties(am, client, memory), +new ResourceConfigProperties(am, client, cores), +new ResourceConfigProperties(driver, cluster, memory), +new ResourceConfigProperties(driver, cluster, cores), +new ResourceConfigProperties(processType = executor, resourceType = memory), +new ResourceConfigProperties(processType = executor, resourceType = cores)) + + /** + * Validates sparkConf and throws a SparkException if a standard resource (memory or cores) + * is defined with the property spark.yarn.x.resource.y + * + * Example of an invalid config: + * - spark.yarn.driver.resource.memory=2g + * + * Please note that if multiple resources are defined like described above, + * the error messages will be concatenated. 
+ * Example of such a config: + * - spark.yarn.driver.resource.memory=2g + * - spark.yarn.executor.resource.cores=2 + * Then the following two error messages will be printed: + * - "memory cannot be requested with config spark.yarn.driver.resource.memory, + * please use config spark.driver.memory instead! + * - "cores cannot be requested with config spark.yarn.executor.resource.cores, + * please use config spark.executor.cores instead! + * + * @param sparkConf + */ + def validateResources(sparkConf: SparkConf): Unit = { +val requestedResources = new RequestedResources(sparkConf) +val sb = new mutable.StringBuilder() +POSSIBLE_RESOURCE_DEFINITIONS.foreach { rcp => + val customResources: Map[String, String] = getCustomResourceValue(requestedResources, rcp) + val (standardResourceConfigKey: String, customResourceConfigKey: String) = +getResourceConfigKeys(rcp) + + val errorMessage = +if (customResources.contains(customResourceConfigKey)) { + s"${rcp.resourceType} cannot be requested with config $customResourceConfigKey, " + + s"please use config $standardResourceConfigKey instead!" +} else { + "" +} + if (errorMessage.nonEmpty) { +printErrorMessageToBuffer(sb, errorMessage) + } +} + +if (sb.nonEmpty) { + throw new SparkException(sb.toString()) +} + } + + /** + * Returns the requested map of custom resources, + * based on the ResourceConfigProperties argument. + * @return + */ + private def getCustomResourceValue( + requestedResources: RequestedResources, +
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r190106056 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeValidator.scala --- @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.yarn + +import scala.collection.mutable + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.yarn.ProcessType.{am, driver, executor, ProcessType} +import org.apache.spark.deploy.yarn.ResourceType.{cores, memory, ResourceType} +import org.apache.spark.deploy.yarn.RunMode.{client, cluster, RunMode} +import org.apache.spark.deploy.yarn.config._ +import org.apache.spark.internal.config._ + +private[spark] object ProcessType extends Enumeration { + type ProcessType = Value + val driver, executor, am = Value +} + +private[spark] object RunMode extends Enumeration { + type RunMode = Value + val client, cluster = Value +} + +private[spark] object ResourceType extends Enumeration { + type ResourceType = Value + val cores, memory = Value +} + +private object ResourceTypeValidator { + private val ERROR_PREFIX: String = "Error: " + private val POSSIBLE_RESOURCE_DEFINITIONS = Seq[ResourceConfigProperties]( +new ResourceConfigProperties(am, client, memory), +new ResourceConfigProperties(am, client, cores), +new ResourceConfigProperties(driver, cluster, memory), +new ResourceConfigProperties(driver, cluster, cores), +new ResourceConfigProperties(processType = executor, resourceType = memory), +new ResourceConfigProperties(processType = executor, resourceType = cores)) + + /** + * Validates sparkConf and throws a SparkException if a standard resource (memory or cores) + * is defined with the property spark.yarn.x.resource.y + * + * Example of an invalid config: + * - spark.yarn.driver.resource.memory=2g + * + * Please note that if multiple resources are defined like described above, + * the error messages will be concatenated. 
+ * Example of such a config: + * - spark.yarn.driver.resource.memory=2g + * - spark.yarn.executor.resource.cores=2 + * Then the following two error messages will be printed: + * - "memory cannot be requested with config spark.yarn.driver.resource.memory, + * please use config spark.driver.memory instead! + * - "cores cannot be requested with config spark.yarn.executor.resource.cores, + * please use config spark.executor.cores instead! + * + * @param sparkConf + */ + def validateResources(sparkConf: SparkConf): Unit = { +val requestedResources = new RequestedResources(sparkConf) +val sb = new mutable.StringBuilder() +POSSIBLE_RESOURCE_DEFINITIONS.foreach { rcp => + val customResources: Map[String, String] = getCustomResourceValue(requestedResources, rcp) + val (standardResourceConfigKey: String, customResourceConfigKey: String) = +getResourceConfigKeys(rcp) + + val errorMessage = +if (customResources.contains(customResourceConfigKey)) { + s"${rcp.resourceType} cannot be requested with config $customResourceConfigKey, " + + s"please use config $standardResourceConfigKey instead!" +} else { + "" +} + if (errorMessage.nonEmpty) { +printErrorMessageToBuffer(sb, errorMessage) + } +} + +if (sb.nonEmpty) { + throw new SparkException(sb.toString()) +} + } + + /** + * Returns the requested map of custom resources, + * based on the ResourceConfigProperties argument. + * @return + */ + private def getCustomResourceValue( + requestedResources: RequestedResources, +
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r190106859 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeValidator.scala --- @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.yarn + +import scala.collection.mutable + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.yarn.ProcessType.{am, driver, executor, ProcessType} +import org.apache.spark.deploy.yarn.ResourceType.{cores, memory, ResourceType} +import org.apache.spark.deploy.yarn.RunMode.{client, cluster, RunMode} +import org.apache.spark.deploy.yarn.config._ +import org.apache.spark.internal.config._ + +private[spark] object ProcessType extends Enumeration { + type ProcessType = Value + val driver, executor, am = Value +} + +private[spark] object RunMode extends Enumeration { + type RunMode = Value + val client, cluster = Value +} + +private[spark] object ResourceType extends Enumeration { + type ResourceType = Value + val cores, memory = Value +} + +private object ResourceTypeValidator { + private val ERROR_PREFIX: String = "Error: " + private val POSSIBLE_RESOURCE_DEFINITIONS = Seq[ResourceConfigProperties]( +new ResourceConfigProperties(am, client, memory), +new ResourceConfigProperties(am, client, cores), +new ResourceConfigProperties(driver, cluster, memory), +new ResourceConfigProperties(driver, cluster, cores), +new ResourceConfigProperties(processType = executor, resourceType = memory), +new ResourceConfigProperties(processType = executor, resourceType = cores)) + + /** + * Validates sparkConf and throws a SparkException if a standard resource (memory or cores) + * is defined with the property spark.yarn.x.resource.y + * + * Example of an invalid config: + * - spark.yarn.driver.resource.memory=2g + * + * Please note that if multiple resources are defined like described above, + * the error messages will be concatenated. 
+ * Example of such a config: + * - spark.yarn.driver.resource.memory=2g + * - spark.yarn.executor.resource.cores=2 + * Then the following two error messages will be printed: + * - "memory cannot be requested with config spark.yarn.driver.resource.memory, + * please use config spark.driver.memory instead! + * - "cores cannot be requested with config spark.yarn.executor.resource.cores, + * please use config spark.executor.cores instead! + * + * @param sparkConf + */ + def validateResources(sparkConf: SparkConf): Unit = { +val requestedResources = new RequestedResources(sparkConf) +val sb = new mutable.StringBuilder() +POSSIBLE_RESOURCE_DEFINITIONS.foreach { rcp => --- End diff -- I'm having a hard time understanding this method. I will have to come back to it later. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
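If this method is hard to follow, a much simpler sketch of the check described in its scaladoc may help. The check rejects any spark.yarn.x.resource.y key whose resource is memory or cores and points the user to the standard key instead. The sketch is illustrative only. ResourceValidatorSketch, errors and validate are hypothetical names. It works on a plain Map instead of SparkConf, throws IllegalArgumentException instead of SparkException, and ignores the run mode (am in client mode, driver in cluster mode) that the original table encodes. The replacement keys for the am entries (spark.yarn.am.memory, spark.yarn.am.cores) are an assumption. The driver and executor mappings come from the scaladoc examples.

```scala
// Hypothetical, simplified sketch of the check described in the
// validateResources scaladoc; not the code under review.
object ResourceValidatorSketch {
  // (custom-resource key that must NOT be used, standard key to use instead).
  // Assumption: am keys map to spark.yarn.am.*; driver/executor follow the scaladoc.
  private val ForbiddenKeys: Seq[(String, String)] = Seq(
    "spark.yarn.am.resource.memory" -> "spark.yarn.am.memory",
    "spark.yarn.am.resource.cores" -> "spark.yarn.am.cores",
    "spark.yarn.driver.resource.memory" -> "spark.driver.memory",
    "spark.yarn.driver.resource.cores" -> "spark.driver.cores",
    "spark.yarn.executor.resource.memory" -> "spark.executor.memory",
    "spark.yarn.executor.resource.cores" -> "spark.executor.cores")

  /** Returns one error message per forbidden key that is present in `conf`. */
  def errors(conf: Map[String, String]): Seq[String] =
    ForbiddenKeys.collect {
      case (customKey, standardKey) if conf.contains(customKey) =>
        // The resource name is the last dotted segment: "memory" or "cores".
        val resource = customKey.substring(customKey.lastIndexOf('.') + 1)
        s"$resource cannot be requested with config $customKey, " +
          s"please use config $standardKey instead!"
    }

  /** Throws if any forbidden key is set, concatenating all messages. */
  def validate(conf: Map[String, String]): Unit = {
    val errs = errors(conf)
    if (errs.nonEmpty) throw new IllegalArgumentException(errs.mkString("\n"))
  }
}
```

A flat table of (forbidden key, replacement key) pairs keeps the whole rule visible in one place, which may be easier to review than deriving both keys from enumeration triples.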
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r190107444 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala --- @@ -306,9 +318,14 @@ private[yarn] class YarnAllocator( s"executorsStarting: ${numExecutorsStarting.get}") if (missing > 0) { - logInfo(s"Will request $missing executor container(s), each with " + -s"${resource.getVirtualCores} core(s) and " + -s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)") + var requestContainerMessage = s"Will request $missing executor container(s), each with " + + s"${resource.getVirtualCores} core(s) and " + + s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead) " + if (ResourceTypeHelper.isYarnResourceTypesAvailable()) { --- End diff -- I would remove this condition. You already check it in `ResourceTypeHelper.getCustomResourcesAsStrings`. It is better to write every method so that it cannot crash on reflection than to force developers to remember which methods can throw reflection errors.
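A minimal sketch of the suggestion, assuming the availability check lives inside the helper so that a call site can invoke it unconditionally. GuardedHelperSketch, classAvailable and customResourcesAsString are hypothetical names. The real helper inspects a YARN Resource through reflection; here the reflective work is passed in as a by-name argument so the sketch runs without Hadoop on the classpath.

```scala
import scala.util.control.NonFatal

// Hypothetical sketch: every public entry point performs its own
// availability check, so callers never need to guard the call.
object GuardedHelperSketch {
  /** True if `className` can be loaded from the current classpath. */
  def classAvailable(className: String): Boolean =
    try { Class.forName(className); true }
    catch { case _: ClassNotFoundException => false }

  /**
   * Evaluates `describe` only when `requiredClass` is loadable, and falls
   * back to "" if the class is missing or the reflective lookup fails.
   */
  def customResourcesAsString(requiredClass: String)(describe: => String): String =
    if (!classAvailable(requiredClass)) ""
    else try describe catch { case NonFatal(_) => "" }
}
```

In the code under review, `requiredClass` would presumably be "org.apache.hadoop.yarn.api.records.ResourceInformation", the Hadoop 3 class the helper already loads reflectively. Returning an empty string instead of rethrowing seems reasonable for a helper that only feeds a log message.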
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r190104166 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeHelper.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.yarn + +import java.lang.{Integer => JInteger, Long => JLong} +import java.lang.reflect.InvocationTargetException + +import scala.util.control.NonFatal + +import org.apache.hadoop.yarn.api.records.Resource + +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * This helper class uses some of Hadoop 3 methods from the YARN API, + * so we need to use reflection to avoid compile error when building against Hadoop 2.x + */ +private object ResourceTypeHelper extends Logging { + private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r + + def setResourceInfoFromResourceTypes( + resourceTypes: Map[String, String], + resource: Resource): Resource = { +require(resource != null, "Resource parameter should not be null!") + +if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) { + return resource +} + +logDebug(s"Custom resource types: $resourceTypes") +resourceTypes.foreach { case (name, rawAmount) => + try { +val AMOUNT_AND_UNIT_REGEX(amountPart, unitPart) = rawAmount +val (amount, unit) = (amountPart.toLong, unitPart match { + case "m" => "M" --- End diff -- Don't forget to update this when you are ready.
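A minimal sketch of how the AMOUNT_AND_UNIT_REGEX in this hunk splits a raw value such as "121m" into an amount and a unit. AmountAndUnitSketch and parse are hypothetical names. The unit-normalization match in the diff is cut off after `case "m" => "M"`, so this sketch returns the unit unchanged rather than guessing the remaining cases.

```scala
import scala.util.Try

// Hypothetical sketch: parse "<digits><letters>" into (amount, unit).
object AmountAndUnitSketch {
  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r

  /** Returns None for malformed input or an amount that overflows a Long. */
  def parse(raw: String): Option[(Long, String)] = raw match {
    // Regex extractors match the whole string, so "1.5g" does not match.
    case AMOUNT_AND_UNIT_REGEX(amount, unit) =>
      Try(amount.toLong).toOption.map(value => (value, unit))
    case _ => None
  }
}
```

A pattern `val` binding such as `val AMOUNT_AND_UNIT_REGEX(amountPart, unitPart) = rawAmount` throws `scala.MatchError` when the input does not match. A `match` that returns `Option` makes that failure explicit to the caller.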
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r190107502 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala --- @@ -306,9 +318,14 @@ private[yarn] class YarnAllocator( s"executorsStarting: ${numExecutorsStarting.get}") if (missing > 0) { - logInfo(s"Will request $missing executor container(s), each with " + -s"${resource.getVirtualCores} core(s) and " + -s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)") + var requestContainerMessage = s"Will request $missing executor container(s), each with " + + s"${resource.getVirtualCores} core(s) and " + + s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead) " + if (ResourceTypeHelper.isYarnResourceTypesAvailable()) { +requestContainerMessage ++= s"and with custom resources:" + +ResourceTypeHelper.getCustomResourcesAsStrings(resource) --- End diff -- I think you should check whether the custom-resources string is empty, so that the log message doesn't confusingly end with just `s"and with custom resources:"`.
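A minimal sketch of the suggested check, assuming the custom-resources description has already been computed as a string (possibly empty). RequestMessageSketch and its parameters are hypothetical. The sketch builds the message with a `val` and an `if`/`else` expression instead of appending to a `var`.

```scala
// Hypothetical sketch: append the custom-resources clause only when
// there is actually something to report.
object RequestMessageSketch {
  def requestContainerMessage(
      missing: Int,
      cores: Int,
      memoryMb: Long,
      overheadMb: Long,
      customResources: String): String = {
    val base = s"Will request $missing executor container(s), each with " +
      s"$cores core(s) and " +
      s"$memoryMb MB memory (including $overheadMb MB of overhead)"
    if (customResources.isEmpty) base
    else s"$base and with custom resources: $customResources"
  }
}
```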
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r190104076 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -249,6 +259,10 @@ private[spark] class Client( val capability = Records.newRecord(classOf[Resource]) capability.setMemory(amMemory + amMemoryOverhead) capability.setVirtualCores(amCores) +if (amResources.nonEmpty) { --- End diff -- Nitpick: I would execute `ResourceTypeHelper.setResourceInfoFromResourceTypes(amResources, capability)` unconditionally. That's just my style, though.
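A small sketch of why the `nonEmpty` guard is redundant: iterating over an empty map does nothing, so calling the helper unconditionally leaves the resource unchanged when no custom resources are requested. FakeResource and UnconditionalCallSketch are hypothetical stand-ins. The real method takes a YARN Resource and sets ResourceInformation through reflection.

```scala
// Hypothetical stand-in for the YARN Resource record.
final class FakeResource {
  val info = scala.collection.mutable.Map.empty[String, String]
}

object UnconditionalCallSketch {
  // With an empty map the foreach body never runs, so the call is a no-op.
  def setResourceInfoFromResourceTypes(
      resourceTypes: Map[String, String],
      resource: FakeResource): FakeResource = {
    resourceTypes.foreach { case (name, amount) => resource.info(name) = amount }
    resource
  }
}
```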
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r190103364 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeHelper.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.lang.reflect.InvocationTargetException + +import scala.util.control.NonFatal + +import org.apache.hadoop.yarn.api.records.Resource + +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * This helper class uses some of Hadoop 3 methods from the yarn API, + * so we need to use reflection to avoid compile error when building against Hadoop 2.x + */ +object ResourceTypeHelper extends Logging { + private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r + private val resourceTypesNotAvailableErrorMessage = +"Ignoring updating resource with resource types because " + +"the version of YARN does not support it!" 
+ + def setResourceInfoFromResourceTypes( + resourceTypesParam: Map[String, String], + resource: Resource): Resource = { +if (resource == null) { + throw new IllegalArgumentException("Resource parameter should not be null!") +} + +if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) { + logWarning(resourceTypesNotAvailableErrorMessage) + return resource +} + +val resourceTypes = resourceTypesParam.map { case (k, v) => ( + if (k.equals("memory")) { +logWarning("Trying to use 'memory' as a custom resource, converted it to 'memory-mb'") +"memory-mb" + } else k, v) +} + +logDebug(s"Custom resource types: $resourceTypes") +resourceTypes.foreach { rt => + val resourceName: String = rt._1 + val (amount, unit) = getAmountAndUnit(rt._2) + logDebug(s"Registering resource with name: $resourceName, amount: $amount, unit: $unit") + + try { +val resInfoClass = Utils.classForName( + "org.apache.hadoop.yarn.api.records.ResourceInformation") +val setResourceInformationMethod = + resource.getClass.getMethod("setResourceInformation", classOf[String], +resInfoClass) + +val resourceInformation = + createResourceInformation(resourceName, amount, unit, resInfoClass) +setResourceInformationMethod.invoke(resource, resourceName, resourceInformation) + } catch { +case e: InvocationTargetException => + if (e.getCause != null) { +throw e.getCause + } else { +throw e + } +case NonFatal(e) => + logWarning(resourceTypesNotAvailableErrorMessage, e) + } +} +resource + } + + def getCustomResourcesAsStrings(resource: Resource): String = { +if (resource == null) { + throw new IllegalArgumentException("Resource parameter should not be null!") +} + +if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) { + logWarning(resourceTypesNotAvailableErrorMessage) + return "" +} + +var res: String = "" +try { + val resUtilsClass = Utils.classForName( +"org.apache.hadoop.yarn.util.resource.ResourceUtils") + val getNumberOfResourceTypesMethod = resUtilsClass.getMethod("getNumberOfKnownResourceTypes") + val 
numberOfResourceTypes: Int = getNumberOfResourceTypesMethod.invoke(null).asInstanceOf[Int] + val resourceClass = Utils.classForName( +"org.apache.hadoop.yarn.api.records.Resource") + + // skip memo
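The hunk above calls Hadoop 3 methods through reflection and rethrows the cause of any InvocationTargetException. A minimal, self-contained sketch of that pattern follows. Hadoop 3 is not assumed to be on the classpath, so the static method java.lang.Integer.parseInt stands in for the YARN methods. ReflectiveCallSketch and invokeStatic are hypothetical names.

```scala
import java.lang.reflect.InvocationTargetException

// Hypothetical sketch of the reflection pattern in the hunk: look a method
// up by name at runtime, invoke it, and unwrap InvocationTargetException.
object ReflectiveCallSketch {
  def invokeStatic(className: String, methodName: String, arg: String): AnyRef = {
    val method = Class.forName(className).getMethod(methodName, classOf[String])
    try method.invoke(null, arg) // null receiver: the method is static
    catch {
      // Rethrow the exception raised by the target method itself.
      case e: InvocationTargetException if e.getCause != null => throw e.getCause
    }
  }
}
```

Unwrapping matters because `Method.invoke` wraps any exception thrown by the target method in an InvocationTargetException. Rethrowing the cause lets callers see the original error, here a NumberFormatException.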
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r189949493 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeHelper.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.lang.reflect.InvocationTargetException + +import scala.util.control.NonFatal + +import org.apache.hadoop.yarn.api.records.Resource + +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * This helper class uses some of Hadoop 3 methods from the yarn API, + * so we need to use reflection to avoid compile error when building against Hadoop 2.x + */ +object ResourceTypeHelper extends Logging { + private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r + private val resourceTypesNotAvailableErrorMessage = +"Ignoring updating resource with resource types because " + +"the version of YARN does not support it!" 
+ + def setResourceInfoFromResourceTypes( + resourceTypesParam: Map[String, String], + resource: Resource): Resource = { +if (resource == null) { + throw new IllegalArgumentException("Resource parameter should not be null!") +} + +if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) { + logWarning(resourceTypesNotAvailableErrorMessage) + return resource +} + +val resourceTypes = resourceTypesParam.map { case (k, v) => ( + if (k.equals("memory")) { +logWarning("Trying to use 'memory' as a custom resource, converted it to 'memory-mb'") +"memory-mb" + } else k, v) +} + +logDebug(s"Custom resource types: $resourceTypes") +resourceTypes.foreach { rt => + val resourceName: String = rt._1 + val (amount, unit) = getAmountAndUnit(rt._2) + logDebug(s"Registering resource with name: $resourceName, amount: $amount, unit: $unit") + + try { +val resInfoClass = Utils.classForName( + "org.apache.hadoop.yarn.api.records.ResourceInformation") +val setResourceInformationMethod = + resource.getClass.getMethod("setResourceInformation", classOf[String], +resInfoClass) + +val resourceInformation = + createResourceInformation(resourceName, amount, unit, resInfoClass) +setResourceInformationMethod.invoke(resource, resourceName, resourceInformation) + } catch { +case e: InvocationTargetException => + if (e.getCause != null) { +throw e.getCause + } else { +throw e + } +case NonFatal(e) => + logWarning(resourceTypesNotAvailableErrorMessage, e) + } +} +resource + } + + def getCustomResourcesAsStrings(resource: Resource): String = { +if (resource == null) { + throw new IllegalArgumentException("Resource parameter should not be null!") +} + +if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) { + logWarning(resourceTypesNotAvailableErrorMessage) + return "" +} + +var res: String = "" +try { + val resUtilsClass = Utils.classForName( +"org.apache.hadoop.yarn.util.resource.ResourceUtils") + val getNumberOfResourceTypesMethod = resUtilsClass.getMethod("getNumberOfKnownResourceTypes") + val 
numberOfResourceTypes: Int = getNumberOfResourceTypesMethod.invoke(null).asInstanceOf[Int] + val resourceClass = Utils.classForName( +"org.apache.hadoop.yarn.api.records.Resource") + + // skip memo
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r189473608 --- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala --- @@ -199,6 +204,88 @@ class ClientSuite extends SparkFunSuite with Matchers { appContext.getMaxAppAttempts should be (42) } + test("Resource type args propagate, resource type not defined") { +assume(ResourceTypeHelper.isYarnResourceTypesAvailable()) +val sparkConf = new SparkConf() + .set(YARN_AM_RESOURCE_TYPES_PREFIX + "gpu", "121m") +val args = new ClientArguments(Array()) + +val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) +val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse]) +val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext]) + +val client = new Client(args, sparkConf) + +try { + client.createApplicationSubmissionContext( +new YarnClientApplication(getNewApplicationResponse, appContext), +containerLaunchContext) +} catch { + case NonFatal(e) => +val expectedExceptionClass = "org.apache.hadoop.yarn.exceptions.ResourceNotFoundException" +if (e.getClass.getName != expectedExceptionClass) { + fail(s"Exception caught: $e is not an instance of $expectedExceptionClass!") +} +} + } + + test("Resource type args propagate (client mode)") { +assume(ResourceTypeHelper.isYarnResourceTypesAvailable()) +TestYarnResourceTypeHelper.initializeResourceTypes(List("gpu", "fpga")) + +val sparkConf = new SparkConf() + .set(YARN_AM_RESOURCE_TYPES_PREFIX + "gpu", "121m") --- End diff -- Nitpick: It makes no sense to request "121m" of a GPU. GPUs can be acquired only as discrete devices. I understand no actual acquisition is happening right now, but these unit tests might mislead newcomers. 
I would use a different resource name, one that is intentionally nonsensical, like "some_resource_with_units1".
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r189451540 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeHelper.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.lang.reflect.InvocationTargetException + +import scala.util.control.NonFatal + +import org.apache.hadoop.yarn.api.records.Resource + +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * This helper class uses some of Hadoop 3 methods from the yarn API, + * so we need to use reflection to avoid compile error when building against Hadoop 2.x + */ +object ResourceTypeHelper extends Logging { + private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r + private val resourceTypesNotAvailableErrorMessage = +"Ignoring updating resource with resource types because " + +"the version of YARN does not support it!" 
+ + def setResourceInfoFromResourceTypes( + resourceTypesParam: Map[String, String], + resource: Resource): Resource = { +if (resource == null) { + throw new IllegalArgumentException("Resource parameter should not be null!") +} + +if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) { + logWarning(resourceTypesNotAvailableErrorMessage) + return resource +} + +val resourceTypes = resourceTypesParam.map { case (k, v) => ( + if (k.equals("memory")) { +logWarning("Trying to use 'memory' as a custom resource, converted it to 'memory-mb'") +"memory-mb" + } else k, v) +} + +logDebug(s"Custom resource types: $resourceTypes") +resourceTypes.foreach { rt => + val resourceName: String = rt._1 + val (amount, unit) = getAmountAndUnit(rt._2) + logDebug(s"Registering resource with name: $resourceName, amount: $amount, unit: $unit") + + try { +val resInfoClass = Utils.classForName( + "org.apache.hadoop.yarn.api.records.ResourceInformation") +val setResourceInformationMethod = + resource.getClass.getMethod("setResourceInformation", classOf[String], +resInfoClass) + +val resourceInformation = + createResourceInformation(resourceName, amount, unit, resInfoClass) +setResourceInformationMethod.invoke(resource, resourceName, resourceInformation) + } catch { +case e: InvocationTargetException => + if (e.getCause != null) { +throw e.getCause + } else { +throw e + } +case NonFatal(e) => + logWarning(resourceTypesNotAvailableErrorMessage, e) + } +} +resource + } + + def getCustomResourcesAsStrings(resource: Resource): String = { +if (resource == null) { + throw new IllegalArgumentException("Resource parameter should not be null!") +} + +if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) { + logWarning(resourceTypesNotAvailableErrorMessage) + return "" +} + +var res: String = "" +try { + val resUtilsClass = Utils.classForName( +"org.apache.hadoop.yarn.util.resource.ResourceUtils") + val getNumberOfResourceTypesMethod = resUtilsClass.getMethod("getNumberOfKnownResourceTypes") + val 
numberOfResourceTypes: Int = getNumberOfResourceTypesMethod.invoke(null).asInstanceOf[Int] + val resourceClass = Utils.classForName( +"org.apache.hadoop.yarn.api.records.Resource") + + // skip memory and vcores
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r189449512 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeHelper.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.lang.reflect.InvocationTargetException + +import scala.util.control.NonFatal + +import org.apache.hadoop.yarn.api.records.Resource + +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * This helper class uses some of Hadoop 3 methods from the yarn API, + * so we need to use reflection to avoid compile error when building against Hadoop 2.x + */ +object ResourceTypeHelper extends Logging { + private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r + private val resourceTypesNotAvailableErrorMessage = +"Ignoring updating resource with resource types because " + +"the version of YARN does not support it!" 
+ + def setResourceInfoFromResourceTypes( + resourceTypesParam: Map[String, String], + resource: Resource): Resource = { +if (resource == null) { + throw new IllegalArgumentException("Resource parameter should not be null!") +} + +if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) { + logWarning(resourceTypesNotAvailableErrorMessage) + return resource +} + +val resourceTypes = resourceTypesParam.map { case (k, v) => ( + if (k.equals("memory")) { +logWarning("Trying to use 'memory' as a custom resource, converted it to 'memory-mb'") +"memory-mb" + } else k, v) +} + +logDebug(s"Custom resource types: $resourceTypes") +resourceTypes.foreach { rt => + val resourceName: String = rt._1 + val (amount, unit) = getAmountAndUnit(rt._2) + logDebug(s"Registering resource with name: $resourceName, amount: $amount, unit: $unit") + + try { +val resInfoClass = Utils.classForName( + "org.apache.hadoop.yarn.api.records.ResourceInformation") +val setResourceInformationMethod = + resource.getClass.getMethod("setResourceInformation", classOf[String], +resInfoClass) + +val resourceInformation = + createResourceInformation(resourceName, amount, unit, resInfoClass) +setResourceInformationMethod.invoke(resource, resourceName, resourceInformation) + } catch { +case e: InvocationTargetException => + if (e.getCause != null) { +throw e.getCause + } else { +throw e + } +case NonFatal(e) => + logWarning(resourceTypesNotAvailableErrorMessage, e) + } +} +resource + } + + def getCustomResourcesAsStrings(resource: Resource): String = { +if (resource == null) { + throw new IllegalArgumentException("Resource parameter should not be null!") +} + +if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) { + logWarning(resourceTypesNotAvailableErrorMessage) + return "" +} + +var res: String = "" +try { + val resUtilsClass = Utils.classForName( +"org.apache.hadoop.yarn.util.resource.ResourceUtils") + val getNumberOfResourceTypesMethod = resUtilsClass.getMethod("getNumberOfKnownResourceTypes") + val 
numberOfResourceTypes: Int = getNumberOfResourceTypesMethod.invoke(null).asInstanceOf[Int] + val resourceClass = Utils.classForName( +"org.apache.hadoop.yarn.api.records.Resource") + + // skip memo
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r189449967 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeHelper.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.lang.reflect.InvocationTargetException + +import scala.util.control.NonFatal + +import org.apache.hadoop.yarn.api.records.Resource + +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * This helper class uses some of Hadoop 3 methods from the yarn API, + * so we need to use reflection to avoid compile error when building against Hadoop 2.x + */ +object ResourceTypeHelper extends Logging { + private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r + private val resourceTypesNotAvailableErrorMessage = +"Ignoring updating resource with resource types because " + +"the version of YARN does not support it!" 
+ + def setResourceInfoFromResourceTypes( + resourceTypesParam: Map[String, String], + resource: Resource): Resource = { +if (resource == null) { + throw new IllegalArgumentException("Resource parameter should not be null!") +} + +if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) { + logWarning(resourceTypesNotAvailableErrorMessage) + return resource +} + +val resourceTypes = resourceTypesParam.map { case (k, v) => ( + if (k.equals("memory")) { +logWarning("Trying to use 'memory' as a custom resource, converted it to 'memory-mb'") +"memory-mb" + } else k, v) +} + +logDebug(s"Custom resource types: $resourceTypes") +resourceTypes.foreach { rt => + val resourceName: String = rt._1 + val (amount, unit) = getAmountAndUnit(rt._2) + logDebug(s"Registering resource with name: $resourceName, amount: $amount, unit: $unit") + + try { +val resInfoClass = Utils.classForName( + "org.apache.hadoop.yarn.api.records.ResourceInformation") +val setResourceInformationMethod = + resource.getClass.getMethod("setResourceInformation", classOf[String], +resInfoClass) + +val resourceInformation = + createResourceInformation(resourceName, amount, unit, resInfoClass) +setResourceInformationMethod.invoke(resource, resourceName, resourceInformation) + } catch { +case e: InvocationTargetException => + if (e.getCause != null) { +throw e.getCause + } else { +throw e + } +case NonFatal(e) => + logWarning(resourceTypesNotAvailableErrorMessage, e) + } +} +resource + } + + def getCustomResourcesAsStrings(resource: Resource): String = { +if (resource == null) { + throw new IllegalArgumentException("Resource parameter should not be null!") +} + +if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) { + logWarning(resourceTypesNotAvailableErrorMessage) + return "" +} + +var res: String = "" +try { + val resUtilsClass = Utils.classForName( +"org.apache.hadoop.yarn.util.resource.ResourceUtils") + val getNumberOfResourceTypesMethod = resUtilsClass.getMethod("getNumberOfKnownResourceTypes") + val 
numberOfResourceTypes: Int = getNumberOfResourceTypesMethod.invoke(null).asInstanceOf[Int] + val resourceClass = Utils.classForName( +"org.apache.hadoop.yarn.api.records.Resource") + + // skip memo
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r189450284 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeHelper.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.lang.reflect.InvocationTargetException + +import scala.util.control.NonFatal + +import org.apache.hadoop.yarn.api.records.Resource + +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * This helper class uses some of Hadoop 3 methods from the yarn API, + * so we need to use reflection to avoid compile error when building against Hadoop 2.x + */ +object ResourceTypeHelper extends Logging { + private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r + private val resourceTypesNotAvailableErrorMessage = --- End diff -- This is a constant, so why not change it to `RESOURCE_TYPES_NOT_AVAILABLE_ERROR_MESSAGE`?
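`AMOUNT_AND_UNIT_REGEX` splits a resource value into a numeric amount and an optional alphabetic unit. The body of `getAmountAndUnit` is not visible in these excerpts, so the following is only a sketch of how such a parser could work, reusing the pattern and the error text that `ResourceTypeHelperSuite` expects; `AmountAndUnitParser` is a hypothetical name.

```scala
object AmountAndUnitParser {
  // Same pattern as in the diff: one or more digits, then an optional alphabetic unit.
  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r

  /** Splits e.g. "123ppp" into (123, "ppp"); an absent unit yields "". */
  def getAmountAndUnit(value: String): (Long, String) = value match {
    // Regex extractors only succeed when the entire string matches.
    case AMOUNT_AND_UNIT_REGEX(amount, unit) => (amount.toLong, unit)
    case _ =>
      throw new IllegalArgumentException(
        "Value of resource type should match pattern " +
          s"([0-9]+)([A-Za-z]*), unmatched value: $value")
  }
}
```

Because the match is anchored to the whole string, "m" (no digits) and "**@#" are rejected, while "123ppp" parses successfully; deciding whether "ppp" is a known unit is a separate step. A very long digit string would make `toLong` throw a `NumberFormatException`, which a real implementation may want to handle.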
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r189449462 --- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceTypeHelperSuite.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.yarn + +import org.apache.hadoop.yarn.api.records.Resource +import org.apache.hadoop.yarn.util.Records +import org.scalatest.{BeforeAndAfterAll, Matchers} + +import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.yarn.TestYarnResourceTypeHelper.ResourceInformation + +class ResourceTypeHelperSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll { + + private val CUSTOM_RES_1 = "custom-resource-type-1" + private val CUSTOM_RES_2 = "custom-resource-type-2" + + override def beforeAll(): Unit = { +super.beforeAll() + } + + private def getExpectedUnmatchedErrorMessage(value: String) = { +"Value of resource type should match pattern " + + s"([0-9]+)([A-Za-z]*), unmatched value: $value" + } + + test("resource type value does not match pattern") { +assume(ResourceTypeHelper.isYarnResourceTypesAvailable()) +TestYarnResourceTypeHelper.initializeResourceTypes(List()) --- End diff -- You're relying on the implementation detail that the value is matched against the regex before the code checks whether the YARN resource type `CUSTOM_RES_1` is available. To make this unit test more complete and independent of that ordering, I would call `TestYarnResourceTypeHelper.initializeResourceTypes(List(CUSTOM_RES_1))` instead. The same applies to the test case "resource type just unit defined".
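To see why registering `CUSTOM_RES_1` matters, consider a hypothetical validator that checks registration before parsing, i.e. the opposite order from the code under review. With an empty registry, the "does not match pattern" test would then fail for the wrong reason; registering the type first leaves the pattern check as the only possible failure. `ResourceValidationSketch` and its messages are illustrative, not Spark code.

```scala
object ResourceValidationSketch {
  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r

  // Hypothetical validator: checks that the type is registered *before* parsing the value.
  def validate(knownTypes: Set[String], name: String, value: String): (Long, String) = {
    if (!knownTypes.contains(name)) {
      throw new IllegalArgumentException(s"Unknown resource type: $name")
    }
    value match {
      case AMOUNT_AND_UNIT_REGEX(amount, unit) => (amount.toLong, unit)
      case _ => throw new IllegalArgumentException(s"unmatched value: $value")
    }
  }

  /** Runs `action` and returns the IllegalArgumentException message, or "no error". */
  def errorMessage(action: => Any): String =
    try {
      action
      "no error"
    } catch {
      case e: IllegalArgumentException => e.getMessage
    }
}
```

With an empty registry this validator reports the unknown type, not the malformed value, so a test that expects the pattern error only stays meaningful if the resource type is registered up front.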
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r189449133 --- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceTypeHelperSuite.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.yarn + +import org.apache.hadoop.yarn.api.records.Resource +import org.apache.hadoop.yarn.util.Records +import org.scalatest.{BeforeAndAfterAll, Matchers} + +import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.yarn.TestYarnResourceTypeHelper.ResourceInformation + +class ResourceTypeHelperSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll { + + private val CUSTOM_RES_1 = "custom-resource-type-1" + private val CUSTOM_RES_2 = "custom-resource-type-2" + + override def beforeAll(): Unit = { +super.beforeAll() + } + + private def getExpectedUnmatchedErrorMessage(value: String) = { +"Value of resource type should match pattern " + + s"([0-9]+)([A-Za-z]*), unmatched value: $value" + } + + test("resource type value does not match pattern") { +assume(ResourceTypeHelper.isYarnResourceTypesAvailable()) +TestYarnResourceTypeHelper.initializeResourceTypes(List()) + +val resourceTypes = Map(CUSTOM_RES_1 -> "**@#") + +val thrown = intercept[IllegalArgumentException] { + ResourceTypeHelper.setResourceInfoFromResourceTypes(resourceTypes, createAResource) +} +thrown.getMessage should equal (getExpectedUnmatchedErrorMessage("**@#")) + } + + test("resource type just unit defined") { +assume(ResourceTypeHelper.isYarnResourceTypesAvailable()) +TestYarnResourceTypeHelper.initializeResourceTypes(List()) + +val resourceTypes = Map(CUSTOM_RES_1 -> "m") + +val thrown = intercept[IllegalArgumentException] { + ResourceTypeHelper.setResourceInfoFromResourceTypes(resourceTypes, createAResource) +} +thrown.getMessage should equal (getExpectedUnmatchedErrorMessage("m")) + } + + test("resource type with null value should not be allowed") { +assume(ResourceTypeHelper.isYarnResourceTypesAvailable()) +TestYarnResourceTypeHelper.initializeResourceTypes(List()) + +val resourceTypes = Map(CUSTOM_RES_1 -> "123") + +val thrown = intercept[IllegalArgumentException] { + ResourceTypeHelper.setResourceInfoFromResourceTypes(resourceTypes, 
null) +} +thrown.getMessage should equal ("Resource parameter should not be null!") + } + + test("resource type with valid value and invalid unit") { +assume(ResourceTypeHelper.isYarnResourceTypesAvailable()) +TestYarnResourceTypeHelper.initializeResourceTypes(List(CUSTOM_RES_1)) + +val resourceTypes = Map(CUSTOM_RES_1 -> "123ppp") +val resource = createAResource + +val thrown = intercept[IllegalArgumentException] { + ResourceTypeHelper.setResourceInfoFromResourceTypes(resourceTypes, resource) +} +thrown.getMessage should fullyMatch regex + """Unknown unit 'ppp'\. Known units are \[.*\]""" + } + + test("resource type with valid value and without unit") { +assume(ResourceTypeHelper.isYarnResourceTypesAvailable()) +TestYarnResourceTypeHelper.initializeResourceTypes(List(CUSTOM_RES_1)) + +val resourceTypes = Map(CUSTOM_RES_1 -> "123") +val resource = createAResource + +ResourceTypeHelper.setResourceInfoFromResourceTypes(resourceTypes, resource) +val customResource: ResourceInformation = TestYarnResourceTypeHelper + .getResourceInformationByName(resource, CUSTOM_RES_1)
[GitHub] spark issue #20761: [SPARK-20327][CORE][YARN] Add CLI support for YARN custo...
Github user galv commented on the issue: https://github.com/apache/spark/pull/20761 I may be missing something, but doesn't this change depend on Spark's YARN client being made Hadoop 3.0 compatible?
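Because the helper has to work when built against both Hadoop 2.x and 3.x, every call is guarded by `isYarnResourceTypesAvailable()`, whose implementation is not shown in these excerpts. One plausible way to write such a guard is to probe for a Hadoop 3 class at runtime; the sketch below assumes that approach, and `ClassAvailability` is a hypothetical name.

```scala
object ClassAvailability {
  /** Returns true if `className` can be loaded, without running its static initializers. */
  def isClassAvailable(className: String): Boolean =
    try {
      Class.forName(className, false, getClass.getClassLoader)
      true
    } catch {
      case _: ClassNotFoundException => false
    }
}
```

In the YARN module, the probed name could be `org.apache.hadoop.yarn.api.records.ResourceInformation`, the class the diff loads via `Utils.classForName`. Passing `false` for initialization avoids running static initializers just to answer the question.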
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r189426179 --- Diff: docs/running-on-yarn.md --- @@ -121,6 +121,28 @@ To use a custom metrics.properties for the application master and executors, upd Use lower-case suffixes, e.g. k, m, g, t, and p, for kibi-, mebi-, gibi-, tebi-, and pebibytes, respectively. + + spark.yarn.am.resource.resource-type --- End diff -- Style nitpick: I think `{resource-type}` or `${resource-type}` matches existing documentation better than ``.
[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...
Github user galv commented on a diff in the pull request: https://github.com/apache/spark/pull/20761#discussion_r189426163 --- Diff: docs/running-on-yarn.md --- @@ -121,6 +121,28 @@ To use a custom metrics.properties for the application master and executors, upd Use lower-case suffixes, e.g. k, m, g, t, and p, for kibi-, mebi-, gibi-, tebi-, and pebibytes, respectively. + + spark.yarn.am.resource.resource-type + (none) + +Amount of resource to use for the YARN Application Master in client mode. +In cluster mode, use spark.yarn.driver.resource.resource-type instead --- End diff -- It would be great to show an example of what the syntax looks like. If I remember correctly, the Nvidia GPU resource type would be requested as "spark.yarn.am.resource.yarn.io/gpu". Also, can you add something like "This feature can be used only with YARN 3.0+", and note that some resource types require higher versions? (IIRC, only YARN 3.1 knows how to isolate GPUs with cgroups, but I'm not sure whether this prevents you from requesting GPUs at all in YARN 3.0.)
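Assuming the key format the reviewer recalls (`spark.yarn.am.resource.yarn.io/gpu`), the resource name is everything after the `spark.yarn.am.resource.` prefix, including the dot and the slash in `yarn.io/gpu`. The sketch below uses a plain `Map` in place of `SparkConf`, and `ResourceConfSketch` is a hypothetical helper, not part of the PR.

```scala
object ResourceConfSketch {
  /**
   * Collects entries such as "spark.yarn.am.resource.yarn.io/gpu" -> "2" into
   * "yarn.io/gpu" -> "2" by stripping `prefix` from every matching key.
   */
  def resourcesWithPrefix(conf: Map[String, String], prefix: String): Map[String, String] =
    conf.collect {
      // Skip a bare prefix with no resource name after it.
      case (key, value) if key.startsWith(prefix) && key.length > prefix.length =>
        key.substring(prefix.length) -> value
    }
}
```

Since only the prefix is stripped, resource names containing dots or slashes survive intact, and the same helper serves both the `spark.yarn.am.resource.` and `spark.yarn.driver.resource.` prefixes.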