[GitHub] spark pull request #21511: [SPARK-24491][Kubernetes] Configuration support f...

2018-06-27 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/21511#discussion_r198653002
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -104,6 +104,20 @@ private[spark] object Config extends Logging {
   .stringConf
   .createOptional
 
+  val KUBERNETES_EXECUTOR_LIMIT_GPUS =
--- End diff --

Sometimes you need it. For example, to reduce data across multiple 
executors, you would ideally use ring all-reduce among your executors, but you 
cannot really do that right now because executors are scheduled 
independently. The best you can do today is to gather all of your data to 
the driver and do the reduction there. You can learn more in the SPIP for 
Project Hydrogen (barrier execution).
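
Here is a minimal, self-contained sketch of that driver-funneled reduction 
(the app name and the toy "gradient" arrays are made up for illustration):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object DriverSideReduce {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("driver-side-reduce").setMaster("local[2]"))

    // Stand-in for per-executor partial results, e.g. gradient shards.
    val partials = sc.parallelize(Seq(Array(1.0, 2.0), Array(3.0, 4.0)), 2)

    // Without barrier scheduling there is no executor-to-executor ring:
    // reduce() combines within each partition on the executors, then ships
    // the per-partition results to the driver for the final combine.
    val total = partials.reduce((a, b) => a.zip(b).map { case (x, y) => x + y })
    println(total.mkString(","))

    sc.stop()
  }
}
```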


---




[GitHub] spark issue #20761: [SPARK-20327][CORE][YARN] Add CLI support for YARN custo...

2018-06-15 Thread galv
Github user galv commented on the issue:

https://github.com/apache/spark/pull/20761
  
I agree that there is apparently unnecessary complexity in the validator.

I'll try to take another look at the code today.

On Fri, Jun 15, 2018 at 9:15 AM, Marcelo Vanzin 
wrote:

> I don't have issues with the design - I think the main two things I was
> concerned about were:
>
>- not adding another way to set existing Spark options like mem and
>cores, which has been addressed
>- the seemingly unnecessary complexity in certain parts of the code
>like the validator
>



-- 
Daniel Galvez
http://danielgalvez.me
https://github.com/galv



---




[GitHub] spark issue #20761: [SPARK-20327][CORE][YARN] Add CLI support for YARN custo...

2018-06-14 Thread galv
Github user galv commented on the issue:

https://github.com/apache/spark/pull/20761
  
I would like to see this merged, though I got derailed by Spark Summit last 
week and other things. I will look this patch over again soon. @szyszy, if 
you've been busy lately, perhaps I can take over the rest of the code changes 
suggested by @vanzin, if necessary (I get the impression that this PR is just 
about ready to merge).

@vanzin I appreciate your detailed responses. I'm curious whether you have 
any serious overarching concerns about this patch, e.g., about its design. I 
think the interface is fairly appropriate, but I thought I should check whether 
you think this PR will be ready to merge soon.


---




[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...

2018-06-10 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/21494#discussion_r193953345
  
--- Diff: core/src/main/scala/org/apache/spark/util/RpcUtils.scala ---
@@ -44,7 +44,7 @@ private[spark] object RpcUtils {
 
   /** Returns the default Spark timeout to use for RPC ask operations. */
   def askRpcTimeout(conf: SparkConf): RpcTimeout = {
-RpcTimeout(conf, Seq("spark.rpc.askTimeout", "spark.network.timeout"), 
"120s")
+RpcTimeout(conf, Seq("spark.rpc.askTimeout", "spark.network.timeout"), 
"900s")
--- End diff --

Why hard-code this change? Couldn't you have set this at runtime if you 
needed it increased? I'm concerned about this breaking backwards compatibility 
for jobs that, for whatever reason, depend on the 120-second timeout.
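
For reference, the runtime alternative is a one-line config change; here is a 
minimal sketch (the app name is made up, the config keys are real):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Raise the ask timeout only for the job that needs it; the compiled-in
// 120s default stays intact for everyone else.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("barrier-prototype")
  .set("spark.rpc.askTimeout", "900s")
val sc = new SparkContext(conf)
```

The same override works from the command line via 
`--conf spark.rpc.askTimeout=900s`.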


---




[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...

2018-06-10 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/21494#discussion_r193953432
  
--- Diff: core/src/main/scala/org/apache/spark/barrier/BarrierRDD.scala ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.barrier
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.rdd.RDD
+
+
+/**
+ * An RDD that supports running MPI programme.
--- End diff --

`programme` -> `program`


---




[GitHub] spark pull request #21511: [SPARK-24491][Kubernetes] Configuration support f...

2018-06-07 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/21511#discussion_r193945410
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
 ---
@@ -150,6 +152,16 @@ private[spark] class BasicExecutorFeatureStep(
   .endResources()
 .build()
 }.getOrElse(executorContainer)
+val containerWithLimitGpus = executorLimitGpus.map { limitGpus =>
+  val executorGpuLimitQuantity = new QuantityBuilder(false)
+.withAmount(limitGpus)
+.build()
+  new ContainerBuilder(containerWithLimitCores)
+.editResources()
+  .addToLimits(gpuProvider+"/gpu", executorGpuLimitQuantity)
--- End diff --

Style: add whitespace around the `+`.

More importantly, you're assuming that the name of the resource is always 
going to be "gpu". I'm not sure this is a great idea.


---




[GitHub] spark pull request #21511: [SPARK-24491][Kubernetes] Configuration support f...

2018-06-07 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/21511#discussion_r193944223
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -104,6 +104,20 @@ private[spark] object Config extends Logging {
   .stringConf
   .createOptional
 
+  val KUBERNETES_EXECUTOR_LIMIT_GPUS =
+ConfigBuilder("spark.kubernetes.executor.limit.gpus")
+  .doc("Specify the gpu request for each executor pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_EXECUTOR_GPU_PROVIDER =
+ConfigBuilder("spark.kubernetes.executor.gpu.provider")
+  .doc("Specify the gpu provider for each executor pod")
+  .stringConf
+  .createWithDefault("nvidia.com")
+
+
+
--- End diff --

Style: remove these two blank lines here.


---




[GitHub] spark pull request #21511: [SPARK-24491][Kubernetes] Configuration support f...

2018-06-07 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/21511#discussion_r193945389
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
 ---
@@ -172,7 +184,7 @@ private[spark] class BasicExecutorFeatureStep(
 .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
 .endSpec()
   .build()
-SparkPod(executorPod, containerWithLimitCores)
+SparkPod(executorPod, containerWithLimitGpus)
--- End diff --

What if you want to request both GPUs and another resource (e.g., FPGAs) 
for your executors? You'd have to hard-code another chained optional map like 
the one in this pull request. The current code does not pave a good path forward 
to requesting other types of resources.
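
A sketch of the more general shape I have in mind, assuming a hypothetical 
prefix config like `spark.kubernetes.executor.limit.resource.<vendor>/<name>=<amount>`:

```scala
import io.fabric8.kubernetes.api.model.{Quantity, QuantityBuilder}

// Every (fully qualified name -> amount) pair becomes one resource limit,
// so GPUs, FPGAs, or any future device need no new hard-coded code path.
def resourceLimits(requested: Map[String, String]): Map[String, Quantity] =
  requested.map { case (name, amount) =>
    name -> new QuantityBuilder(false).withAmount(amount).build()
  }

val limits = resourceLimits(Map(
  "nvidia.com/gpu"  -> "2",
  "xilinx.com/fpga" -> "1"))
// each entry would then be applied via addToLimits(name, quantity)
```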



---




[GitHub] spark pull request #21511: [SPARK-24491][Kubernetes] Configuration support f...

2018-06-07 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/21511#discussion_r193945237
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -104,6 +104,20 @@ private[spark] object Config extends Logging {
   .stringConf
   .createOptional
 
+  val KUBERNETES_EXECUTOR_LIMIT_GPUS =
--- End diff --

Why not allow the driver to request GPUs as well?
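
A hedged sketch of what the driver-side twin of this entry could look like 
(the key name is hypothetical; it just mirrors the executor config in this diff):

```scala
import org.apache.spark.internal.config.ConfigBuilder

val KUBERNETES_DRIVER_LIMIT_GPUS =
  ConfigBuilder("spark.kubernetes.driver.limit.gpus") // hypothetical key
    .doc("Specify the GPU limit for the driver pod")
    .stringConf
    .createOptional
```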


---




[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...

2018-06-07 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/21494#discussion_r193290266
  
--- Diff: python/pyspark/worker.py ---
@@ -232,6 +236,13 @@ def main(infile, outfile):
 shuffle.DiskBytesSpilled = 0
 _accumulatorRegistry.clear()
 
+if (isBarrier):
+port = 25333 + 2 + 2 * taskContext._partitionId
+paras = GatewayParameters(port=port)
--- End diff --

`paras` -> `params`


---




[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...

2018-06-07 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/21494#discussion_r193269255
  
--- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala ---
@@ -627,6 +627,52 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
 assert(exc.getCause() != null)
 stream.close()
   }
+
+  test("support barrier sync under local mode") {
+val conf = new SparkConf().setAppName("test").setMaster("local[2]")
+sc = new SparkContext(conf)
+val rdd = sc.makeRDD(Seq(1, 2, 3, 4), 2).barrier()
+val rdd2 = rdd.mapPartitions { it =>
+  val tc = 
TaskContext.get.asInstanceOf[org.apache.spark.barrier.BarrierTaskContext]
+  // If we don't get the expected taskInfos, the job shall abort due 
to stage failure.
+  if (tc.hosts().length != 2) {
+throw new SparkException("Expected taksInfos length is 2, actual 
length is " +
--- End diff --

`taksInfos` -> `taskInfos`


---




[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...

2018-06-07 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/21494#discussion_r193289530
  
--- Diff: python/pyspark/worker.py ---
@@ -232,6 +236,13 @@ def main(infile, outfile):
 shuffle.DiskBytesSpilled = 0
 _accumulatorRegistry.clear()
 
+if (isBarrier):
--- End diff --

Style: `if (isBarrier):` -> `if isBarrier:`


---




[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...

2018-06-07 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/21494#discussion_r193291076
  
--- Diff: python/pyspark/worker.py ---
@@ -232,6 +236,13 @@ def main(infile, outfile):
 shuffle.DiskBytesSpilled = 0
 _accumulatorRegistry.clear()
 
+if (isBarrier):
+port = 25333 + 2 + 2 * taskContext._partitionId
--- End diff --

I recommend using `DEFAULT_PORT` and `DEFAULT_PYTHON_PROXY_PORT`. They are 
exposed as part of the public API of py4j:  
https://github.com/bartdag/py4j/blob/216432d859de41441f0d1a0d55b31b5d8d09dd28/py4j-python/src/py4j/java_gateway.py#L54

By the way, acquiring ports like this is a little hacky and may require 
more thought.


---




[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...

2018-06-07 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/21494#discussion_r193555968
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -123,6 +124,21 @@ private[spark] class TaskSetManager(
   // TODO: We should kill any running task attempts when the task set 
manager becomes a zombie.
   private[scheduler] var isZombie = false
 
+  private[scheduler] lazy val barrierCoordinator = {
--- End diff --

I recommend adding a return type here for readability.


---




[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...

2018-06-07 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/21494#discussion_r193269297
  
--- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala ---
@@ -627,6 +627,52 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
 assert(exc.getCause() != null)
 stream.close()
   }
+
+  test("support barrier sync under local mode") {
+val conf = new SparkConf().setAppName("test").setMaster("local[2]")
+sc = new SparkContext(conf)
+val rdd = sc.makeRDD(Seq(1, 2, 3, 4), 2).barrier()
+val rdd2 = rdd.mapPartitions { it =>
+  val tc = 
TaskContext.get.asInstanceOf[org.apache.spark.barrier.BarrierTaskContext]
+  // If we don't get the expected taskInfos, the job shall abort due 
to stage failure.
+  if (tc.hosts().length != 2) {
+throw new SparkException("Expected taksInfos length is 2, actual 
length is " +
+  s"${tc.hosts().length}.")
+  }
+  // println(tc.getTaskInfos().toList)
--- End diff --

Remove this commented-out line.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-05-22 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r190107244
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
 ---
@@ -306,9 +318,14 @@ private[yarn] class YarnAllocator(
   s"executorsStarting: ${numExecutorsStarting.get}")
 
 if (missing > 0) {
-  logInfo(s"Will request $missing executor container(s), each with " +
-s"${resource.getVirtualCores} core(s) and " +
-s"${resource.getMemory} MB memory (including $memoryOverhead MB of 
overhead)")
+  var requestContainerMessage = s"Will request $missing executor 
container(s), each with " +
+  s"${resource.getVirtualCores} core(s) and " +
+  s"${resource.getMemory} MB memory (including $memoryOverhead MB 
of overhead) "
--- End diff --

Nitpick: I would remove the trailing space here (so the message is exactly 
the same as before for users) and put it at the beginning of the 
`s"and with custom resources:"` string instead.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-05-22 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r190106824
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeValidator.scala
 ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.ProcessType.{am, driver, executor, 
ProcessType}
+import org.apache.spark.deploy.yarn.ResourceType.{cores, memory, 
ResourceType}
+import org.apache.spark.deploy.yarn.RunMode.{client, cluster, RunMode}
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config._
+
+private[spark] object ProcessType extends Enumeration {
+  type ProcessType = Value
+  val driver, executor, am = Value
+}
+
+private[spark] object RunMode extends Enumeration {
+  type RunMode = Value
+  val client, cluster = Value
+}
+
+private[spark] object ResourceType extends Enumeration {
+  type ResourceType = Value
+  val cores, memory = Value
+}
+
+private object ResourceTypeValidator {
+  private val ERROR_PREFIX: String = "Error: "
+  private val POSSIBLE_RESOURCE_DEFINITIONS = 
Seq[ResourceConfigProperties](
+new ResourceConfigProperties(am, client, memory),
+new ResourceConfigProperties(am, client, cores),
+new ResourceConfigProperties(driver, cluster, memory),
+new ResourceConfigProperties(driver, cluster, cores),
+new ResourceConfigProperties(processType = executor, resourceType = 
memory),
+new ResourceConfigProperties(processType = executor, resourceType = 
cores))
+
+  /**
+   * Validates sparkConf and throws a SparkException if a standard 
resource (memory or cores)
+   * is defined with the property spark.yarn.x.resource.y
+   *
+   * Example of an invalid config:
+   * - spark.yarn.driver.resource.memory=2g
+   *
+   * Please note that if multiple resources are defined like described 
above,
+   * the error messages will be concatenated.
+   * Example of such a config:
+   * - spark.yarn.driver.resource.memory=2g
+   * - spark.yarn.executor.resource.cores=2
+   * Then the following two error messages will be printed:
+   * - "memory cannot be requested with config 
spark.yarn.driver.resource.memory,
+   * please use config spark.driver.memory instead!
+   * - "cores cannot be requested with config 
spark.yarn.executor.resource.cores,
+   * please use config spark.executor.cores instead!
+   *
+   * @param sparkConf
+   */
+  def validateResources(sparkConf: SparkConf): Unit = {
+val requestedResources = new RequestedResources(sparkConf)
+val sb = new mutable.StringBuilder()
+POSSIBLE_RESOURCE_DEFINITIONS.foreach { rcp =>
+  val customResources: Map[String, String] = 
getCustomResourceValue(requestedResources, rcp)
+  val (standardResourceConfigKey: String, customResourceConfigKey: 
String) =
+getResourceConfigKeys(rcp)
+
+  val errorMessage =
+if (customResources.contains(customResourceConfigKey)) {
+  s"${rcp.resourceType} cannot be requested with config 
$customResourceConfigKey, " +
+  s"please use config $standardResourceConfigKey instead!"
+} else {
+  ""
+}
+  if (errorMessage.nonEmpty) {
+printErrorMessageToBuffer(sb, errorMessage)
+  }
+}
+
+if (sb.nonEmpty) {
+  throw new SparkException(sb.toString())
+}
+  }
+
+  /**
+   * Returns the requested map of custom resources,
+   * based on the ResourceConfigProperties argument.
+   * @return
+   */
+  private def getCustomReso

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-05-22 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r190105256
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeHelper.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.reflect.InvocationTargetException
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of Hadoop 3 methods from the yarn API,
+ * so we need to use reflection to avoid compile error when building 
against Hadoop 2.x
+ */
+object ResourceTypeHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+  private val resourceTypesNotAvailableErrorMessage =
+"Ignoring updating resource with resource types because " +
+"the version of YARN does not support it!"
+
+  def setResourceInfoFromResourceTypes(
+  resourceTypesParam: Map[String, String],
+  resource: Resource): Resource = {
+if (resource == null) {
+  throw new IllegalArgumentException("Resource parameter should not be 
null!")
+}
+
+if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) {
+  logWarning(resourceTypesNotAvailableErrorMessage)
+  return resource
+}
+
+val resourceTypes = resourceTypesParam.map { case (k, v) => (
+  if (k.equals("memory")) {
+logWarning("Trying to use 'memory' as a custom resource, converted 
it to 'memory-mb'")
+"memory-mb"
+  } else k, v)
+}
+
+logDebug(s"Custom resource types: $resourceTypes")
+resourceTypes.foreach { rt =>
+  val resourceName: String = rt._1
+  val (amount, unit) = getAmountAndUnit(rt._2)
+  logDebug(s"Registering resource with name: $resourceName, amount: 
$amount, unit: $unit")
+
+  try {
+val resInfoClass = Utils.classForName(
+  "org.apache.hadoop.yarn.api.records.ResourceInformation")
+val setResourceInformationMethod =
+  resource.getClass.getMethod("setResourceInformation", 
classOf[String],
+resInfoClass)
+
+val resourceInformation =
+  createResourceInformation(resourceName, amount, unit, 
resInfoClass)
+setResourceInformationMethod.invoke(resource, resourceName, 
resourceInformation)
+  } catch {
+case e: InvocationTargetException =>
+  if (e.getCause != null) {
+throw e.getCause
+  } else {
+throw e
+  }
+case NonFatal(e) =>
+  logWarning(resourceTypesNotAvailableErrorMessage, e)
+  }
+}
+resource
+  }
+
+  def getCustomResourcesAsStrings(resource: Resource): String = {
+if (resource == null) {
+  throw new IllegalArgumentException("Resource parameter should not be 
null!")
+}
+
+if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) {
+  logWarning(resourceTypesNotAvailableErrorMessage)
+  return ""
+}
+
+var res: String = ""
+try {
+  val resUtilsClass = Utils.classForName(
+"org.apache.hadoop.yarn.util.resource.ResourceUtils")
+  val getNumberOfResourceTypesMethod = 
resUtilsClass.getMethod("getNumberOfKnownResourceTypes")
+  val numberOfResourceTypes: Int = 
getNumberOfResourceTypesMethod.invoke(null).asInstanceOf[Int]
+  val resourceClass = Utils.classForName(
+"org.apache.hadoop.yarn.api.records.Resource")
+
+  

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-05-22 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r190105917
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeValidator.scala
 ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.ProcessType.{am, driver, executor, 
ProcessType}
+import org.apache.spark.deploy.yarn.ResourceType.{cores, memory, 
ResourceType}
+import org.apache.spark.deploy.yarn.RunMode.{client, cluster, RunMode}
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config._
+
+private[spark] object ProcessType extends Enumeration {
+  type ProcessType = Value
+  val driver, executor, am = Value
+}
+
+private[spark] object RunMode extends Enumeration {
+  type RunMode = Value
+  val client, cluster = Value
+}
+
+private[spark] object ResourceType extends Enumeration {
+  type ResourceType = Value
+  val cores, memory = Value
+}
+
+private object ResourceTypeValidator {
+  private val ERROR_PREFIX: String = "Error: "
+  private val POSSIBLE_RESOURCE_DEFINITIONS = 
Seq[ResourceConfigProperties](
+new ResourceConfigProperties(am, client, memory),
+new ResourceConfigProperties(am, client, cores),
+new ResourceConfigProperties(driver, cluster, memory),
+new ResourceConfigProperties(driver, cluster, cores),
+new ResourceConfigProperties(processType = executor, resourceType = 
memory),
+new ResourceConfigProperties(processType = executor, resourceType = 
cores))
+
+  /**
+   * Validates sparkConf and throws a SparkException if a standard 
resource (memory or cores)
+   * is defined with the property spark.yarn.x.resource.y
+   *
+   * Example of an invalid config:
+   * - spark.yarn.driver.resource.memory=2g
+   *
+   * Please note that if multiple resources are defined like described 
above,
+   * the error messages will be concatenated.
+   * Example of such a config:
+   * - spark.yarn.driver.resource.memory=2g
+   * - spark.yarn.executor.resource.cores=2
+   * Then the following two error messages will be printed:
+   * - "memory cannot be requested with config 
spark.yarn.driver.resource.memory,
+   * please use config spark.driver.memory instead!
+   * - "cores cannot be requested with config 
spark.yarn.executor.resource.cores,
+   * please use config spark.executor.cores instead!
+   *
+   * @param sparkConf
+   */
+  def validateResources(sparkConf: SparkConf): Unit = {
+val requestedResources = new RequestedResources(sparkConf)
+val sb = new mutable.StringBuilder()
+POSSIBLE_RESOURCE_DEFINITIONS.foreach { rcp =>
+  val customResources: Map[String, String] = 
getCustomResourceValue(requestedResources, rcp)
+  val (standardResourceConfigKey: String, customResourceConfigKey: 
String) =
+getResourceConfigKeys(rcp)
+
+  val errorMessage =
+if (customResources.contains(customResourceConfigKey)) {
+  s"${rcp.resourceType} cannot be requested with config 
$customResourceConfigKey, " +
+  s"please use config $standardResourceConfigKey instead!"
+} else {
+  ""
+}
+  if (errorMessage.nonEmpty) {
+printErrorMessageToBuffer(sb, errorMessage)
+  }
+}
+
+if (sb.nonEmpty) {
+  throw new SparkException(sb.toString())
+}
+  }
+
+  /**
+   * Returns the requested map of custom resources,
+   * based on the ResourceConfigProperties argument.
+   * @return
+   */
+  private def getCustomResourceValue(
+  requestedResources: RequestedResources,
+ 

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-05-22 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r190106056
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeValidator.scala
 ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.ProcessType.{am, driver, executor, 
ProcessType}
+import org.apache.spark.deploy.yarn.ResourceType.{cores, memory, 
ResourceType}
+import org.apache.spark.deploy.yarn.RunMode.{client, cluster, RunMode}
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config._
+
+private[spark] object ProcessType extends Enumeration {
+  type ProcessType = Value
+  val driver, executor, am = Value
+}
+
+private[spark] object RunMode extends Enumeration {
+  type RunMode = Value
+  val client, cluster = Value
+}
+
+private[spark] object ResourceType extends Enumeration {
+  type ResourceType = Value
+  val cores, memory = Value
+}
+
+private object ResourceTypeValidator {
+  private val ERROR_PREFIX: String = "Error: "
+  private val POSSIBLE_RESOURCE_DEFINITIONS = 
Seq[ResourceConfigProperties](
+new ResourceConfigProperties(am, client, memory),
+new ResourceConfigProperties(am, client, cores),
+new ResourceConfigProperties(driver, cluster, memory),
+new ResourceConfigProperties(driver, cluster, cores),
+new ResourceConfigProperties(processType = executor, resourceType = 
memory),
+new ResourceConfigProperties(processType = executor, resourceType = 
cores))
+
+  /**
+   * Validates sparkConf and throws a SparkException if a standard 
resource (memory or cores)
+   * is defined with the property spark.yarn.x.resource.y
+   *
+   * Example of an invalid config:
+   * - spark.yarn.driver.resource.memory=2g
+   *
+   * Please note that if multiple resources are defined like described 
above,
+   * the error messages will be concatenated.
+   * Example of such a config:
+   * - spark.yarn.driver.resource.memory=2g
+   * - spark.yarn.executor.resource.cores=2
+   * Then the following two error messages will be printed:
+   * - "memory cannot be requested with config 
spark.yarn.driver.resource.memory,
+   * please use config spark.driver.memory instead!
+   * - "cores cannot be requested with config 
spark.yarn.executor.resource.cores,
+   * please use config spark.executor.cores instead!
+   *
+   * @param sparkConf
+   */
+  def validateResources(sparkConf: SparkConf): Unit = {
+val requestedResources = new RequestedResources(sparkConf)
+val sb = new mutable.StringBuilder()
+POSSIBLE_RESOURCE_DEFINITIONS.foreach { rcp =>
+  val customResources: Map[String, String] = 
getCustomResourceValue(requestedResources, rcp)
+  val (standardResourceConfigKey: String, customResourceConfigKey: 
String) =
+getResourceConfigKeys(rcp)
+
+  val errorMessage =
+if (customResources.contains(customResourceConfigKey)) {
+  s"${rcp.resourceType} cannot be requested with config 
$customResourceConfigKey, " +
+  s"please use config $standardResourceConfigKey instead!"
+} else {
+  ""
+}
+  if (errorMessage.nonEmpty) {
+printErrorMessageToBuffer(sb, errorMessage)
+  }
+}
+
+if (sb.nonEmpty) {
+  throw new SparkException(sb.toString())
+}
+  }
+
+  /**
+   * Returns the requested map of custom resources,
+   * based on the ResourceConfigProperties argument.
+   * @return
+   */
+  private def getCustomResourceValue(
+  requestedResources: RequestedResources,
+ 

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-05-22 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r190106859
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeValidator.scala
 ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.ProcessType.{am, driver, executor, 
ProcessType}
+import org.apache.spark.deploy.yarn.ResourceType.{cores, memory, 
ResourceType}
+import org.apache.spark.deploy.yarn.RunMode.{client, cluster, RunMode}
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config._
+
+private[spark] object ProcessType extends Enumeration {
+  type ProcessType = Value
+  val driver, executor, am = Value
+}
+
+private[spark] object RunMode extends Enumeration {
+  type RunMode = Value
+  val client, cluster = Value
+}
+
+private[spark] object ResourceType extends Enumeration {
+  type ResourceType = Value
+  val cores, memory = Value
+}
+
+private object ResourceTypeValidator {
+  private val ERROR_PREFIX: String = "Error: "
+  private val POSSIBLE_RESOURCE_DEFINITIONS = 
Seq[ResourceConfigProperties](
+new ResourceConfigProperties(am, client, memory),
+new ResourceConfigProperties(am, client, cores),
+new ResourceConfigProperties(driver, cluster, memory),
+new ResourceConfigProperties(driver, cluster, cores),
+new ResourceConfigProperties(processType = executor, resourceType = 
memory),
+new ResourceConfigProperties(processType = executor, resourceType = 
cores))
+
+  /**
+   * Validates sparkConf and throws a SparkException if a standard 
resource (memory or cores)
+   * is defined with the property spark.yarn.x.resource.y
+   *
+   * Example of an invalid config:
+   * - spark.yarn.driver.resource.memory=2g
+   *
+   * Please note that if multiple resources are defined like described 
above,
+   * the error messages will be concatenated.
+   * Example of such a config:
+   * - spark.yarn.driver.resource.memory=2g
+   * - spark.yarn.executor.resource.cores=2
+   * Then the following two error messages will be printed:
+   * - "memory cannot be requested with config 
spark.yarn.driver.resource.memory,
+   * please use config spark.driver.memory instead!
+   * - "cores cannot be requested with config 
spark.yarn.executor.resource.cores,
+   * please use config spark.executor.cores instead!
+   *
+   * @param sparkConf
+   */
+  def validateResources(sparkConf: SparkConf): Unit = {
+val requestedResources = new RequestedResources(sparkConf)
+val sb = new mutable.StringBuilder()
+POSSIBLE_RESOURCE_DEFINITIONS.foreach { rcp =>
--- End diff --

I'm having a hard time understanding this method. I will have to come back 
to it later.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-05-22 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r190107444
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
 ---
@@ -306,9 +318,14 @@ private[yarn] class YarnAllocator(
   s"executorsStarting: ${numExecutorsStarting.get}")
 
 if (missing > 0) {
-  logInfo(s"Will request $missing executor container(s), each with " +
-s"${resource.getVirtualCores} core(s) and " +
-s"${resource.getMemory} MB memory (including $memoryOverhead MB of 
overhead)")
+  var requestContainerMessage = s"Will request $missing executor 
container(s), each with " +
+  s"${resource.getVirtualCores} core(s) and " +
+  s"${resource.getMemory} MB memory (including $memoryOverhead MB 
of overhead) "
+  if (ResourceTypeHelper.isYarnResourceTypesAvailable()) {
--- End diff --

I would remove this condition. You already have it in 
`ResourceTypeHelper.getCustomResourcesAsStrings`. It is better to write every 
method so that it won't crash from reflection than to force developers to 
remember which methods can throw reflection errors.
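
A minimal sketch of the pattern I mean, with hypothetical stand-in names for 
the helper in this PR:

```scala
object ResourceHelperSketch {
  // The availability check lives in exactly one place...
  private def isYarnResourceTypesAvailable: Boolean =
    try {
      Class.forName("org.apache.hadoop.yarn.api.records.ResourceInformation")
      true
    } catch {
      case _: ClassNotFoundException => false
    }

  // ...and every public method degrades gracefully on its own, so call
  // sites never need to repeat the guard.
  def customResourcesAsString: String =
    if (!isYarnResourceTypesAvailable) "" else "gpu -> 2" // placeholder lookup
}

// Branch-free call site:
val message = "Will request 3 executor container(s) " +
  ResourceHelperSketch.customResourcesAsString
```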


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-05-22 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r190104166
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeHelper.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Integer => JInteger, Long => JLong}
+import java.lang.reflect.InvocationTargetException
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of Hadoop 3 methods from the YARN API,
+ * so we need to use reflection to avoid compile error when building 
against Hadoop 2.x
+ */
+private object ResourceTypeHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+
+  def setResourceInfoFromResourceTypes(
+  resourceTypes: Map[String, String],
+  resource: Resource): Resource = {
+require(resource != null, "Resource parameter should not be null!")
+
+if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) {
+  return resource
+}
+
+logDebug(s"Custom resource types: $resourceTypes")
+resourceTypes.foreach { case (name, rawAmount) =>
+  try {
+val AMOUNT_AND_UNIT_REGEX(amountPart, unitPart) = rawAmount
+val (amount, unit) = (amountPart.toLong, unitPart match {
+  case "m" => "M"
--- End diff --

Don't forget to update this when you are ready.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-05-22 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r190107502
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
 ---
@@ -306,9 +318,14 @@ private[yarn] class YarnAllocator(
   s"executorsStarting: ${numExecutorsStarting.get}")
 
 if (missing > 0) {
-  logInfo(s"Will request $missing executor container(s), each with " +
-s"${resource.getVirtualCores} core(s) and " +
-s"${resource.getMemory} MB memory (including $memoryOverhead MB of 
overhead)")
+  var requestContainerMessage = s"Will request $missing executor 
container(s), each with " +
+  s"${resource.getVirtualCores} core(s) and " +
+  s"${resource.getMemory} MB memory (including $memoryOverhead MB 
of overhead) "
+  if (ResourceTypeHelper.isYarnResourceTypesAvailable()) {
+requestContainerMessage ++= s"and with custom resources:" +
+ResourceTypeHelper.getCustomResourcesAsStrings(resource)
--- End diff --

I think you should check whether the returned string is empty, so you 
don't confusingly append just `s"and with custom resources:"` with nothing 
after it.
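
Something like this sketch, where `custom` stands in for the result of 
`ResourceTypeHelper.getCustomResourcesAsStrings(resource)`:

```scala
// Empty when no custom resources were requested.
val custom: String = ""
var requestContainerMessage = "Will request 3 executor container(s)"
if (custom.nonEmpty) {
  requestContainerMessage += s" and with custom resources: $custom"
}
```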


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-05-22 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r190104076
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
---
@@ -249,6 +259,10 @@ private[spark] class Client(
 val capability = Records.newRecord(classOf[Resource])
 capability.setMemory(amMemory + amMemoryOverhead)
 capability.setVirtualCores(amCores)
+if (amResources.nonEmpty) {
--- End diff --

Nitpick: I would execute 
`ResourceTypeHelper.setResourceInfoFromResourceTypes(amResources, capability)` 
unconditionally. That's just my style, though.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-05-22 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r190103364
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeHelper.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.reflect.InvocationTargetException
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of Hadoop 3 methods from the yarn API,
+ * so we need to use reflection to avoid compile error when building 
against Hadoop 2.x
+ */
+object ResourceTypeHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+  private val resourceTypesNotAvailableErrorMessage =
+"Ignoring updating resource with resource types because " +
+"the version of YARN does not support it!"
+
+  def setResourceInfoFromResourceTypes(
+  resourceTypesParam: Map[String, String],
+  resource: Resource): Resource = {
+if (resource == null) {
+  throw new IllegalArgumentException("Resource parameter should not be 
null!")
+}
+
+if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) {
+  logWarning(resourceTypesNotAvailableErrorMessage)
+  return resource
+}
+
+val resourceTypes = resourceTypesParam.map { case (k, v) => (
+  if (k.equals("memory")) {
+logWarning("Trying to use 'memory' as a custom resource, converted 
it to 'memory-mb'")
+"memory-mb"
+  } else k, v)
+}
+
+logDebug(s"Custom resource types: $resourceTypes")
+resourceTypes.foreach { rt =>
+  val resourceName: String = rt._1
+  val (amount, unit) = getAmountAndUnit(rt._2)
+  logDebug(s"Registering resource with name: $resourceName, amount: 
$amount, unit: $unit")
+
+  try {
+val resInfoClass = Utils.classForName(
+  "org.apache.hadoop.yarn.api.records.ResourceInformation")
+val setResourceInformationMethod =
+  resource.getClass.getMethod("setResourceInformation", 
classOf[String],
+resInfoClass)
+
+val resourceInformation =
+  createResourceInformation(resourceName, amount, unit, 
resInfoClass)
+setResourceInformationMethod.invoke(resource, resourceName, 
resourceInformation)
+  } catch {
+case e: InvocationTargetException =>
+  if (e.getCause != null) {
+throw e.getCause
+  } else {
+throw e
+  }
+case NonFatal(e) =>
+  logWarning(resourceTypesNotAvailableErrorMessage, e)
+  }
+}
+resource
+  }
+
+  def getCustomResourcesAsStrings(resource: Resource): String = {
+if (resource == null) {
+  throw new IllegalArgumentException("Resource parameter should not be 
null!")
+}
+
+if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) {
+  logWarning(resourceTypesNotAvailableErrorMessage)
+  return ""
+}
+
+var res: String = ""
+try {
+  val resUtilsClass = Utils.classForName(
+"org.apache.hadoop.yarn.util.resource.ResourceUtils")
+  val getNumberOfResourceTypesMethod = 
resUtilsClass.getMethod("getNumberOfKnownResourceTypes")
+  val numberOfResourceTypes: Int = 
getNumberOfResourceTypesMethod.invoke(null).asInstanceOf[Int]
+  val resourceClass = Utils.classForName(
+"org.apache.hadoop.yarn.api.records.Resource")
+
+  // skip memo

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-05-22 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r189949493
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeHelper.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.reflect.InvocationTargetException
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of Hadoop 3 methods from the yarn API,
+ * so we need to use reflection to avoid compile error when building 
against Hadoop 2.x
+ */
+object ResourceTypeHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+  private val resourceTypesNotAvailableErrorMessage =
+"Ignoring updating resource with resource types because " +
+"the version of YARN does not support it!"
+
+  def setResourceInfoFromResourceTypes(
+  resourceTypesParam: Map[String, String],
+  resource: Resource): Resource = {
+if (resource == null) {
+  throw new IllegalArgumentException("Resource parameter should not be 
null!")
+}
+
+if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) {
+  logWarning(resourceTypesNotAvailableErrorMessage)
+  return resource
+}
+
+val resourceTypes = resourceTypesParam.map { case (k, v) => (
+  if (k.equals("memory")) {
+logWarning("Trying to use 'memory' as a custom resource, converted 
it to 'memory-mb'")
+"memory-mb"
+  } else k, v)
+}
+
+logDebug(s"Custom resource types: $resourceTypes")
+resourceTypes.foreach { rt =>
+  val resourceName: String = rt._1
+  val (amount, unit) = getAmountAndUnit(rt._2)
+  logDebug(s"Registering resource with name: $resourceName, amount: 
$amount, unit: $unit")
+
+  try {
+val resInfoClass = Utils.classForName(
+  "org.apache.hadoop.yarn.api.records.ResourceInformation")
+val setResourceInformationMethod =
+  resource.getClass.getMethod("setResourceInformation", 
classOf[String],
+resInfoClass)
+
+val resourceInformation =
+  createResourceInformation(resourceName, amount, unit, 
resInfoClass)
+setResourceInformationMethod.invoke(resource, resourceName, 
resourceInformation)
+  } catch {
+case e: InvocationTargetException =>
+  if (e.getCause != null) {
+throw e.getCause
+  } else {
+throw e
+  }
+case NonFatal(e) =>
+  logWarning(resourceTypesNotAvailableErrorMessage, e)
+  }
+}
+resource
+  }
+
+  def getCustomResourcesAsStrings(resource: Resource): String = {
+if (resource == null) {
+  throw new IllegalArgumentException("Resource parameter should not be 
null!")
+}
+
+if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) {
+  logWarning(resourceTypesNotAvailableErrorMessage)
+  return ""
+}
+
+var res: String = ""
+try {
+  val resUtilsClass = Utils.classForName(
+"org.apache.hadoop.yarn.util.resource.ResourceUtils")
+  val getNumberOfResourceTypesMethod = 
resUtilsClass.getMethod("getNumberOfKnownResourceTypes")
+  val numberOfResourceTypes: Int = 
getNumberOfResourceTypesMethod.invoke(null).asInstanceOf[Int]
+  val resourceClass = Utils.classForName(
+"org.apache.hadoop.yarn.api.records.Resource")
+
+  // skip memo

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-05-20 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r189473608
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 ---
@@ -199,6 +204,88 @@ class ClientSuite extends SparkFunSuite with Matchers {
 appContext.getMaxAppAttempts should be (42)
   }
 
+  test("Resource type args propagate, resource type not defined") {
+assume(ResourceTypeHelper.isYarnResourceTypesAvailable())
+val sparkConf = new SparkConf()
+  .set(YARN_AM_RESOURCE_TYPES_PREFIX + "gpu", "121m")
+val args = new ClientArguments(Array())
+
+val appContext = 
Records.newRecord(classOf[ApplicationSubmissionContext])
+val getNewApplicationResponse = 
Records.newRecord(classOf[GetNewApplicationResponse])
+val containerLaunchContext = 
Records.newRecord(classOf[ContainerLaunchContext])
+
+val client = new Client(args, sparkConf)
+
+try {
+  client.createApplicationSubmissionContext(
+new YarnClientApplication(getNewApplicationResponse, appContext),
+containerLaunchContext)
+} catch {
+  case NonFatal(e) =>
+val expectedExceptionClass = 
"org.apache.hadoop.yarn.exceptions.ResourceNotFoundException"
+if (e.getClass.getName != expectedExceptionClass) {
+  fail(s"Exception caught: $e is not an instance of 
$expectedExceptionClass!")
+}
+}
+  }
+
+  test("Resource type args propagate (client mode)") {
+assume(ResourceTypeHelper.isYarnResourceTypesAvailable())
+TestYarnResourceTypeHelper.initializeResourceTypes(List("gpu", "fpga"))
+
+val sparkConf = new SparkConf()
+  .set(YARN_AM_RESOURCE_TYPES_PREFIX + "gpu", "121m")
--- End diff --

Nitpick: It makes no sense to request "121m" of a GPU. GPUs can be acquired 
only as discrete devices. I understand no actual acquisition is happening right 
now, but these unit tests might mislead newcomers. I would use a different 
resource name, one that intentionally shows it is nonsensical, like 
"some_resource_with_units1".


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-05-20 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r189451540
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeHelper.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.reflect.InvocationTargetException
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of Hadoop 3 methods from the yarn API,
+ * so we need to use reflection to avoid compile error when building 
against Hadoop 2.x
+ */
+object ResourceTypeHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+  private val resourceTypesNotAvailableErrorMessage =
+"Ignoring updating resource with resource types because " +
+"the version of YARN does not support it!"
+
+  def setResourceInfoFromResourceTypes(
+  resourceTypesParam: Map[String, String],
+  resource: Resource): Resource = {
+if (resource == null) {
+  throw new IllegalArgumentException("Resource parameter should not be 
null!")
+}
+
+if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) {
+  logWarning(resourceTypesNotAvailableErrorMessage)
+  return resource
+}
+
+val resourceTypes = resourceTypesParam.map { case (k, v) => (
+  if (k.equals("memory")) {
+logWarning("Trying to use 'memory' as a custom resource, converted 
it to 'memory-mb'")
+"memory-mb"
+  } else k, v)
+}
+
+logDebug(s"Custom resource types: $resourceTypes")
+resourceTypes.foreach { rt =>
+  val resourceName: String = rt._1
+  val (amount, unit) = getAmountAndUnit(rt._2)
+  logDebug(s"Registering resource with name: $resourceName, amount: 
$amount, unit: $unit")
+
+  try {
+val resInfoClass = Utils.classForName(
+  "org.apache.hadoop.yarn.api.records.ResourceInformation")
+val setResourceInformationMethod =
+  resource.getClass.getMethod("setResourceInformation", 
classOf[String],
+resInfoClass)
+
+val resourceInformation =
+  createResourceInformation(resourceName, amount, unit, 
resInfoClass)
+setResourceInformationMethod.invoke(resource, resourceName, 
resourceInformation)
+  } catch {
+case e: InvocationTargetException =>
+  if (e.getCause != null) {
+throw e.getCause
+  } else {
+throw e
+  }
+case NonFatal(e) =>
+  logWarning(resourceTypesNotAvailableErrorMessage, e)
+  }
+}
+resource
+  }
+
+  def getCustomResourcesAsStrings(resource: Resource): String = {
+if (resource == null) {
+  throw new IllegalArgumentException("Resource parameter should not be 
null!")
+}
+
+if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) {
+  logWarning(resourceTypesNotAvailableErrorMessage)
+  return ""
+}
+
+var res: String = ""
+try {
+  val resUtilsClass = Utils.classForName(
+"org.apache.hadoop.yarn.util.resource.ResourceUtils")
+  val getNumberOfResourceTypesMethod = 
resUtilsClass.getMethod("getNumberOfKnownResourceTypes")
+  val numberOfResourceTypes: Int = 
getNumberOfResourceTypesMethod.invoke(null).asInstanceOf[Int]
+  val resourceClass = Utils.classForName(
+"org.apache.hadoop.yarn.api.records.Resource")
+
+  // skip memory and vcores 

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-05-20 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r189449512
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeHelper.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.reflect.InvocationTargetException
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of Hadoop 3 methods from the yarn API,
+ * so we need to use reflection to avoid compile error when building 
against Hadoop 2.x
+ */
+object ResourceTypeHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+  private val resourceTypesNotAvailableErrorMessage =
+"Ignoring updating resource with resource types because " +
+"the version of YARN does not support it!"
+
+  def setResourceInfoFromResourceTypes(
+  resourceTypesParam: Map[String, String],
+  resource: Resource): Resource = {
+if (resource == null) {
+  throw new IllegalArgumentException("Resource parameter should not be 
null!")
+}
+
+if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) {
+  logWarning(resourceTypesNotAvailableErrorMessage)
+  return resource
+}
+
+val resourceTypes = resourceTypesParam.map { case (k, v) => (
+  if (k.equals("memory")) {
+logWarning("Trying to use 'memory' as a custom resource, converted 
it to 'memory-mb'")
+"memory-mb"
+  } else k, v)
+}
+
+logDebug(s"Custom resource types: $resourceTypes")
+resourceTypes.foreach { rt =>
+  val resourceName: String = rt._1
+  val (amount, unit) = getAmountAndUnit(rt._2)
+  logDebug(s"Registering resource with name: $resourceName, amount: 
$amount, unit: $unit")
+
+  try {
+val resInfoClass = Utils.classForName(
+  "org.apache.hadoop.yarn.api.records.ResourceInformation")
+val setResourceInformationMethod =
+  resource.getClass.getMethod("setResourceInformation", 
classOf[String],
+resInfoClass)
+
+val resourceInformation =
+  createResourceInformation(resourceName, amount, unit, 
resInfoClass)
+setResourceInformationMethod.invoke(resource, resourceName, 
resourceInformation)
+  } catch {
+case e: InvocationTargetException =>
+  if (e.getCause != null) {
+throw e.getCause
+  } else {
+throw e
+  }
+case NonFatal(e) =>
+  logWarning(resourceTypesNotAvailableErrorMessage, e)
+  }
+}
+resource
+  }
+
+  def getCustomResourcesAsStrings(resource: Resource): String = {
+if (resource == null) {
+  throw new IllegalArgumentException("Resource parameter should not be 
null!")
+}
+
+if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) {
+  logWarning(resourceTypesNotAvailableErrorMessage)
+  return ""
+}
+
+var res: String = ""
+try {
+  val resUtilsClass = Utils.classForName(
+"org.apache.hadoop.yarn.util.resource.ResourceUtils")
+  val getNumberOfResourceTypesMethod = 
resUtilsClass.getMethod("getNumberOfKnownResourceTypes")
+  val numberOfResourceTypes: Int = 
getNumberOfResourceTypesMethod.invoke(null).asInstanceOf[Int]
+  val resourceClass = Utils.classForName(
+"org.apache.hadoop.yarn.api.records.Resource")
+
+  // skip memory and vcores 

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-05-20 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r189449967
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeHelper.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.reflect.InvocationTargetException
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of Hadoop 3 methods from the yarn API,
+ * so we need to use reflection to avoid compile error when building 
against Hadoop 2.x
+ */
+object ResourceTypeHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+  private val resourceTypesNotAvailableErrorMessage =
+"Ignoring updating resource with resource types because " +
+"the version of YARN does not support it!"
+
+  def setResourceInfoFromResourceTypes(
+  resourceTypesParam: Map[String, String],
+  resource: Resource): Resource = {
+if (resource == null) {
+  throw new IllegalArgumentException("Resource parameter should not be 
null!")
+}
+
+if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) {
+  logWarning(resourceTypesNotAvailableErrorMessage)
+  return resource
+}
+
+val resourceTypes = resourceTypesParam.map { case (k, v) => (
+  if (k.equals("memory")) {
+logWarning("Trying to use 'memory' as a custom resource, converted 
it to 'memory-mb'")
+"memory-mb"
+  } else k, v)
+}
+
+logDebug(s"Custom resource types: $resourceTypes")
+resourceTypes.foreach { rt =>
+  val resourceName: String = rt._1
+  val (amount, unit) = getAmountAndUnit(rt._2)
+  logDebug(s"Registering resource with name: $resourceName, amount: 
$amount, unit: $unit")
+
+  try {
+val resInfoClass = Utils.classForName(
+  "org.apache.hadoop.yarn.api.records.ResourceInformation")
+val setResourceInformationMethod =
+  resource.getClass.getMethod("setResourceInformation", 
classOf[String],
+resInfoClass)
+
+val resourceInformation =
+  createResourceInformation(resourceName, amount, unit, 
resInfoClass)
+setResourceInformationMethod.invoke(resource, resourceName, 
resourceInformation)
+  } catch {
+case e: InvocationTargetException =>
+  if (e.getCause != null) {
+throw e.getCause
+  } else {
+throw e
+  }
+case NonFatal(e) =>
+  logWarning(resourceTypesNotAvailableErrorMessage, e)
+  }
+}
+resource
+  }
+
+  def getCustomResourcesAsStrings(resource: Resource): String = {
+if (resource == null) {
+  throw new IllegalArgumentException("Resource parameter should not be 
null!")
+}
+
+if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) {
+  logWarning(resourceTypesNotAvailableErrorMessage)
+  return ""
+}
+
+var res: String = ""
+try {
+  val resUtilsClass = Utils.classForName(
+"org.apache.hadoop.yarn.util.resource.ResourceUtils")
+  val getNumberOfResourceTypesMethod = 
resUtilsClass.getMethod("getNumberOfKnownResourceTypes")
+  val numberOfResourceTypes: Int = 
getNumberOfResourceTypesMethod.invoke(null).asInstanceOf[Int]
+  val resourceClass = Utils.classForName(
+"org.apache.hadoop.yarn.api.records.Resource")
+
+  // skip memory and vcores 

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-05-20 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r189450284
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeHelper.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.reflect.InvocationTargetException
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of Hadoop 3 methods from the yarn API,
+ * so we need to use reflection to avoid compile error when building 
against Hadoop 2.x
+ */
+object ResourceTypeHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+  private val resourceTypesNotAvailableErrorMessage =
--- End diff --

This is a constant, so why not change it to 
`RESOURCE_TYPES_NOT_AVAILABLE_ERROR_MESSAGE`?
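
Roughly (a sketch; same value, just the conventional upper snake case for 
constants):

```scala
private val RESOURCE_TYPES_NOT_AVAILABLE_ERROR_MESSAGE =
  "Ignoring updating resource with resource types because " +
  "the version of YARN does not support it!"
```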


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-05-20 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r189449462
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceTypeHelperSuite.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.apache.hadoop.yarn.api.records.Resource
+import org.apache.hadoop.yarn.util.Records
+import org.scalatest.{BeforeAndAfterAll, Matchers}
+
+import org.apache.spark.SparkFunSuite
+import 
org.apache.spark.deploy.yarn.TestYarnResourceTypeHelper.ResourceInformation
+
+class ResourceTypeHelperSuite extends SparkFunSuite with Matchers with 
BeforeAndAfterAll {
+
+  private val CUSTOM_RES_1 = "custom-resource-type-1"
+  private val CUSTOM_RES_2 = "custom-resource-type-2"
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+  }
+
+  private def getExpectedUnmatchedErrorMessage(value: String) = {
+"Value of resource type should match pattern " +
+  s"([0-9]+)([A-Za-z]*), unmatched value: $value"
+  }
+
+  test("resource type value does not match pattern") {
+assume(ResourceTypeHelper.isYarnResourceTypesAvailable())
+TestYarnResourceTypeHelper.initializeResourceTypes(List())
--- End diff --

You're relying on the implementation detail that the value is matched against 
the regex before the availability of the yarn resource type `CUSTOM_RES_1` is 
checked. To make this unit test independent of that ordering, I would do 
`TestYarnResourceTypeHelper.initializeResourceTypes(List(CUSTOM_RES_1))`; see 
the sketch below. The same applies to the test case "resource type just unit 
defined".
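
A sketch of the version I have in mind for the first test, reusing the suite's 
existing helpers; the only change is registering `CUSTOM_RES_1` up front:

```scala
test("resource type value does not match pattern") {
  assume(ResourceTypeHelper.isYarnResourceTypesAvailable())
  // Register the resource so a failure can only come from the regex check,
  // not from the availability check happening to run later.
  TestYarnResourceTypeHelper.initializeResourceTypes(List(CUSTOM_RES_1))

  val resourceTypes = Map(CUSTOM_RES_1 -> "**@#")

  val thrown = intercept[IllegalArgumentException] {
    ResourceTypeHelper.setResourceInfoFromResourceTypes(resourceTypes, createAResource)
  }
  thrown.getMessage should equal (getExpectedUnmatchedErrorMessage("**@#"))
}
```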


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-05-19 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r189449133
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceTypeHelperSuite.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.apache.hadoop.yarn.api.records.Resource
+import org.apache.hadoop.yarn.util.Records
+import org.scalatest.{BeforeAndAfterAll, Matchers}
+
+import org.apache.spark.SparkFunSuite
+import 
org.apache.spark.deploy.yarn.TestYarnResourceTypeHelper.ResourceInformation
+
+class ResourceTypeHelperSuite extends SparkFunSuite with Matchers with 
BeforeAndAfterAll {
+
+  private val CUSTOM_RES_1 = "custom-resource-type-1"
+  private val CUSTOM_RES_2 = "custom-resource-type-2"
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+  }
+
+  private def getExpectedUnmatchedErrorMessage(value: String) = {
+"Value of resource type should match pattern " +
+  s"([0-9]+)([A-Za-z]*), unmatched value: $value"
+  }
+
+  test("resource type value does not match pattern") {
+assume(ResourceTypeHelper.isYarnResourceTypesAvailable())
+TestYarnResourceTypeHelper.initializeResourceTypes(List())
+
+val resourceTypes = Map(CUSTOM_RES_1 -> "**@#")
+
+val thrown = intercept[IllegalArgumentException] {
+  ResourceTypeHelper.setResourceInfoFromResourceTypes(resourceTypes, 
createAResource)
+}
+thrown.getMessage should equal 
(getExpectedUnmatchedErrorMessage("**@#"))
+  }
+
+  test("resource type just unit defined") {
+assume(ResourceTypeHelper.isYarnResourceTypesAvailable())
+TestYarnResourceTypeHelper.initializeResourceTypes(List())
+
+val resourceTypes = Map(CUSTOM_RES_1 -> "m")
+
+val thrown = intercept[IllegalArgumentException] {
+  ResourceTypeHelper.setResourceInfoFromResourceTypes(resourceTypes, 
createAResource)
+}
+thrown.getMessage should equal (getExpectedUnmatchedErrorMessage("m"))
+  }
+
+  test("resource type with null value should not be allowed") {
+assume(ResourceTypeHelper.isYarnResourceTypesAvailable())
+TestYarnResourceTypeHelper.initializeResourceTypes(List())
+
+val resourceTypes = Map(CUSTOM_RES_1 -> "123")
+
+val thrown = intercept[IllegalArgumentException] {
+  ResourceTypeHelper.setResourceInfoFromResourceTypes(resourceTypes, 
null)
+}
+thrown.getMessage should equal ("Resource parameter should not be 
null!")
+  }
+
+  test("resource type with valid value and invalid unit") {
+assume(ResourceTypeHelper.isYarnResourceTypesAvailable())
+TestYarnResourceTypeHelper.initializeResourceTypes(List(CUSTOM_RES_1))
+
+val resourceTypes = Map(CUSTOM_RES_1 -> "123ppp")
+val resource = createAResource
+
+val thrown = intercept[IllegalArgumentException] {
+  ResourceTypeHelper.setResourceInfoFromResourceTypes(resourceTypes, 
resource)
+}
+thrown.getMessage should fullyMatch regex
+  """Unknown unit 'ppp'\. Known units are \[.*\]"""
+  }
+
+  test("resource type with valid value and without unit") {
+assume(ResourceTypeHelper.isYarnResourceTypesAvailable())
+TestYarnResourceTypeHelper.initializeResourceTypes(List(CUSTOM_RES_1))
+
+val resourceTypes = Map(CUSTOM_RES_1 -> "123")
+val resource = createAResource
+
+ResourceTypeHelper.setResourceInfoFromResourceTypes(resourceTypes, 
resource)
+val customResource: ResourceInformation = TestYarnResourceTypeHelper
+  .getResourceInformationByName(resource, CUSTOM_RES_1)

[GitHub] spark issue #20761: [SPARK-20327][CORE][YARN] Add CLI support for YARN custo...

2018-05-18 Thread galv
Github user galv commented on the issue:

https://github.com/apache/spark/pull/20761
  
I may be missing something, but doesn't this change depend on Spark's yarn 
client being made Hadoop 3.0 compatible?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-05-18 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r189426179
  
--- Diff: docs/running-on-yarn.md ---
@@ -121,6 +121,28 @@ To use a custom metrics.properties for the application 
master and executors, upd
 Use lower-case suffixes, e.g. k, m, 
g, t, and p, for kibi-, mebi-, gibi-, 
tebi-, and pebibytes, respectively.
   
 
+
+  spark.yarn.am.resource.resource-type
--- End diff --

Style nitpick: I think `{resource-type}` or `${resource-type}` matches the 
existing documentation better than `<resource-type>`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-05-18 Thread galv
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r189426163
  
--- Diff: docs/running-on-yarn.md ---
@@ -121,6 +121,28 @@ To use a custom metrics.properties for the application 
master and executors, upd
 Use lower-case suffixes, e.g. k, m, 
g, t, and p, for kibi-, mebi-, gibi-, 
tebi-, and pebibytes, respectively.
   
 
+
+  spark.yarn.am.resource.resource-type
+  (none)
+  
+Amount of resource to use for the YARN Application Master in client 
mode.
+In cluster mode, use 
spark.yarn.driver.resource.resource-type instead
--- End diff --

It would be great to show an example of what the syntax looks like. I 
believe the Nvidia GPU resource type would be requested as 
"spark.yarn.am.resource.yarn.io/gpu", if I remember correctly; see the 
sketch below.

Also, can you add something like "This feature can be used only with Yarn 
3.0+", and note that some resource types require higher versions? (iirc, only 
Yarn 3.1 knows how to isolate GPUs with cgroups, but I'm not sure whether 
this prevents you from requesting GPUs at all in Yarn 3.0.)
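
Something like this, perhaps (a sketch only: I'm inferring the driver- and 
executor-side keys from the naming pattern this PR documents, and 
`yarn.io/gpu` from Hadoop's built-in GPU resource type):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Requires YARN 3.0+ resource types; GPU isolation via cgroups needs YARN 3.1+.
  .set("spark.yarn.am.resource.yarn.io/gpu", "1")      // client mode
  .set("spark.yarn.driver.resource.yarn.io/gpu", "1")  // cluster mode, instead of the am key
  .set("spark.yarn.executor.resource.yarn.io/gpu", "2")
```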


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org