allisonwang-db commented on a change in pull request #33554:
URL: https://github.com/apache/spark/pull/33554#discussion_r691663252



##########
File path: core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
##########
@@ -141,4 +144,151 @@ object SparkCoreErrors {
   def mustSpecifyCheckpointDirError(): Throwable = {
     new SparkException("Checkpoint dir must be specified.")
   }
+
+  def acquireAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to acquire an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def acquireAnAddressNotAvailableError(resourceName: String, address: 
String): Throwable = {

Review comment:
       acquireUnavailableAddressError

##########
File path: core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
##########
@@ -141,4 +144,151 @@ object SparkCoreErrors {
   def mustSpecifyCheckpointDirError(): Throwable = {
     new SparkException("Checkpoint dir must be specified.")
   }
+
+  def acquireAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {

Review comment:
       acquireNonExistingAddressError

##########
File path: core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
##########
@@ -141,4 +144,151 @@ object SparkCoreErrors {
   def mustSpecifyCheckpointDirError(): Throwable = {
     new SparkException("Checkpoint dir must be specified.")
   }
+
+  def acquireAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to acquire an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def acquireAnAddressNotAvailableError(resourceName: String, address: 
String): Throwable = {
+    new SparkException("Try to acquire an address that is not available. " +
+      s"$resourceName address $address is not available.")
+  }
+
+  def releaseAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def releaseAnAddressNotAssignedError(resourceName: String, address: String): 
Throwable = {

Review comment:
       releaseUnassignedAddressError

##########
File path: core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
##########
@@ -141,4 +144,151 @@ object SparkCoreErrors {
   def mustSpecifyCheckpointDirError(): Throwable = {
     new SparkException("Checkpoint dir must be specified.")
   }
+
+  def acquireAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to acquire an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def acquireAnAddressNotAvailableError(resourceName: String, address: 
String): Throwable = {
+    new SparkException("Try to acquire an address that is not available. " +
+      s"$resourceName address $address is not available.")
+  }
+
+  def releaseAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def releaseAnAddressNotAssignedError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that is not assigned. 
$resourceName " +
+      s"address $address is not assigned.")
+  }
+
+  def resourceScriptNotExistError(scriptFile: File, resourceName: String): 
Throwable = {
+    new SparkException(s"Resource script: $scriptFile to discover 
$resourceName " +
+      "doesn't exist!")
+  }
+
+  def expectUseResourceButNotSpecifyADiscoveryScriptError(resourceName: 
String): Throwable = {
+    new SparkException(s"User is expecting to use resource: $resourceName, but 
" +
+      "didn't specify a discovery script!")
+  }
+
+  def runningOtherResourceError(
+      script: Optional[String],
+      name: String,
+      resourceName: String): Throwable = {
+    new SparkException(s"Error running the resource discovery script 
${script.get}: " +
+      s"script returned resource name ${name} and we were expecting 
$resourceName.")
+  }
+
+  def ParsingJsonToResourceInformationError(

Review comment:
       failToParseJsonToResourceInformationError

##########
File path: core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
##########
@@ -141,4 +144,151 @@ object SparkCoreErrors {
   def mustSpecifyCheckpointDirError(): Throwable = {
     new SparkException("Checkpoint dir must be specified.")
   }
+
+  def acquireAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to acquire an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def acquireAnAddressNotAvailableError(resourceName: String, address: 
String): Throwable = {
+    new SparkException("Try to acquire an address that is not available. " +
+      s"$resourceName address $address is not available.")
+  }
+
+  def releaseAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def releaseAnAddressNotAssignedError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that is not assigned. 
$resourceName " +
+      s"address $address is not assigned.")
+  }
+
+  def resourceScriptNotExistError(scriptFile: File, resourceName: String): 
Throwable = {
+    new SparkException(s"Resource script: $scriptFile to discover 
$resourceName " +
+      "doesn't exist!")
+  }
+
+  def expectUseResourceButNotSpecifyADiscoveryScriptError(resourceName: 
String): Throwable = {
+    new SparkException(s"User is expecting to use resource: $resourceName, but 
" +
+      "didn't specify a discovery script!")
+  }
+
+  def runningOtherResourceError(

Review comment:
       discoveryScriptNameMismatchError

##########
File path: core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
##########
@@ -141,4 +144,151 @@ object SparkCoreErrors {
   def mustSpecifyCheckpointDirError(): Throwable = {
     new SparkException("Checkpoint dir must be specified.")
   }
+
+  def acquireAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to acquire an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def acquireAnAddressNotAvailableError(resourceName: String, address: 
String): Throwable = {
+    new SparkException("Try to acquire an address that is not available. " +
+      s"$resourceName address $address is not available.")
+  }
+
+  def releaseAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def releaseAnAddressNotAssignedError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that is not assigned. 
$resourceName " +
+      s"address $address is not assigned.")
+  }
+
+  def resourceScriptNotExistError(scriptFile: File, resourceName: String): 
Throwable = {
+    new SparkException(s"Resource script: $scriptFile to discover 
$resourceName " +
+      "doesn't exist!")
+  }
+
+  def expectUseResourceButNotSpecifyADiscoveryScriptError(resourceName: 
String): Throwable = {
+    new SparkException(s"User is expecting to use resource: $resourceName, but 
" +
+      "didn't specify a discovery script!")
+  }
+
+  def runningOtherResourceError(
+      script: Optional[String],
+      name: String,
+      resourceName: String): Throwable = {
+    new SparkException(s"Error running the resource discovery script 
${script.get}: " +
+      s"script returned resource name ${name} and we were expecting 
$resourceName.")
+  }
+
+  def ParsingJsonToResourceInformationError(
+      json: String,
+      exampleJson: String,
+      e: Throwable): Throwable = {
+    new SparkException(s"Error parsing JSON into 
ResourceInformation:\n$json\n" +
+      s"Here is a correct example: $exampleJson.", e)
+  }
+
+  def ParsingJsonToResourceInformationError(json: JValue, e: Throwable): 
Throwable = {
+    new SparkException(s"Error parsing JSON into 
ResourceInformation:\n$json\n", e)
+  }
+
+  def resourceNotExistInProfileIdError(resource: String, id: Int): Throwable = 
{
+    new SparkException(s"Resource $resource doesn't exist in profile id: $id")
+  }
+
+  def conditionOfExecutorResourceError(rName: String, num: Long, taskReq: 
Double): Throwable = {

Review comment:
       executorResourceLessThanTaskResourceRequestError

##########
File path: core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
##########
@@ -141,4 +144,151 @@ object SparkCoreErrors {
   def mustSpecifyCheckpointDirError(): Throwable = {
     new SparkException("Checkpoint dir must be specified.")
   }
+
+  def acquireAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to acquire an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def acquireAnAddressNotAvailableError(resourceName: String, address: 
String): Throwable = {
+    new SparkException("Try to acquire an address that is not available. " +
+      s"$resourceName address $address is not available.")
+  }
+
+  def releaseAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def releaseAnAddressNotAssignedError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that is not assigned. 
$resourceName " +
+      s"address $address is not assigned.")
+  }
+
+  def resourceScriptNotExistError(scriptFile: File, resourceName: String): 
Throwable = {
+    new SparkException(s"Resource script: $scriptFile to discover 
$resourceName " +
+      "doesn't exist!")
+  }
+
+  def expectUseResourceButNotSpecifyADiscoveryScriptError(resourceName: 
String): Throwable = {

Review comment:
       noDiscoveryScriptSpecifiedError

##########
File path: core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
##########
@@ -141,4 +144,151 @@ object SparkCoreErrors {
   def mustSpecifyCheckpointDirError(): Throwable = {
     new SparkException("Checkpoint dir must be specified.")
   }
+
+  def acquireAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to acquire an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def acquireAnAddressNotAvailableError(resourceName: String, address: 
String): Throwable = {
+    new SparkException("Try to acquire an address that is not available. " +
+      s"$resourceName address $address is not available.")
+  }
+
+  def releaseAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def releaseAnAddressNotAssignedError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that is not assigned. 
$resourceName " +
+      s"address $address is not assigned.")
+  }
+
+  def resourceScriptNotExistError(scriptFile: File, resourceName: String): 
Throwable = {
+    new SparkException(s"Resource script: $scriptFile to discover 
$resourceName " +
+      "doesn't exist!")
+  }
+
+  def expectUseResourceButNotSpecifyADiscoveryScriptError(resourceName: 
String): Throwable = {
+    new SparkException(s"User is expecting to use resource: $resourceName, but 
" +
+      "didn't specify a discovery script!")
+  }
+
+  def runningOtherResourceError(
+      script: Optional[String],
+      name: String,
+      resourceName: String): Throwable = {
+    new SparkException(s"Error running the resource discovery script 
${script.get}: " +
+      s"script returned resource name ${name} and we were expecting 
$resourceName.")
+  }
+
+  def ParsingJsonToResourceInformationError(
+      json: String,
+      exampleJson: String,
+      e: Throwable): Throwable = {
+    new SparkException(s"Error parsing JSON into 
ResourceInformation:\n$json\n" +
+      s"Here is a correct example: $exampleJson.", e)
+  }
+
+  def ParsingJsonToResourceInformationError(json: JValue, e: Throwable): 
Throwable = {
+    new SparkException(s"Error parsing JSON into 
ResourceInformation:\n$json\n", e)
+  }
+
+  def resourceNotExistInProfileIdError(resource: String, id: Int): Throwable = 
{
+    new SparkException(s"Resource $resource doesn't exist in profile id: $id")
+  }
+
+  def conditionOfExecutorResourceError(rName: String, num: Long, taskReq: 
Double): Throwable = {
+    new SparkException(s"The executor resource: $rName, amount: ${num} " +
+      s"needs to be >= the task resource request amount of $taskReq")
+  }
+
+  def noExecutorResourceConfigError(str: String): Throwable = {
+    new SparkException("No executor resource configs were not specified for 
the " +
+      s"following task configs: ${str}")
+  }
+
+  def resourceProfileSupportedError(): Throwable = {

Review comment:
       resourceProfilesUnsupportedError

##########
File path: core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
##########
@@ -141,4 +144,151 @@ object SparkCoreErrors {
   def mustSpecifyCheckpointDirError(): Throwable = {
     new SparkException("Checkpoint dir must be specified.")
   }
+
+  def acquireAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to acquire an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def acquireAnAddressNotAvailableError(resourceName: String, address: 
String): Throwable = {
+    new SparkException("Try to acquire an address that is not available. " +
+      s"$resourceName address $address is not available.")
+  }
+
+  def releaseAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def releaseAnAddressNotAssignedError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that is not assigned. 
$resourceName " +
+      s"address $address is not assigned.")
+  }
+
+  def resourceScriptNotExistError(scriptFile: File, resourceName: String): 
Throwable = {
+    new SparkException(s"Resource script: $scriptFile to discover 
$resourceName " +
+      "doesn't exist!")
+  }
+
+  def expectUseResourceButNotSpecifyADiscoveryScriptError(resourceName: 
String): Throwable = {
+    new SparkException(s"User is expecting to use resource: $resourceName, but 
" +
+      "didn't specify a discovery script!")
+  }
+
+  def runningOtherResourceError(
+      script: Optional[String],
+      name: String,
+      resourceName: String): Throwable = {
+    new SparkException(s"Error running the resource discovery script 
${script.get}: " +
+      s"script returned resource name ${name} and we were expecting 
$resourceName.")
+  }
+
+  def ParsingJsonToResourceInformationError(
+      json: String,
+      exampleJson: String,
+      e: Throwable): Throwable = {
+    new SparkException(s"Error parsing JSON into 
ResourceInformation:\n$json\n" +
+      s"Here is a correct example: $exampleJson.", e)
+  }
+
+  def ParsingJsonToResourceInformationError(json: JValue, e: Throwable): 
Throwable = {
+    new SparkException(s"Error parsing JSON into 
ResourceInformation:\n$json\n", e)
+  }
+
+  def resourceNotExistInProfileIdError(resource: String, id: Int): Throwable = 
{
+    new SparkException(s"Resource $resource doesn't exist in profile id: $id")
+  }
+
+  def conditionOfExecutorResourceError(rName: String, num: Long, taskReq: 
Double): Throwable = {
+    new SparkException(s"The executor resource: $rName, amount: ${num} " +
+      s"needs to be >= the task resource request amount of $taskReq")
+  }
+
+  def noExecutorResourceConfigError(str: String): Throwable = {
+    new SparkException("No executor resource configs were not specified for 
the " +
+      s"following task configs: ${str}")
+  }
+
+  def resourceProfileSupportedError(): Throwable = {
+    new SparkException("ResourceProfiles are only supported on YARN and 
Kubernetes " +
+      "with dynamic allocation enabled.")
+  }
+
+  def resourceProfileIdNotFoundError(rpId: Int): Throwable = {
+    new SparkException(s"ResourceProfileId $rpId not found!")
+  }
+
+  def specifyConfigOfResourceError(str: String): Throwable = {
+    new SparkException(s"You must specify an amount for ${str}")
+  }
+
+  def specifyAnAmountConfigForResourceError(
+      key: String,
+      componentName: String,
+      RESOURCE_PREFIX: String): Throwable = {
+    new SparkException(s"You must specify an amount config for resource: $key 
" +
+      s"config: $componentName.$RESOURCE_PREFIX.$key")
+  }
+
+  def conditionOfResourceAmountError(doubleAmount: Double): Throwable = {
+    new SparkException(
+      s"The resource amount ${doubleAmount} must be either <= 0.5, or a whole 
number.")
+  }
+
+  def specifyAmountForResourceError(str: String): Throwable = {

Review comment:
       Can be combined with
   ```
   def specifyConfigOfResourceError(str: String): Throwable = {
       new SparkException(s"You must specify an amount for ${str}")
     }
   ```

##########
File path: core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
##########
@@ -141,4 +144,151 @@ object SparkCoreErrors {
   def mustSpecifyCheckpointDirError(): Throwable = {
     new SparkException("Checkpoint dir must be specified.")
   }
+
+  def acquireAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to acquire an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def acquireAnAddressNotAvailableError(resourceName: String, address: 
String): Throwable = {
+    new SparkException("Try to acquire an address that is not available. " +
+      s"$resourceName address $address is not available.")
+  }
+
+  def releaseAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def releaseAnAddressNotAssignedError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that is not assigned. 
$resourceName " +
+      s"address $address is not assigned.")
+  }
+
+  def resourceScriptNotExistError(scriptFile: File, resourceName: String): 
Throwable = {
+    new SparkException(s"Resource script: $scriptFile to discover 
$resourceName " +
+      "doesn't exist!")
+  }
+
+  def expectUseResourceButNotSpecifyADiscoveryScriptError(resourceName: 
String): Throwable = {
+    new SparkException(s"User is expecting to use resource: $resourceName, but 
" +
+      "didn't specify a discovery script!")
+  }
+
+  def runningOtherResourceError(
+      script: Optional[String],
+      name: String,
+      resourceName: String): Throwable = {
+    new SparkException(s"Error running the resource discovery script 
${script.get}: " +
+      s"script returned resource name ${name} and we were expecting 
$resourceName.")
+  }
+
+  def ParsingJsonToResourceInformationError(
+      json: String,
+      exampleJson: String,
+      e: Throwable): Throwable = {
+    new SparkException(s"Error parsing JSON into 
ResourceInformation:\n$json\n" +
+      s"Here is a correct example: $exampleJson.", e)
+  }
+
+  def ParsingJsonToResourceInformationError(json: JValue, e: Throwable): 
Throwable = {
+    new SparkException(s"Error parsing JSON into 
ResourceInformation:\n$json\n", e)
+  }
+
+  def resourceNotExistInProfileIdError(resource: String, id: Int): Throwable = 
{
+    new SparkException(s"Resource $resource doesn't exist in profile id: $id")
+  }
+
+  def conditionOfExecutorResourceError(rName: String, num: Long, taskReq: 
Double): Throwable = {
+    new SparkException(s"The executor resource: $rName, amount: ${num} " +
+      s"needs to be >= the task resource request amount of $taskReq")
+  }
+
+  def noExecutorResourceConfigError(str: String): Throwable = {
+    new SparkException("No executor resource configs were not specified for 
the " +
+      s"following task configs: ${str}")
+  }
+
+  def resourceProfileSupportedError(): Throwable = {
+    new SparkException("ResourceProfiles are only supported on YARN and 
Kubernetes " +
+      "with dynamic allocation enabled.")
+  }
+
+  def resourceProfileIdNotFoundError(rpId: Int): Throwable = {
+    new SparkException(s"ResourceProfileId $rpId not found!")
+  }
+
+  def specifyConfigOfResourceError(str: String): Throwable = {
+    new SparkException(s"You must specify an amount for ${str}")
+  }
+
+  def specifyAnAmountConfigForResourceError(

Review comment:
       noAmountConfigSpecifiedForResourceError

##########
File path: core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
##########
@@ -141,4 +144,151 @@ object SparkCoreErrors {
   def mustSpecifyCheckpointDirError(): Throwable = {
     new SparkException("Checkpoint dir must be specified.")
   }
+
+  def acquireAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to acquire an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def acquireAnAddressNotAvailableError(resourceName: String, address: 
String): Throwable = {
+    new SparkException("Try to acquire an address that is not available. " +
+      s"$resourceName address $address is not available.")
+  }
+
+  def releaseAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def releaseAnAddressNotAssignedError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that is not assigned. 
$resourceName " +
+      s"address $address is not assigned.")
+  }
+
+  def resourceScriptNotExistError(scriptFile: File, resourceName: String): 
Throwable = {
+    new SparkException(s"Resource script: $scriptFile to discover 
$resourceName " +
+      "doesn't exist!")
+  }
+
+  def expectUseResourceButNotSpecifyADiscoveryScriptError(resourceName: 
String): Throwable = {
+    new SparkException(s"User is expecting to use resource: $resourceName, but 
" +
+      "didn't specify a discovery script!")
+  }
+
+  def runningOtherResourceError(
+      script: Optional[String],
+      name: String,
+      resourceName: String): Throwable = {
+    new SparkException(s"Error running the resource discovery script 
${script.get}: " +
+      s"script returned resource name ${name} and we were expecting 
$resourceName.")
+  }
+
+  def ParsingJsonToResourceInformationError(
+      json: String,
+      exampleJson: String,
+      e: Throwable): Throwable = {
+    new SparkException(s"Error parsing JSON into 
ResourceInformation:\n$json\n" +
+      s"Here is a correct example: $exampleJson.", e)
+  }
+
+  def ParsingJsonToResourceInformationError(json: JValue, e: Throwable): 
Throwable = {
+    new SparkException(s"Error parsing JSON into 
ResourceInformation:\n$json\n", e)
+  }
+
+  def resourceNotExistInProfileIdError(resource: String, id: Int): Throwable = 
{
+    new SparkException(s"Resource $resource doesn't exist in profile id: $id")
+  }
+
+  def conditionOfExecutorResourceError(rName: String, num: Long, taskReq: 
Double): Throwable = {
+    new SparkException(s"The executor resource: $rName, amount: ${num} " +
+      s"needs to be >= the task resource request amount of $taskReq")
+  }
+
+  def noExecutorResourceConfigError(str: String): Throwable = {
+    new SparkException("No executor resource configs were not specified for 
the " +
+      s"following task configs: ${str}")
+  }
+
+  def resourceProfileSupportedError(): Throwable = {
+    new SparkException("ResourceProfiles are only supported on YARN and 
Kubernetes " +
+      "with dynamic allocation enabled.")
+  }
+
+  def resourceProfileIdNotFoundError(rpId: Int): Throwable = {
+    new SparkException(s"ResourceProfileId $rpId not found!")
+  }
+
+  def specifyConfigOfResourceError(str: String): Throwable = {

Review comment:
       noAmountSpecifiedForResourceError

##########
File path: core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
##########
@@ -141,4 +144,151 @@ object SparkCoreErrors {
   def mustSpecifyCheckpointDirError(): Throwable = {
     new SparkException("Checkpoint dir must be specified.")
   }
+
+  def acquireAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to acquire an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def acquireAnAddressNotAvailableError(resourceName: String, address: 
String): Throwable = {
+    new SparkException("Try to acquire an address that is not available. " +
+      s"$resourceName address $address is not available.")
+  }
+
+  def releaseAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def releaseAnAddressNotAssignedError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that is not assigned. 
$resourceName " +
+      s"address $address is not assigned.")
+  }
+
+  def resourceScriptNotExistError(scriptFile: File, resourceName: String): 
Throwable = {
+    new SparkException(s"Resource script: $scriptFile to discover 
$resourceName " +
+      "doesn't exist!")
+  }
+
+  def expectUseResourceButNotSpecifyADiscoveryScriptError(resourceName: 
String): Throwable = {
+    new SparkException(s"User is expecting to use resource: $resourceName, but 
" +
+      "didn't specify a discovery script!")
+  }
+
+  def runningOtherResourceError(
+      script: Optional[String],
+      name: String,
+      resourceName: String): Throwable = {
+    new SparkException(s"Error running the resource discovery script 
${script.get}: " +
+      s"script returned resource name ${name} and we were expecting 
$resourceName.")
+  }
+
+  def ParsingJsonToResourceInformationError(
+      json: String,
+      exampleJson: String,
+      e: Throwable): Throwable = {
+    new SparkException(s"Error parsing JSON into 
ResourceInformation:\n$json\n" +
+      s"Here is a correct example: $exampleJson.", e)
+  }
+
+  def ParsingJsonToResourceInformationError(json: JValue, e: Throwable): 
Throwable = {
+    new SparkException(s"Error parsing JSON into 
ResourceInformation:\n$json\n", e)
+  }
+
+  def resourceNotExistInProfileIdError(resource: String, id: Int): Throwable = 
{
+    new SparkException(s"Resource $resource doesn't exist in profile id: $id")
+  }
+
+  def conditionOfExecutorResourceError(rName: String, num: Long, taskReq: 
Double): Throwable = {
+    new SparkException(s"The executor resource: $rName, amount: ${num} " +
+      s"needs to be >= the task resource request amount of $taskReq")
+  }
+
+  def noExecutorResourceConfigError(str: String): Throwable = {
+    new SparkException("No executor resource configs were not specified for 
the " +
+      s"following task configs: ${str}")
+  }
+
+  def resourceProfileSupportedError(): Throwable = {
+    new SparkException("ResourceProfiles are only supported on YARN and 
Kubernetes " +
+      "with dynamic allocation enabled.")
+  }
+
+  def resourceProfileIdNotFoundError(rpId: Int): Throwable = {
+    new SparkException(s"ResourceProfileId $rpId not found!")
+  }
+
+  def specifyConfigOfResourceError(str: String): Throwable = {
+    new SparkException(s"You must specify an amount for ${str}")
+  }
+
+  def specifyAnAmountConfigForResourceError(
+      key: String,
+      componentName: String,
+      RESOURCE_PREFIX: String): Throwable = {
+    new SparkException(s"You must specify an amount config for resource: $key 
" +
+      s"config: $componentName.$RESOURCE_PREFIX.$key")
+  }
+
+  def conditionOfResourceAmountError(doubleAmount: Double): Throwable = {
+    new SparkException(
+      s"The resource amount ${doubleAmount} must be either <= 0.5, or a whole 
number.")
+  }
+
+  def specifyAmountForResourceError(str: String): Throwable = {
+    new SparkException(s"You must specify an amount for ${str}")
+  }
+
+  def tasksSupportFractionalResourceError(componentName: String): Throwable = {
+    new SparkException(
+      s"Only tasks support fractional resources, please check your 
$componentName settings")
+  }
+
+  def ParsingResourceFileError(resourcesFile: String, e: Throwable): Throwable 
= {
+    new SparkException(s"Error parsing resources file $resourcesFile", e)
+  }
+
+  def conditionOfTheNumberOfCoresPerExecutorError(execCores: Int, taskCpus: 
Int): Throwable = {

Review comment:
       numCoresPerExecutorLessThanNumCPUsPerTaskError

##########
File path: core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
##########
@@ -141,4 +144,151 @@ object SparkCoreErrors {
   def mustSpecifyCheckpointDirError(): Throwable = {
     new SparkException("Checkpoint dir must be specified.")
   }
+
+  def acquireAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to acquire an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def acquireAnAddressNotAvailableError(resourceName: String, address: 
String): Throwable = {
+    new SparkException("Try to acquire an address that is not available. " +
+      s"$resourceName address $address is not available.")
+  }
+
+  def releaseAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {

Review comment:
       releaseNonExistingAddressError

##########
File path: core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
##########
@@ -141,4 +144,151 @@ object SparkCoreErrors {
   def mustSpecifyCheckpointDirError(): Throwable = {
     new SparkException("Checkpoint dir must be specified.")
   }
+
+  def acquireAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to acquire an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def acquireAnAddressNotAvailableError(resourceName: String, address: 
String): Throwable = {
+    new SparkException("Try to acquire an address that is not available. " +
+      s"$resourceName address $address is not available.")
+  }
+
+  def releaseAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def releaseAnAddressNotAssignedError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that is not assigned. 
$resourceName " +
+      s"address $address is not assigned.")
+  }
+
+  def resourceScriptNotExistError(scriptFile: File, resourceName: String): 
Throwable = {
+    new SparkException(s"Resource script: $scriptFile to discover 
$resourceName " +
+      "doesn't exist!")
+  }
+
+  def expectUseResourceButNotSpecifyADiscoveryScriptError(resourceName: 
String): Throwable = {
+    new SparkException(s"User is expecting to use resource: $resourceName, but 
" +
+      "didn't specify a discovery script!")
+  }
+
+  def runningOtherResourceError(
+      script: Optional[String],
+      name: String,
+      resourceName: String): Throwable = {
+    new SparkException(s"Error running the resource discovery script 
${script.get}: " +
+      s"script returned resource name ${name} and we were expecting 
$resourceName.")
+  }
+
+  def ParsingJsonToResourceInformationError(
+      json: String,
+      exampleJson: String,
+      e: Throwable): Throwable = {
+    new SparkException(s"Error parsing JSON into 
ResourceInformation:\n$json\n" +
+      s"Here is a correct example: $exampleJson.", e)
+  }
+
+  def ParsingJsonToResourceInformationError(json: JValue, e: Throwable): 
Throwable = {
+    new SparkException(s"Error parsing JSON into 
ResourceInformation:\n$json\n", e)
+  }
+
+  def resourceNotExistInProfileIdError(resource: String, id: Int): Throwable = 
{
+    new SparkException(s"Resource $resource doesn't exist in profile id: $id")
+  }
+
+  def conditionOfExecutorResourceError(rName: String, num: Long, taskReq: 
Double): Throwable = {
+    new SparkException(s"The executor resource: $rName, amount: ${num} " +
+      s"needs to be >= the task resource request amount of $taskReq")
+  }
+
+  def noExecutorResourceConfigError(str: String): Throwable = {
+    new SparkException("No executor resource configs were not specified for 
the " +
+      s"following task configs: ${str}")
+  }
+
+  def resourceProfileSupportedError(): Throwable = {
+    new SparkException("ResourceProfiles are only supported on YARN and 
Kubernetes " +
+      "with dynamic allocation enabled.")
+  }
+
+  def resourceProfileIdNotFoundError(rpId: Int): Throwable = {
+    new SparkException(s"ResourceProfileId $rpId not found!")
+  }
+
+  def specifyConfigOfResourceError(str: String): Throwable = {
+    new SparkException(s"You must specify an amount for ${str}")
+  }
+
+  def specifyAnAmountConfigForResourceError(
+      key: String,
+      componentName: String,
+      RESOURCE_PREFIX: String): Throwable = {
+    new SparkException(s"You must specify an amount config for resource: $key 
" +
+      s"config: $componentName.$RESOURCE_PREFIX.$key")
+  }
+
+  def conditionOfResourceAmountError(doubleAmount: Double): Throwable = {
+    new SparkException(
+      s"The resource amount ${doubleAmount} must be either <= 0.5, or a whole 
number.")
+  }
+
+  def specifyAmountForResourceError(str: String): Throwable = {
+    new SparkException(s"You must specify an amount for ${str}")
+  }
+
+  def tasksSupportFractionalResourceError(componentName: String): Throwable = {

Review comment:
       fractionalResourcesUnsupportedError

##########
File path: core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
##########
@@ -141,4 +144,151 @@ object SparkCoreErrors {
   def mustSpecifyCheckpointDirError(): Throwable = {
     new SparkException("Checkpoint dir must be specified.")
   }
+
+  def acquireAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to acquire an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def acquireAnAddressNotAvailableError(resourceName: String, address: 
String): Throwable = {
+    new SparkException("Try to acquire an address that is not available. " +
+      s"$resourceName address $address is not available.")
+  }
+
+  def releaseAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def releaseAnAddressNotAssignedError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that is not assigned. 
$resourceName " +
+      s"address $address is not assigned.")
+  }
+
+  def resourceScriptNotExistError(scriptFile: File, resourceName: String): 
Throwable = {
+    new SparkException(s"Resource script: $scriptFile to discover 
$resourceName " +
+      "doesn't exist!")
+  }
+
+  def expectUseResourceButNotSpecifyADiscoveryScriptError(resourceName: 
String): Throwable = {
+    new SparkException(s"User is expecting to use resource: $resourceName, but 
" +
+      "didn't specify a discovery script!")
+  }
+
+  def runningOtherResourceError(
+      script: Optional[String],
+      name: String,
+      resourceName: String): Throwable = {
+    new SparkException(s"Error running the resource discovery script 
${script.get}: " +
+      s"script returned resource name ${name} and we were expecting 
$resourceName.")
+  }
+
+  def ParsingJsonToResourceInformationError(
+      json: String,
+      exampleJson: String,
+      e: Throwable): Throwable = {
+    new SparkException(s"Error parsing JSON into 
ResourceInformation:\n$json\n" +
+      s"Here is a correct example: $exampleJson.", e)
+  }
+
+  def ParsingJsonToResourceInformationError(json: JValue, e: Throwable): 
Throwable = {

Review comment:
       failToParseJsonToResourceInformationError

##########
File path: core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
##########
@@ -141,4 +144,151 @@ object SparkCoreErrors {
   def mustSpecifyCheckpointDirError(): Throwable = {
     new SparkException("Checkpoint dir must be specified.")
   }
+
+  def acquireAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to acquire an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def acquireAnAddressNotAvailableError(resourceName: String, address: 
String): Throwable = {
+    new SparkException("Try to acquire an address that is not available. " +
+      s"$resourceName address $address is not available.")
+  }
+
+  def releaseAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def releaseAnAddressNotAssignedError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that is not assigned. 
$resourceName " +
+      s"address $address is not assigned.")
+  }
+
+  def resourceScriptNotExistError(scriptFile: File, resourceName: String): 
Throwable = {
+    new SparkException(s"Resource script: $scriptFile to discover 
$resourceName " +
+      "doesn't exist!")
+  }
+
+  def expectUseResourceButNotSpecifyADiscoveryScriptError(resourceName: 
String): Throwable = {
+    new SparkException(s"User is expecting to use resource: $resourceName, but 
" +
+      "didn't specify a discovery script!")
+  }
+
+  def runningOtherResourceError(
+      script: Optional[String],
+      name: String,
+      resourceName: String): Throwable = {
+    new SparkException(s"Error running the resource discovery script 
${script.get}: " +
+      s"script returned resource name ${name} and we were expecting 
$resourceName.")
+  }
+
+  def ParsingJsonToResourceInformationError(
+      json: String,
+      exampleJson: String,
+      e: Throwable): Throwable = {
+    new SparkException(s"Error parsing JSON into 
ResourceInformation:\n$json\n" +
+      s"Here is a correct example: $exampleJson.", e)
+  }
+
+  def ParsingJsonToResourceInformationError(json: JValue, e: Throwable): 
Throwable = {
+    new SparkException(s"Error parsing JSON into 
ResourceInformation:\n$json\n", e)
+  }
+
+  def resourceNotExistInProfileIdError(resource: String, id: Int): Throwable = 
{
+    new SparkException(s"Resource $resource doesn't exist in profile id: $id")
+  }
+
+  def conditionOfExecutorResourceError(rName: String, num: Long, taskReq: 
Double): Throwable = {
+    new SparkException(s"The executor resource: $rName, amount: ${num} " +
+      s"needs to be >= the task resource request amount of $taskReq")
+  }
+
+  def noExecutorResourceConfigError(str: String): Throwable = {
+    new SparkException("No executor resource configs were not specified for 
the " +
+      s"following task configs: ${str}")
+  }
+
+  def resourceProfileSupportedError(): Throwable = {
+    new SparkException("ResourceProfiles are only supported on YARN and 
Kubernetes " +
+      "with dynamic allocation enabled.")
+  }
+
+  def resourceProfileIdNotFoundError(rpId: Int): Throwable = {
+    new SparkException(s"ResourceProfileId $rpId not found!")
+  }
+
+  def specifyConfigOfResourceError(str: String): Throwable = {
+    new SparkException(s"You must specify an amount for ${str}")
+  }
+
+  def specifyAnAmountConfigForResourceError(
+      key: String,
+      componentName: String,
+      RESOURCE_PREFIX: String): Throwable = {
+    new SparkException(s"You must specify an amount config for resource: $key 
" +
+      s"config: $componentName.$RESOURCE_PREFIX.$key")
+  }
+
+  def conditionOfResourceAmountError(doubleAmount: Double): Throwable = {
+    new SparkException(
+      s"The resource amount ${doubleAmount} must be either <= 0.5, or a whole 
number.")
+  }
+
+  def specifyAmountForResourceError(str: String): Throwable = {
+    new SparkException(s"You must specify an amount for ${str}")
+  }
+
+  def tasksSupportFractionalResourceError(componentName: String): Throwable = {
+    new SparkException(
+      s"Only tasks support fractional resources, please check your 
$componentName settings")
+  }
+
+  def ParsingResourceFileError(resourcesFile: String, e: Throwable): Throwable 
= {
+    new SparkException(s"Error parsing resources file $resourcesFile", e)
+  }
+
+  def conditionOfTheNumberOfCoresPerExecutorError(execCores: Int, taskCpus: 
Int): Throwable = {
+    new SparkException(s"The number of cores per executor (=$execCores) has to 
be >= " +
+      s"the number of cpus per task = $taskCpus.")
+  }
+
+  def nonOfTheDiscoveryPluginsReturnedResourceInfoError(str: String): 
Throwable = {
+    new SparkException(s"None of the discovery plugins returned 
ResourceInformation for " +
+      s"${str}")
+  }
+
+  def adjustConfigurationError(

Review comment:
       adjustConfigurationofCoresError

##########
File path: core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
##########
@@ -141,4 +144,151 @@ object SparkCoreErrors {
   def mustSpecifyCheckpointDirError(): Throwable = {
     new SparkException("Checkpoint dir must be specified.")
   }
+
+  def acquireAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to acquire an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def acquireAnAddressNotAvailableError(resourceName: String, address: 
String): Throwable = {
+    new SparkException("Try to acquire an address that is not available. " +
+      s"$resourceName address $address is not available.")
+  }
+
+  def releaseAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def releaseAnAddressNotAssignedError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that is not assigned. 
$resourceName " +
+      s"address $address is not assigned.")
+  }
+
+  def resourceScriptNotExistError(scriptFile: File, resourceName: String): 
Throwable = {
+    new SparkException(s"Resource script: $scriptFile to discover 
$resourceName " +
+      "doesn't exist!")
+  }
+
+  def expectUseResourceButNotSpecifyADiscoveryScriptError(resourceName: 
String): Throwable = {
+    new SparkException(s"User is expecting to use resource: $resourceName, but 
" +
+      "didn't specify a discovery script!")
+  }
+
+  def runningOtherResourceError(
+      script: Optional[String],
+      name: String,
+      resourceName: String): Throwable = {
+    new SparkException(s"Error running the resource discovery script 
${script.get}: " +
+      s"script returned resource name ${name} and we were expecting 
$resourceName.")
+  }
+
+  def ParsingJsonToResourceInformationError(
+      json: String,
+      exampleJson: String,
+      e: Throwable): Throwable = {
+    new SparkException(s"Error parsing JSON into 
ResourceInformation:\n$json\n" +
+      s"Here is a correct example: $exampleJson.", e)
+  }
+
+  def ParsingJsonToResourceInformationError(json: JValue, e: Throwable): 
Throwable = {
+    new SparkException(s"Error parsing JSON into 
ResourceInformation:\n$json\n", e)
+  }
+
+  def resourceNotExistInProfileIdError(resource: String, id: Int): Throwable = 
{
+    new SparkException(s"Resource $resource doesn't exist in profile id: $id")
+  }
+
+  def conditionOfExecutorResourceError(rName: String, num: Long, taskReq: 
Double): Throwable = {
+    new SparkException(s"The executor resource: $rName, amount: ${num} " +
+      s"needs to be >= the task resource request amount of $taskReq")
+  }
+
+  def noExecutorResourceConfigError(str: String): Throwable = {
+    new SparkException("No executor resource configs were not specified for 
the " +
+      s"following task configs: ${str}")
+  }
+
+  def resourceProfileSupportedError(): Throwable = {
+    new SparkException("ResourceProfiles are only supported on YARN and 
Kubernetes " +
+      "with dynamic allocation enabled.")
+  }
+
+  def resourceProfileIdNotFoundError(rpId: Int): Throwable = {
+    new SparkException(s"ResourceProfileId $rpId not found!")
+  }
+
+  def specifyConfigOfResourceError(str: String): Throwable = {
+    new SparkException(s"You must specify an amount for ${str}")
+  }
+
+  def specifyAnAmountConfigForResourceError(
+      key: String,
+      componentName: String,
+      RESOURCE_PREFIX: String): Throwable = {
+    new SparkException(s"You must specify an amount config for resource: $key 
" +
+      s"config: $componentName.$RESOURCE_PREFIX.$key")
+  }
+
+  def conditionOfResourceAmountError(doubleAmount: Double): Throwable = {

Review comment:
       invalidResourceAmountError

##########
File path: core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
##########
@@ -141,4 +144,151 @@ object SparkCoreErrors {
   def mustSpecifyCheckpointDirError(): Throwable = {
     new SparkException("Checkpoint dir must be specified.")
   }
+
+  def acquireAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to acquire an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def acquireAnAddressNotAvailableError(resourceName: String, address: 
String): Throwable = {
+    new SparkException("Try to acquire an address that is not available. " +
+      s"$resourceName address $address is not available.")
+  }
+
+  def releaseAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def releaseAnAddressNotAssignedError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that is not assigned. 
$resourceName " +
+      s"address $address is not assigned.")
+  }
+
+  def resourceScriptNotExistError(scriptFile: File, resourceName: String): 
Throwable = {
+    new SparkException(s"Resource script: $scriptFile to discover 
$resourceName " +
+      "doesn't exist!")
+  }
+
+  def expectUseResourceButNotSpecifyADiscoveryScriptError(resourceName: 
String): Throwable = {
+    new SparkException(s"User is expecting to use resource: $resourceName, but 
" +
+      "didn't specify a discovery script!")
+  }
+
+  def runningOtherResourceError(
+      script: Optional[String],
+      name: String,
+      resourceName: String): Throwable = {
+    new SparkException(s"Error running the resource discovery script 
${script.get}: " +
+      s"script returned resource name ${name} and we were expecting 
$resourceName.")
+  }
+
+  def ParsingJsonToResourceInformationError(
+      json: String,
+      exampleJson: String,
+      e: Throwable): Throwable = {
+    new SparkException(s"Error parsing JSON into 
ResourceInformation:\n$json\n" +
+      s"Here is a correct example: $exampleJson.", e)
+  }
+
+  def ParsingJsonToResourceInformationError(json: JValue, e: Throwable): 
Throwable = {
+    new SparkException(s"Error parsing JSON into 
ResourceInformation:\n$json\n", e)
+  }
+
+  def resourceNotExistInProfileIdError(resource: String, id: Int): Throwable = 
{
+    new SparkException(s"Resource $resource doesn't exist in profile id: $id")
+  }
+
+  def conditionOfExecutorResourceError(rName: String, num: Long, taskReq: 
Double): Throwable = {
+    new SparkException(s"The executor resource: $rName, amount: ${num} " +
+      s"needs to be >= the task resource request amount of $taskReq")
+  }
+
+  def noExecutorResourceConfigError(str: String): Throwable = {
+    new SparkException("No executor resource configs were not specified for 
the " +
+      s"following task configs: ${str}")
+  }
+
+  def resourceProfileSupportedError(): Throwable = {
+    new SparkException("ResourceProfiles are only supported on YARN and 
Kubernetes " +
+      "with dynamic allocation enabled.")
+  }
+
+  def resourceProfileIdNotFoundError(rpId: Int): Throwable = {
+    new SparkException(s"ResourceProfileId $rpId not found!")
+  }
+
+  def specifyConfigOfResourceError(str: String): Throwable = {
+    new SparkException(s"You must specify an amount for ${str}")
+  }
+
+  def specifyAnAmountConfigForResourceError(
+      key: String,
+      componentName: String,
+      RESOURCE_PREFIX: String): Throwable = {
+    new SparkException(s"You must specify an amount config for resource: $key 
" +
+      s"config: $componentName.$RESOURCE_PREFIX.$key")
+  }
+
+  def conditionOfResourceAmountError(doubleAmount: Double): Throwable = {
+    new SparkException(
+      s"The resource amount ${doubleAmount} must be either <= 0.5, or a whole 
number.")
+  }
+
+  def specifyAmountForResourceError(str: String): Throwable = {
+    new SparkException(s"You must specify an amount for ${str}")
+  }
+
+  def tasksSupportFractionalResourceError(componentName: String): Throwable = {
+    new SparkException(
+      s"Only tasks support fractional resources, please check your 
$componentName settings")
+  }
+
+  def ParsingResourceFileError(resourcesFile: String, e: Throwable): Throwable 
= {

Review comment:
       failToParseResourceFileError

##########
File path: core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
##########
@@ -141,4 +144,151 @@ object SparkCoreErrors {
   def mustSpecifyCheckpointDirError(): Throwable = {
     new SparkException("Checkpoint dir must be specified.")
   }
+
+  def acquireAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to acquire an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def acquireAnAddressNotAvailableError(resourceName: String, address: 
String): Throwable = {
+    new SparkException("Try to acquire an address that is not available. " +
+      s"$resourceName address $address is not available.")
+  }
+
+  def releaseAnAddressNotExistError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that doesn't exist. 
$resourceName " +
+      s"address $address doesn't exist.")
+  }
+
+  def releaseAnAddressNotAssignedError(resourceName: String, address: String): 
Throwable = {
+    new SparkException(s"Try to release an address that is not assigned. 
$resourceName " +
+      s"address $address is not assigned.")
+  }
+
+  def resourceScriptNotExistError(scriptFile: File, resourceName: String): 
Throwable = {
+    new SparkException(s"Resource script: $scriptFile to discover 
$resourceName " +
+      "doesn't exist!")
+  }
+
+  def expectUseResourceButNotSpecifyADiscoveryScriptError(resourceName: 
String): Throwable = {
+    new SparkException(s"User is expecting to use resource: $resourceName, but 
" +
+      "didn't specify a discovery script!")
+  }
+
+  def runningOtherResourceError(
+      script: Optional[String],
+      name: String,
+      resourceName: String): Throwable = {
+    new SparkException(s"Error running the resource discovery script 
${script.get}: " +
+      s"script returned resource name ${name} and we were expecting 
$resourceName.")
+  }
+
+  def ParsingJsonToResourceInformationError(
+      json: String,
+      exampleJson: String,
+      e: Throwable): Throwable = {
+    new SparkException(s"Error parsing JSON into 
ResourceInformation:\n$json\n" +
+      s"Here is a correct example: $exampleJson.", e)
+  }
+
+  def ParsingJsonToResourceInformationError(json: JValue, e: Throwable): 
Throwable = {
+    new SparkException(s"Error parsing JSON into 
ResourceInformation:\n$json\n", e)
+  }
+
+  def resourceNotExistInProfileIdError(resource: String, id: Int): Throwable = 
{
+    new SparkException(s"Resource $resource doesn't exist in profile id: $id")
+  }
+
+  def conditionOfExecutorResourceError(rName: String, num: Long, taskReq: 
Double): Throwable = {
+    new SparkException(s"The executor resource: $rName, amount: ${num} " +
+      s"needs to be >= the task resource request amount of $taskReq")
+  }
+
+  def noExecutorResourceConfigError(str: String): Throwable = {
+    new SparkException("No executor resource configs were not specified for 
the " +
+      s"following task configs: ${str}")
+  }
+
+  def resourceProfileSupportedError(): Throwable = {
+    new SparkException("ResourceProfiles are only supported on YARN and 
Kubernetes " +
+      "with dynamic allocation enabled.")
+  }
+
+  def resourceProfileIdNotFoundError(rpId: Int): Throwable = {
+    new SparkException(s"ResourceProfileId $rpId not found!")
+  }
+
+  def specifyConfigOfResourceError(str: String): Throwable = {
+    new SparkException(s"You must specify an amount for ${str}")
+  }
+
+  def specifyAnAmountConfigForResourceError(
+      key: String,
+      componentName: String,
+      RESOURCE_PREFIX: String): Throwable = {
+    new SparkException(s"You must specify an amount config for resource: $key 
" +
+      s"config: $componentName.$RESOURCE_PREFIX.$key")
+  }
+
+  def conditionOfResourceAmountError(doubleAmount: Double): Throwable = {
+    new SparkException(
+      s"The resource amount ${doubleAmount} must be either <= 0.5, or a whole 
number.")
+  }
+
+  def specifyAmountForResourceError(str: String): Throwable = {
+    new SparkException(s"You must specify an amount for ${str}")
+  }
+
+  def tasksSupportFractionalResourceError(componentName: String): Throwable = {
+    new SparkException(
+      s"Only tasks support fractional resources, please check your 
$componentName settings")
+  }
+
+  def ParsingResourceFileError(resourcesFile: String, e: Throwable): Throwable 
= {
+    new SparkException(s"Error parsing resources file $resourcesFile", e)
+  }
+
+  def conditionOfTheNumberOfCoresPerExecutorError(execCores: Int, taskCpus: 
Int): Throwable = {
+    new SparkException(s"The number of cores per executor (=$execCores) has to 
be >= " +
+      s"the number of cpus per task = $taskCpus.")
+  }
+
+  def nonOfTheDiscoveryPluginsReturnedResourceInfoError(str: String): 
Throwable = {
+    new SparkException(s"None of the discovery plugins returned 
ResourceInformation for " +
+      s"${str}")
+  }
+
+  def adjustConfigurationError(
+      cores: Int,
+      taskCpus: Int,
+      resourceNumSlots: Int,
+      limitingResource: String,
+      maxTaskPerExec: Int): Throwable = {
+    new SparkException(
+      s"""
+        |The configuration of cores (exec = ${cores}
+        |task = ${taskCpus}, runnable tasks = ${resourceNumSlots}) will
+        |result in wasted resources due to resource ${limitingResource} 
limiting the
+        |number of runnable tasks per executor to: ${maxTaskPerExec}. Please 
adjust
+        |your configuration.
+      """.stripMargin.replaceAll("\n", " "))
+  }
+
+  def adjustConfigurationError(

Review comment:
       adjustConfigurationOfResourceError




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to