Github user galv commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20761#discussion_r190105917

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeValidator.scala ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.ProcessType.{am, driver, executor, ProcessType}
+import org.apache.spark.deploy.yarn.ResourceType.{cores, memory, ResourceType}
+import org.apache.spark.deploy.yarn.RunMode.{client, cluster, RunMode}
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config._
+
+private[spark] object ProcessType extends Enumeration {
+  type ProcessType = Value
+  val driver, executor, am = Value
+}
+
+private[spark] object RunMode extends Enumeration {
+  type RunMode = Value
+  val client, cluster = Value
+}
+
+private[spark] object ResourceType extends Enumeration {
+  type ResourceType = Value
+  val cores, memory = Value
+}
+
+private object ResourceTypeValidator {
+  private val ERROR_PREFIX: String = "Error: "
+  private val POSSIBLE_RESOURCE_DEFINITIONS = Seq[ResourceConfigProperties](
+    new ResourceConfigProperties(am, client, memory),
+    new ResourceConfigProperties(am, client, cores),
+    new ResourceConfigProperties(driver, cluster, memory),
+    new ResourceConfigProperties(driver, cluster, cores),
+    new ResourceConfigProperties(processType = executor, resourceType = memory),
+    new ResourceConfigProperties(processType = executor, resourceType = cores))
+
+  /**
+   * Validates sparkConf and throws a SparkException if a standard resource (memory or cores)
+   * is defined with the property spark.yarn.x.resource.y<br>
+   *
+   * Example of an invalid config:<br>
+   * - spark.yarn.driver.resource.memory=2g<br>
+   *
+   * Please note that if multiple resources are defined like described above,
+   * the error messages will be concatenated.<br>
+   * Example of such a config:<br>
+   * - spark.yarn.driver.resource.memory=2g<br>
+   * - spark.yarn.executor.resource.cores=2<br>
+   * Then the following two error messages will be printed:<br>
+   * - "memory cannot be requested with config spark.yarn.driver.resource.memory,
+   * please use config spark.driver.memory instead!<br>
+   * - "cores cannot be requested with config spark.yarn.executor.resource.cores,
+   * please use config spark.executor.cores instead!<br>
+   *
+   * @param sparkConf
+   */
+  def validateResources(sparkConf: SparkConf): Unit = {
+    val requestedResources = new RequestedResources(sparkConf)
+    val sb = new mutable.StringBuilder()
+    POSSIBLE_RESOURCE_DEFINITIONS.foreach { rcp =>
+      val customResources: Map[String, String] =
+        getCustomResourceValue(requestedResources, rcp)
+      val (standardResourceConfigKey: String, customResourceConfigKey: String) =
+        getResourceConfigKeys(rcp)
+
+      val errorMessage =
+        if (customResources.contains(customResourceConfigKey)) {
+          s"${rcp.resourceType} cannot be requested with config $customResourceConfigKey, " +
+            s"please use config $standardResourceConfigKey instead!"
+        } else {
+          ""
+        }
+      if (errorMessage.nonEmpty) {
+        printErrorMessageToBuffer(sb, errorMessage)
+      }
+    }
+
+    if (sb.nonEmpty) {
+      throw new SparkException(sb.toString())
+    }
+  }
+
+  /**
+   * Returns the requested map of custom resources,
+   * based on the ResourceConfigProperties argument.
+   * @return
+   */
+  private def getCustomResourceValue(
+      requestedResources: RequestedResources,
+      rcp: ResourceConfigProperties) = {
+    var customResources: Map[String, String] = null
+    (rcp.processType, rcp.runMode, rcp.resourceType) match {
+      case (ProcessType.executor, _, _) => customResources =
+        requestedResources.customExecutorResources
+      case (ProcessType.am, RunMode.client, _) => customResources =
+        requestedResources.customAMResources
+      case (ProcessType.driver, RunMode.cluster, _) => customResources =
+        requestedResources.customDriverResources
+    }
+    customResources
+  }
+
+  /**
+   * Returns a tuple of standard resource config key and custom resource config key, based on the
+   * processType and runMode fields of the ResourceConfigProperties argument.
+   * Standard resources are memory and cores.
+   *
+   * @param rcp
+   * @return
+   */
+  private def getResourceConfigKeys(rcp: ResourceConfigProperties): (String, String) = {
+    val standardResourceConfigKey: String = if (rcp.processType == ProcessType.am) {
+      s"spark.yarn.${rcp.processType}.${rcp.resourceType}"
+    } else {
+      s"spark.${rcp.processType}.${rcp.resourceType}"
+    }
+
+    var customResourceTypeConfigKey: String = ""
+    (rcp.processType, rcp.runMode) match {
+      case (ProcessType.am, RunMode.client) =>
+        customResourceTypeConfigKey += YARN_AM_RESOURCE_TYPES_PREFIX
+      case (ProcessType.driver, RunMode.cluster) =>
+        customResourceTypeConfigKey += YARN_DRIVER_RESOURCE_TYPES_PREFIX
+      case (ProcessType.executor, _) =>
+        customResourceTypeConfigKey += YARN_EXECUTOR_RESOURCE_TYPES_PREFIX
+    }
+
+    customResourceTypeConfigKey += rcp.resourceType
+
+    (standardResourceConfigKey, customResourceTypeConfigKey)
+  }
+
+  private[spark] def printErrorMessageToBuffer(sb: StringBuilder, str: String): Unit = {
+    sb.append(s"$ERROR_PREFIX$str\n")
+  }
+
+  private class ResourceConfigProperties(
+      val processType: ProcessType,
+      val runMode: RunMode = null,
--- End diff --

In my opinion, it's better not to use a default argument here. You construct only 6 instances of this anyway. Also consider making the type of runMode an Option[RunMode], so you can use None instead of null. It is my understanding that null is very unidiomatic in Scala. By the way, this is a good candidate for a Scala case class, but it doesn't really matter.
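For illustration, a minimal, self-contained sketch of what that could look like. This is a suggestion, not code from the PR: the wrapper object ResourceTypeValidatorSketch is hypothetical, and only the enumerations are copied from the diff.

object ResourceTypeValidatorSketch {
  object ProcessType extends Enumeration {
    type ProcessType = Value
    val driver, executor, am = Value
  }
  object RunMode extends Enumeration {
    type RunMode = Value
    val client, cluster = Value
  }
  object ResourceType extends Enumeration {
    type ResourceType = Value
    val cores, memory = Value
  }
  import ProcessType._, RunMode._, ResourceType._

  // A case class instead of a plain private class: equality, toString and
  // pattern matching come for free, and there is no nullable field.
  case class ResourceConfigProperties(
      processType: ProcessType,
      runMode: Option[RunMode],
      resourceType: ResourceType)

  // All six instances spelled out explicitly, with None standing in for
  // "any run mode" on the executor entries instead of a null default argument.
  val POSSIBLE_RESOURCE_DEFINITIONS = Seq(
    ResourceConfigProperties(am, Some(client), memory),
    ResourceConfigProperties(am, Some(client), cores),
    ResourceConfigProperties(driver, Some(cluster), memory),
    ResourceConfigProperties(driver, Some(cluster), cores),
    ResourceConfigProperties(executor, None, memory),
    ResourceConfigProperties(executor, None, cores))
}

The matches in getCustomResourceValue and getResourceConfigKeys would then test for Some(RunMode.client) / Some(RunMode.cluster) instead of bare enum values.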