[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-12 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224923768
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Long => JLong}
+import java.lang.reflect.InvocationTargetException
+
+import scala.collection.mutable
+import scala.util.Try
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some Hadoop 3 methods from the YARN API,
+ * so we need to use reflection to avoid compile errors when building
+ * against Hadoop 2.x.
+ */
+private object ResourceRequestHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+  private val RESOURCE_INFO_CLASS =
+    "org.apache.hadoop.yarn.api.records.ResourceInformation"
+
+  /**
+   * Validates sparkConf and throws a SparkException if any of the standard
+   * resources (memory or cores) is defined with the property
+   * spark.yarn.x.resource.y.
+   */
+  def validateResources(sparkConf: SparkConf): Unit = {
+    val resourceDefinitions = Seq[(String, String)](
+      (AM_MEMORY.key, YARN_AM_RESOURCE_TYPES_PREFIX + "memory"),
--- End diff --

I think the code is now complete, please check!
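For reference, the quoted `AMOUNT_AND_UNIT_REGEX` splits a resource value such as `2g` into a numeric amount and an optional unit. A minimal, self-contained sketch of that parsing (the helper function name here is made up for illustration, not part of the PR):

```scala
// Same pattern as the quoted AMOUNT_AND_UNIT_REGEX: digits followed by an
// optional alphabetic unit.
val AmountAndUnit = "([0-9]+)([A-Za-z]*)".r

// Hypothetical helper (not in the PR): parse "123m" into (123L, "m").
// A Scala regex pattern in a match expression must match the whole input,
// so values like "**@#" or a bare unit "m" fall through to the error case.
def parseResourceValue(value: String): (Long, String) = value match {
  case AmountAndUnit(amount, unit) => (amount.toLong, unit)
  case _ =>
    throw new IllegalArgumentException(s"Value '$value' does not match the pattern")
}
```

This mirrors why the test suite later in the thread expects `"123"` to parse with an empty unit and `"m"` alone to fail.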


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-12 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224914397
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
 ---
--- End diff --

Sure, adding some explanatory comments with my next commit.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-12 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224913985
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
 ---
--- End diff --

Please see my last commit with the updates.
I only added some tests, so they are not exhaustive for every combination of
Spark resources and YARN standard resources. If you think it's needed I can add
more test cases, but I think this is fine as it is.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-12 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224909836
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
 ---
--- End diff --

These are two separate things:
1. One is that I don't reject all the deprecated standard resource names
known to YARN (explained in a previous comment), which I will address soon.
2. Using `memory-mb` is the only way to initialize the memory resource with
the YARN client, via the method `ResourceUtils.reinitializeResources`.
I played around with this a bit: if I omit the standard resources, try to
specify custom resources, and then call
`ResourceUtils.reinitializeResources`, an internal YARN exception is thrown.
Unfortunately, invoking this method is the simplest way to build tests on
custom resource types, to the best of my knowledge, so I can't really do much
about this.

> and also the inconsistency in your code (using both memory and memory-mb).

What did you mean by this? The only use of `"memory"` in the whole change is
to prevent it from being used with the new resource configs.
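As a rough illustration of that "prevent it from being used" check, here is a minimal, self-contained sketch (using a plain `Map` in place of `SparkConf`; the key names are assumptions modeled on the quoted diff, not verified against the final PR):

```scala
// Assumed key names, modeled on the quoted diff (illustrative only).
val AM_MEMORY_KEY = "spark.yarn.am.memory"
val YARN_AM_RESOURCE_TYPES_PREFIX = "spark.yarn.am.resource."

// Reject a standard resource (memory) requested through the new
// resource-type prefix, pointing the user at the dedicated config instead.
def validateResources(conf: Map[String, String]): Unit = {
  val resourceDefinitions = Seq(
    (AM_MEMORY_KEY, YARN_AM_RESOURCE_TYPES_PREFIX + "memory"))
  resourceDefinitions.foreach { case (standardKey, resourceTypeKey) =>
    if (conf.contains(resourceTypeKey)) {
      throw new IllegalArgumentException(
        s"$resourceTypeKey is not allowed, use $standardKey instead")
    }
  }
}
```

An empty configuration passes, while setting `spark.yarn.am.resource.memory` fails fast, which matches the behavior the quoted `validateResources` is described as enforcing.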


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-12 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224850514
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
 ---
--- End diff --

Sure!
Did you mean this documentation?
https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html
I think it's required to check all the memory / vcore keys that YARN
deprecates, as those will flow through Spark and eventually reach YARN's
`ResourceInformation`, which will just blow up since `memory-mb` and `vcores`
are the only names that are not deprecated. The reason this hasn't caused a
problem with the current Spark code is that it uses the `Resource` object and
does not use `ResourceInformation` at all.
So we need to disallow these:
- cpu-vcores
- memory
- mb

What do you think?
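That disallow list could be sketched like this (a hedged illustration; the mapping to the non-deprecated names follows the linked Hadoop resource-model docs and is not code from the PR):

```scala
// Deprecated YARN names for the standard resources, mapped to the only
// non-deprecated names ("memory-mb" and "vcores") that YARN's
// ResourceInformation accepts.
val canonicalName: Map[String, String] = Map(
  "memory" -> "memory-mb",
  "mb" -> "memory-mb",
  "cpu-vcores" -> "vcores")

// Fail fast when a user-supplied resource name is a deprecated alias.
def rejectIfDeprecated(name: String): Unit =
  canonicalName.get(name).foreach { canonical =>
    throw new IllegalArgumentException(
      s"Resource name '$name' is deprecated in YARN, use '$canonical' instead")
  }
```

A custom name such as `yarn.io/gpu` passes through untouched; the three deprecated aliases are rejected with a pointer to their replacements.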



---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-11 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224595724
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
 ---
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.apache.hadoop.yarn.api.records.Resource
+import org.apache.hadoop.yarn.util.Records
+import org.scalatest.Matchers
+
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import 
org.apache.spark.deploy.yarn.ResourceRequestTestHelper.ResourceInformation
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config.{DRIVER_MEMORY, EXECUTOR_MEMORY}
+
+class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
+
+  private val CUSTOM_RES_1 = "custom-resource-type-1"
+  private val CUSTOM_RES_2 = "custom-resource-type-2"
+  private val MEMORY = "memory"
+  private val CORES = "cores"
+  private val NEW_CONFIG_EXECUTOR_MEMORY = YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + MEMORY
+  private val NEW_CONFIG_EXECUTOR_CORES = YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + CORES
+  private val NEW_CONFIG_AM_MEMORY = YARN_AM_RESOURCE_TYPES_PREFIX + MEMORY
+  private val NEW_CONFIG_AM_CORES = YARN_AM_RESOURCE_TYPES_PREFIX + CORES
+  private val NEW_CONFIG_DRIVER_MEMORY = YARN_DRIVER_RESOURCE_TYPES_PREFIX + MEMORY
+  private val NEW_CONFIG_DRIVER_CORES = YARN_DRIVER_RESOURCE_TYPES_PREFIX + CORES
+
+  test("resource request value does not match pattern") {
+    verifySetResourceRequestsException(List(CUSTOM_RES_1),
+      Map(CUSTOM_RES_1 -> "**@#"), CUSTOM_RES_1)
+  }
+
+  test("resource request just unit defined") {
+    verifySetResourceRequestsException(List(), Map(CUSTOM_RES_1 -> "m"), CUSTOM_RES_1)
+  }
+
+  test("resource request with null value should not be allowed") {
+    verifySetResourceRequestsException(List(), null, Map(CUSTOM_RES_1 -> "123"),
+      "requirement failed: Resource parameter should not be null!")
+  }
+
+  test("resource request with valid value and invalid unit") {
+    verifySetResourceRequestsException(List(CUSTOM_RES_1), createResource,
+      Map(CUSTOM_RES_1 -> "123ppp"), "")
+  }
+
+  test("resource request with valid value and without unit") {
+    verifySetResourceRequestsSuccessful(List(CUSTOM_RES_1), Map(CUSTOM_RES_1 -> "123"),
+      Map(CUSTOM_RES_1 -> ResourceInformation(CUSTOM_RES_1, 123, "")))
+  }
+
+  test("resource request with valid value and unit") {
+    verifySetResourceRequestsSuccessful(List(CUSTOM_RES_1), Map(CUSTOM_RES_1 -> "2g"),
+      Map(CUSTOM_RES_1 -> ResourceInformation(CUSTOM_RES_1, 2, "G")))
+  }
+
+  test("two resource requests with valid values and units") {
+    verifySetResourceRequestsSuccessful(List(CUSTOM_RES_1, CUSTOM_RES_2),
+      Map(CUSTOM_RES_1 -> "123m", CUSTOM_RES_2 -> "10G"),
+      Map(CUSTOM_RES_1 -> ResourceInformation(CUSTOM_RES_1, 123, "m"),
+        CUSTOM_RES_2 -> ResourceInformation(CUSTOM_RES_2, 10, "G")))
+  }
+
+  test("empty SparkConf should be valid") {
+    val sparkConf = new SparkConf()
+    ResourceRequestHelper.validateResources(sparkConf)
+  }
+
+  test("just normal resources are defined") {
+    val sparkConf = new SparkConf()
+    sparkConf.set(DRIVER_MEMORY.key, "3G")
+    sparkConf.set(DRIVER_CORES.key, "4")
+    sparkConf.set(EXECUTOR_MEMORY.key, "4G")
+    sparkConf.set(EXECUTOR_CORES.key, "2")
+    ResourceRequestHelper.validateResource

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-11 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224595178
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.util.Utils
+
+object ResourceRequestTestHelper {
+  def initializeResourceTypes(resourceTypes: List[String]): Unit = {
+    if (!ResourceRequestHelper.isYarnResourceTypesAvailable()) {
+      throw new IllegalStateException("This method should not be invoked " +
+        "since YARN resource types is not available because of old Hadoop version!")
+    }
+
+    val allResourceTypes = new ListBuffer[AnyRef]
+    val defaultResourceTypes = List(
+      createResourceTypeInfo("memory-mb"),
+      createResourceTypeInfo("vcores"))
+    val customResourceTypes = resourceTypes.map(rt => createResourceTypeInfo(rt))
--- End diff --

fixed


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-11 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224594842
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
 ---

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-11 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224594296
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
 ---

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-11 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224594217
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
 ---

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-11 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224594094
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Long => JLong}
+import java.lang.reflect.InvocationTargetException
+
+import scala.collection.mutable
+import scala.util.Try
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of the Hadoop 3 methods from the YARN API,
+ * so we need to use reflection to avoid compile errors when building against Hadoop 2.x.
+ */
+private object ResourceRequestHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+  private val RESOURCE_INFO_CLASS = "org.apache.hadoop.yarn.api.records.ResourceInformation"
+
+  /**
+   * Validates sparkConf and throws a SparkException if any of the standard resources
+   * (memory or cores) is defined with the property spark.yarn.x.resource.y.
+   */
+  def validateResources(sparkConf: SparkConf): Unit = {
+    val resourceDefinitions = Seq[(String, String)](
+      (AM_MEMORY.key, YARN_AM_RESOURCE_TYPES_PREFIX + "memory"),
+      (AM_CORES.key, YARN_AM_RESOURCE_TYPES_PREFIX + "cores"),
+      (DRIVER_MEMORY.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "memory"),
+      (DRIVER_CORES.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "cores"),
+      (EXECUTOR_MEMORY.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory"),
+      (EXECUTOR_CORES.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "cores"))
+    val errorMessage = new mutable.StringBuilder()
+
+    resourceDefinitions.foreach { case (sparkName, resourceRequest) =>
+      if (sparkConf.contains(resourceRequest)) {
+        errorMessage.append(s"Error: Do not use $resourceRequest, " +
+          s"please use $sparkName instead!\n")
+      }
+    }
+
+    if (errorMessage.nonEmpty) {
+      throw new SparkException(errorMessage.toString())
+    }
+  }
+
+  /**
+   * Sets the resource amount with the corresponding unit on the passed resource object.
+   * @param resources resource values to set
+   * @param resource resource object to update
+   */
+  def setResourceRequests(
+      resources: Map[String, String],
+      resource: Resource): Unit = {
+    require(resource != null, "Resource parameter should not be null!")
+
+    logDebug(s"Custom resources requested: $resources")
+    if (!isYarnResourceTypesAvailable()) {
+      if (resources.nonEmpty) {
+        logWarning("Ignoring custom resource requests because " +
+          "the version of YARN does not support it!")
+      }
+      return
+    }
+
+    val resInfoClass = Utils.classForName(RESOURCE_INFO_CLASS)
+    val setResourceInformationMethod =
+      resource.getClass.getMethod("setResourceInformation", classOf[String], resInfoClass)
+    resources.foreach { case (name, rawAmount) =>
+      try {
+        val AMOUNT_AND_UNIT_REGEX(amountPart, unitPart) = rawAmount
+        val amount = amountPart.toLong
+        val unit = unitPart match {
+          case "g" => "G"
+          case "t" => "T"
+          case "p" => "P"
+          case _ => unitPart
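As an aside on the quoted diff above: the amount-and-unit handling combines a regex extractor with a unit-normalization match. A minimal, self-contained sketch of that parsing step (hedged: `ResourceAmountParser` is an illustrative name, not part of the PR):

```scala
object ResourceAmountParser {
  // Same pattern as the AMOUNT_AND_UNIT_REGEX constant in the quoted diff.
  private val AmountAndUnit = "([0-9]+)([A-Za-z]*)".r

  // Splits e.g. "2g" into (2, "G"); lowercase g/t/p units are upper-cased
  // the way the quoted match expression does. Throws MatchError on input
  // that does not fit the amount-then-unit shape.
  def parse(rawAmount: String): (Long, String) = {
    val AmountAndUnit(amountPart, unitPart) = rawAmount
    val unit = unitPart match {
      case "g" => "G"
      case "t" => "T"
      case "p" => "P"
      case other => other
    }
    (amountPart.toLong, unit)
  }
}
```

For example, `parse("123m")` yields `(123L, "m")`, while an input like `"**@#"` fails the pattern match, which is the failure mode the suite's "does not match pattern" testcase exercises.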

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-10 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224245968
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 ---
@@ -433,4 +442,39 @@ class ClientSuite extends SparkFunSuite with Matchers {
 classpath(env)
   }
 
+  private def testResourceRequest(expectedResources: Seq[(String, Long)],
--- End diff --

Fair enough. I decided to pass in the sparkConf, the list of known 
resources, and the expected resources and their values, so the testcase is 
clearer now.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-10 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224243624
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
 ---
@@ -95,6 +97,12 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
   .set("spark.executor.instances", maxExecutors.toString)
   .set("spark.executor.cores", "5")
   .set("spark.executor.memory", "2048")
+
+// add additional configs from map
--- End diff --

removed


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-10 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224243499
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.apache.hadoop.yarn.api.records.Resource
+import org.apache.hadoop.yarn.util.Records
+import org.scalatest.Matchers
+
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.deploy.yarn.ResourceRequestTestHelper.ResourceInformation
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config.{DRIVER_MEMORY, EXECUTOR_MEMORY}
+
+class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
+
+  private val CUSTOM_RES_1 = "custom-resource-type-1"
+  private val CUSTOM_RES_2 = "custom-resource-type-2"
+  private val MEMORY = "memory"
+  private val CORES = "cores"
+  private val NEW_CONFIG_EXECUTOR_MEMORY = YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + MEMORY
+  private val NEW_CONFIG_EXECUTOR_CORES = YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + CORES
+  private val NEW_CONFIG_AM_MEMORY = YARN_AM_RESOURCE_TYPES_PREFIX + MEMORY
+  private val NEW_CONFIG_AM_CORES = YARN_AM_RESOURCE_TYPES_PREFIX + CORES
+  private val NEW_CONFIG_DRIVER_MEMORY = YARN_DRIVER_RESOURCE_TYPES_PREFIX + MEMORY
+  private val NEW_CONFIG_DRIVER_CORES = YARN_DRIVER_RESOURCE_TYPES_PREFIX + CORES
+
+  test("resource request value does not match pattern") {
+    verifySetResourceRequestsException(List(CUSTOM_RES_1),
+      Map(CUSTOM_RES_1 -> "**@#"), CUSTOM_RES_1)
+  }
+
+  test("resource request just unit defined") {
+    verifySetResourceRequestsException(List(), Map(CUSTOM_RES_1 -> "m"), CUSTOM_RES_1)
+  }
+
+  test("resource request with null value should not be allowed") {
+    verifySetResourceRequestsExceptionWithNullRequest(List(), Map(CUSTOM_RES_1 -> "123"),
+      "requirement failed: Resource parameter should not be null!")
+  }
+
+  test("resource request with valid value and invalid unit") {
+    verifySetResourceRequestsException(List(CUSTOM_RES_1), Map(CUSTOM_RES_1 -> "123ppp"))
+  }
+
+  test("resource request with valid value and without unit") {
+    verifySetResourceRequestsSuccessful(List(CUSTOM_RES_1), Map(CUSTOM_RES_1 -> "123"),
+      Map(CUSTOM_RES_1 -> ResourceInformation(CUSTOM_RES_1, 123, "")))
+  }
+
+  test("resource request with valid value and unit") {
+    verifySetResourceRequestsSuccessful(List(CUSTOM_RES_1), Map(CUSTOM_RES_1 -> "2g"),
+      Map(CUSTOM_RES_1 -> ResourceInformation(CUSTOM_RES_1, 2, "G")))
+  }
+
+  test("two resource requests with valid values and units") {
+    verifySetResourceRequestsSuccessful(List(CUSTOM_RES_1, CUSTOM_RES_2),
+      Map(CUSTOM_RES_1 -> "123m", CUSTOM_RES_2 -> "10G"),
+      Map(CUSTOM_RES_1 -> ResourceInformation(CUSTOM_RES_1, 123, "m"),
+        CUSTOM_RES_2 -> ResourceInformation(CUSTOM_RES_2, 10, "G")))
+  }
+
+  test("empty SparkConf should be valid") {
+    val sparkConf = new SparkConf()
+    ResourceRequestHelper.validateResources(sparkConf)
+  }
+
+  test("just normal resources are defined") {
+    val sparkConf = new SparkConf()
+    sparkConf.set(DRIVER_MEMORY.key, "3G")
+    sparkConf.set(DRIVER_CORES.key, "4")
+    sparkConf.set(EXECUTOR_MEMORY.key, "4G")
+    sparkConf.set(EXECUTOR_CORES.key, "2")
+    ResourceRequestHelper.validateResources(sparkConf)
+  }

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-10 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224242115
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.apache.hadoop.yarn.api.records.Resource
+import org.apache.hadoop.yarn.util.Records
+import org.scalatest.Matchers
+
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.deploy.yarn.ResourceRequestTestHelper.ResourceInformation
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config.{DRIVER_MEMORY, EXECUTOR_MEMORY}
+
+class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
+
+  private val CUSTOM_RES_1 = "custom-resource-type-1"
+  private val CUSTOM_RES_2 = "custom-resource-type-2"
+  private val MEMORY = "memory"
+  private val CORES = "cores"
+  private val NEW_CONFIG_EXECUTOR_MEMORY = YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + MEMORY
+  private val NEW_CONFIG_EXECUTOR_CORES = YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + CORES
+  private val NEW_CONFIG_AM_MEMORY = YARN_AM_RESOURCE_TYPES_PREFIX + MEMORY
+  private val NEW_CONFIG_AM_CORES = YARN_AM_RESOURCE_TYPES_PREFIX + CORES
+  private val NEW_CONFIG_DRIVER_MEMORY = YARN_DRIVER_RESOURCE_TYPES_PREFIX + MEMORY
+  private val NEW_CONFIG_DRIVER_CORES = YARN_DRIVER_RESOURCE_TYPES_PREFIX + CORES
+
+  test("resource request value does not match pattern") {
+    verifySetResourceRequestsException(List(CUSTOM_RES_1),
+      Map(CUSTOM_RES_1 -> "**@#"), CUSTOM_RES_1)
+  }
+
+  test("resource request just unit defined") {
+    verifySetResourceRequestsException(List(), Map(CUSTOM_RES_1 -> "m"), CUSTOM_RES_1)
+  }
+
+  test("resource request with null value should not be allowed") {
+    verifySetResourceRequestsExceptionWithNullRequest(List(), Map(CUSTOM_RES_1 -> "123"),
+      "requirement failed: Resource parameter should not be null!")
+  }
+
+  test("resource request with valid value and invalid unit") {
+    verifySetResourceRequestsException(List(CUSTOM_RES_1), Map(CUSTOM_RES_1 -> "123ppp"))
+  }
+
+  test("resource request with valid value and without unit") {
+    verifySetResourceRequestsSuccessful(List(CUSTOM_RES_1), Map(CUSTOM_RES_1 -> "123"),
+      Map(CUSTOM_RES_1 -> ResourceInformation(CUSTOM_RES_1, 123, "")))
+  }
+
+  test("resource request with valid value and unit") {
+    verifySetResourceRequestsSuccessful(List(CUSTOM_RES_1), Map(CUSTOM_RES_1 -> "2g"),
+      Map(CUSTOM_RES_1 -> ResourceInformation(CUSTOM_RES_1, 2, "G")))
+  }
+
+  test("two resource requests with valid values and units") {
+    verifySetResourceRequestsSuccessful(List(CUSTOM_RES_1, CUSTOM_RES_2),
+      Map(CUSTOM_RES_1 -> "123m", CUSTOM_RES_2 -> "10G"),
+      Map(CUSTOM_RES_1 -> ResourceInformation(CUSTOM_RES_1, 123, "m"),
+        CUSTOM_RES_2 -> ResourceInformation(CUSTOM_RES_2, 10, "G")))
+  }
+
+  test("empty SparkConf should be valid") {
+    val sparkConf = new SparkConf()
+    ResourceRequestHelper.validateResources(sparkConf)
+  }
+
+  test("just normal resources are defined") {
+    val sparkConf = new SparkConf()
+    sparkConf.set(DRIVER_MEMORY.key, "3G")
+    sparkConf.set(DRIVER_CORES.key, "4")
+    sparkConf.set(EXECUTOR_MEMORY.key, "4G")
+    sparkConf.set(EXECUTOR_CORES.key, "2")
+    ResourceRequestHelper.validateResources(sparkConf)
+  }

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-10 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224241554
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
 ---
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Integer => JInteger, Long => JLong}
+import java.lang.reflect.InvocationTargetException
+
+import scala.collection.mutable
+import scala.util.Try
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config.{AM_CORES, AM_MEMORY, DRIVER_CORES, EXECUTOR_CORES, YARN_AM_RESOURCE_TYPES_PREFIX, YARN_DRIVER_RESOURCE_TYPES_PREFIX, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}
--- End diff --

fixed


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-10 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224241720
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
 ---
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Integer => JInteger, Long => JLong}
+import java.lang.reflect.InvocationTargetException
+
+import scala.collection.mutable
+import scala.util.Try
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config.{AM_CORES, AM_MEMORY, DRIVER_CORES, EXECUTOR_CORES, YARN_AM_RESOURCE_TYPES_PREFIX, YARN_DRIVER_RESOURCE_TYPES_PREFIX, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{DRIVER_MEMORY, EXECUTOR_MEMORY}
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of Hadoop 3 methods from the YARN API,
+ * so we need to use reflection to avoid compile error when building against Hadoop 2.x
+ */
+private object ResourceRequestHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+  private val RESOURCE_INFO_CLASS = "org.apache.hadoop.yarn.api.records.ResourceInformation"
+  private val ERROR_PREFIX = "Error:"
--- End diff --

Fixed


---

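The reflection approach the quoted scaladoc describes — resolving Hadoop 3-only methods at runtime so the code still compiles against Hadoop 2.x — can be illustrated in isolation. A hedged sketch with a stand-in class (`Stub` and `ReflectiveCall` are illustrative names, not part of the PR):

```scala
// Hedged sketch: Stub stands in for a class whose newer methods may or may
// not exist at compile time (the role YARN's Resource plays in the PR).
class Stub {
  def setLabel(name: String): String = s"label=$name"
}

object ReflectiveCall {
  // Looks up and invokes a one-String-argument method by name, mirroring how
  // the quoted helper resolves setResourceInformation without a compile-time
  // dependency. Returns None when the method does not exist.
  def invokeSetter(target: AnyRef, methodName: String, arg: String): Option[AnyRef] =
    scala.util.Try {
      val m = target.getClass.getMethod(methodName, classOf[String])
      m.invoke(target, arg)
    }.toOption
}
```

The `Try`-wrapped lookup is also roughly how an availability probe like `isYarnResourceTypesAvailable()` can be built: a failed `getMethod`/`classForName` simply yields a negative answer instead of an exception.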



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-10 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224195358
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
 ---
@@ -288,9 +296,14 @@ private[yarn] class YarnAllocator(
   s"executorsStarting: ${numExecutorsStarting.get}")
 
 if (missing > 0) {
-      logInfo(s"Will request $missing executor container(s), each with " +
-        s"${resource.getVirtualCores} core(s) and " +
-        s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)")
+      var requestContainerMessage = s"Will request $missing executor container(s), each with " +
--- End diff --

Fair enough, added a condition to check whether it's enabled.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-10 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224194497
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 ---
@@ -199,6 +200,37 @@ class ClientSuite extends SparkFunSuite with Matchers {
 appContext.getMaxAppAttempts should be (42)
   }
 
+  test("resource request for invalid resource") {
--- End diff --

Okay, I agree with that. Then I think just removal is fine here.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-10 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224193837
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
 ---
@@ -87,6 +88,20 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
   def createAllocator(
   maxExecutors: Int = 5,
   rmClient: AMRMClient[ContainerRequest] = rmClient): YarnAllocator = {
+createAllocatorInternal(maxExecutors, rmClient, Map())
+  }
+
+  def createAllocatorWithAdditionalConfigs(
--- End diff --

Fair enough, fixed.
Did similarly with the `createResource` method.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-10 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224190144
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 ---
@@ -433,4 +465,36 @@ class ClientSuite extends SparkFunSuite with Matchers {
 classpath(env)
   }
 
+  private def testResourceRequest(expectedResources: Seq[(String, Long)],
--- End diff --

Good point, fixed!


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-10 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224187532
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 ---
@@ -199,6 +200,37 @@ class ClientSuite extends SparkFunSuite with Matchers {
 appContext.getMaxAppAttempts should be (42)
   }
 
+  test("resource request for invalid resource") {
--- End diff --

Yes, this is very YARN-specific, unlike the other testcases that verify 
the resources in the app context.
Removed this testcase.
Do you think of any other way to test the scenario when a resource is 
specified in sparkConf but is not known to YARN? 


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-10 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224185444
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Integer => JInteger, Long => JLong}
+import java.lang.reflect.InvocationTargetException
+
+import scala.collection.mutable
+import scala.util.Try
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config.{AM_CORES, AM_MEMORY, DRIVER_CORES, EXECUTOR_CORES, YARN_AM_RESOURCE_TYPES_PREFIX, YARN_DRIVER_RESOURCE_TYPES_PREFIX, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{DRIVER_MEMORY, EXECUTOR_MEMORY}
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of Hadoop 3 methods from the YARN API,
+ * so we need to use reflection to avoid compile error when building 
against Hadoop 2.x
+ */
+private object ResourceRequestHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+  private val RESOURCE_INFO_CLASS = 
"org.apache.hadoop.yarn.api.records.ResourceInformation"
+  private val ERROR_PREFIX: String = "Error:"
+
+  /**
+   * Validates sparkConf and throws a SparkException if any of the standard resources
+   * (memory or cores) is defined with the property spark.yarn.x.resource.y.
+   * @param sparkConf
+   */
+  def validateResources(sparkConf: SparkConf): Unit = {
+    val resourceDefinitions = Seq[(String, String)](
+      (AM_MEMORY.key, YARN_AM_RESOURCE_TYPES_PREFIX + "memory"),
+      (AM_CORES.key, YARN_AM_RESOURCE_TYPES_PREFIX + "cores"),
+      (DRIVER_MEMORY.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "memory"),
+      (DRIVER_CORES.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "cores"),
+      (EXECUTOR_MEMORY.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory"),
+      (EXECUTOR_CORES.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "cores"))
+    val errorMessage = new mutable.StringBuilder()
+
+    resourceDefinitions.foreach { case (sparkName, resourceRequest) =>
+      if (sparkConf.contains(resourceRequest)) {
+        errorMessage.append(s"$ERROR_PREFIX Do not use $resourceRequest, " +
+          s"please use $sparkName instead!\n")
+      }
+    }
+
+    if (errorMessage.nonEmpty) {
+      throw new SparkException(errorMessage.toString())
+    }
+  }
+
+  /**
+   * Sets the resource amount with the corresponding unit on the passed resource object.
+   * @param resources resource values to set
+   * @param resource resource object to update
+   */
+  def setResourceRequests(
+      resources: Map[String, String],
+      resource: Resource): Unit = {
+    require(resource != null, "Resource parameter should not be null!")
+    require(!resources.contains("memory"),
--- End diff --

That's true, removed these require checks.


---

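The `validateResources` check quoted above reduces to scanning a fixed mapping from standard Spark keys to their `spark.yarn.*.resource.*` counterparts. A hedged, standalone sketch of that check (hypothetical object name; a plain `Map[String, String]` stands in for `SparkConf`, and the key strings follow the ones visible in the quoted diff):

```scala
object ResourceConfigValidator {
  // Standard Spark key -> disallowed resource-type equivalent.
  private val disallowed = Seq(
    "spark.driver.memory" -> "spark.yarn.driver.resource.memory",
    "spark.driver.cores" -> "spark.yarn.driver.resource.cores",
    "spark.executor.memory" -> "spark.yarn.executor.resource.memory",
    "spark.executor.cores" -> "spark.yarn.executor.resource.cores")

  // Returns one error line per resource-type key that shadows a standard
  // Spark key; an empty result means the configuration is acceptable.
  def errors(conf: Map[String, String]): Seq[String] =
    disallowed.collect {
      case (sparkName, resourceRequest) if conf.contains(resourceRequest) =>
        s"Error: Do not use $resourceRequest, please use $sparkName instead!"
    }
}
```

Collecting all offending keys before failing, rather than throwing on the first one, is the same design choice the quoted code makes with its `StringBuilder`: the user sees every misconfigured key in a single exception.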



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-10 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224179095
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
 ---
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.apache.hadoop.yarn.api.records.Resource
+import org.apache.hadoop.yarn.util.Records
+import org.scalatest.{BeforeAndAfterAll, Matchers}
+
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.deploy.yarn.ResourceRequestTestHelper.ResourceInformation
+import org.apache.spark.deploy.yarn.config.{AM_CORES, AM_MEMORY, DRIVER_CORES, EXECUTOR_CORES, YARN_AM_RESOURCE_TYPES_PREFIX, YARN_DRIVER_RESOURCE_TYPES_PREFIX, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}
+import org.apache.spark.internal.config.{DRIVER_MEMORY, EXECUTOR_MEMORY}
+
+class ResourceRequestHelperSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
+
+  private val CUSTOM_RES_1 = "custom-resource-type-1"
+  private val CUSTOM_RES_2 = "custom-resource-type-2"
+  private val MEMORY = "memory"
+  private val CORES = "cores"
+  private val NEW_CONFIG_EXECUTOR_MEMORY = YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + MEMORY
+  private val NEW_CONFIG_EXECUTOR_CORES = YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + CORES
+  private val NEW_CONFIG_AM_MEMORY = YARN_AM_RESOURCE_TYPES_PREFIX + MEMORY
+  private val NEW_CONFIG_AM_CORES = YARN_AM_RESOURCE_TYPES_PREFIX + CORES
+  private val NEW_CONFIG_DRIVER_MEMORY = YARN_DRIVER_RESOURCE_TYPES_PREFIX + MEMORY
+  private val NEW_CONFIG_DRIVER_CORES = YARN_DRIVER_RESOURCE_TYPES_PREFIX + CORES
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+  }
+
+  private def getExpectedUnmatchedErrorMessage(name: String, value: String): String = {
+    s"Resource request for '$name' ('$value') does not match pattern ([0-9]+)([A-Za-z]*)."
+  }
+
+  test("resource type value does not match pattern") {
+assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+ResourceRequestTestHelper.initializeResourceTypes(List(CUSTOM_RES_1))
+
+val resourceTypes = Map(CUSTOM_RES_1 -> "**@#")
+
+val thrown = intercept[IllegalArgumentException] {
+  ResourceRequestHelper.setResourceRequests(resourceTypes, 
createAResource)
+}
+thrown.getMessage should equal 
(getExpectedUnmatchedErrorMessage(CUSTOM_RES_1, "**@#"))
+  }
+
+  test("resource type just unit defined") {
+    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    ResourceRequestTestHelper.initializeResourceTypes(List())
+
+    val resourceTypes = Map(CUSTOM_RES_1 -> "m")
+
+    val thrown = intercept[IllegalArgumentException] {
+      ResourceRequestHelper.setResourceRequests(resourceTypes, createAResource)
+    }
+    thrown.getMessage should equal (getExpectedUnmatchedErrorMessage(CUSTOM_RES_1, "m"))
+  }
+
+  test("resource type with null value should not be allowed") {
+    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    ResourceRequestTestHelper.initializeResourceTypes(List())
+
+    val resourceTypes = Map(CUSTOM_RES_1 -> "123")
+
+    val thrown = intercept[IllegalArgumentException] {
+      ResourceRequestHelper.setResourceRequests(resourceTypes, null)
+    }
+    thrown.getMessage should equal ("requirement failed: Resource parameter should not be null!")
+  }
+
+  test("resource type with valid value and invalid unit") {
+    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    ResourceRequestTestHelper.initializeResourceTypes(List(CUSTOM_RES_1))
+
+    val resourceTypes = Map(CUSTOM_RES_1 -> 
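The tests above all drive the same validation path: `setResourceRequests` checks each value against the pattern `([0-9]+)([A-Za-z]*)` quoted in the expected error message. A minimal standalone sketch of that amount/unit split (`parseResourceValue` is an illustrative name, not the PR's actual code):

```scala
// Sketch of the amount/unit validation exercised by the tests above.
// parseResourceValue is hypothetical; only the regex comes from the PR.
val AmountAndUnit = "([0-9]+)([A-Za-z]*)".r

def parseResourceValue(name: String, value: String): (Long, String) =
  value match {
    // Regex patterns in a match anchor to the whole string, so "m"
    // (unit only) and "**@#" (no digits) both fall through to the error.
    case AmountAndUnit(amount, unit) => (amount.toLong, unit)
    case _ => throw new IllegalArgumentException(
      s"Resource request for '$name' ('$value') " +
        "does not match pattern ([0-9]+)([A-Za-z]*).")
  }
```

With this shape, "2G" parses to amount 2 and unit "G", while a bare unit or garbage string raises the `IllegalArgumentException` the suite intercepts.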

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-10 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224178817
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
 ---
@@ -0,0 +1,255 @@

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-10 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224178252
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
 ---
@@ -0,0 +1,255 @@

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-10 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r224177875
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
 ---
@@ -0,0 +1,255 @@
+  test("resource type just unit defined") {
--- End diff --

Good idea. Extracted the common parts of all test methods in this class.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-09 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r223929329
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
 ---
@@ -0,0 +1,255 @@
+  private def getExpectedUnmatchedErrorMessage(name: String, value: String): String = {
--- End diff --

Removed the method. From now on, the test cases only check whether the resource name appears in the message.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-09 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r223928827
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
 ---
@@ -134,6 +163,29 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
 size should be (0)
   }
 
+  test("custom resource requested from yarn") {
+    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    ResourceRequestTestHelper.initializeResourceTypes(List("gpu"))
+
+    val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
+    val handler = createAllocatorWithAdditionalConfigs(1, Map(
+      YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "gpu" -> "2G"), mockAmClient)
+
+    handler.updateResourceRequests()
+    val container = createContainerWithResource("host1", handler.resource)
+    handler.handleAllocatedContainers(Array(container))
+
+    // get amount of memory and vcores from resource, so effectively skipping their validation
+    val expectedResources = Resource.newInstance(handler.resource.getMemory(),
+      handler.resource.getVirtualCores)
+    ResourceRequestHelper.setResourceRequests(Map("gpu" -> "2G"), expectedResources)
+    val captor = ArgumentCaptor.forClass(classOf[ContainerRequest])
+
+    verify(mockAmClient).addContainerRequest(captor.capture())
+    val containerRequest: ContainerRequest = captor.getValue
+    assert(containerRequest.getCapability == expectedResources)
--- End diff --

fixed


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-09 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r223918822
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
 ---
@@ -288,9 +296,14 @@ private[yarn] class YarnAllocator(
   s"executorsStarting: ${numExecutorsStarting.get}")
 
 if (missing > 0) {
-  logInfo(s"Will request $missing executor container(s), each with " +
-s"${resource.getVirtualCores} core(s) and " +
-s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)")
+  var requestContainerMessage = s"Will request $missing executor container(s), each with " +
--- End diff --

`logInfo()` already calls `if (log.isInfoEnabled)`:

`protected def logInfo(msg: => String) {
  if (log.isInfoEnabled) log.info(msg)
}`
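The by-name parameter is what makes an extra guard at the call site redundant: the message string is only built if the level check inside `logInfo` passes. A self-contained sketch, with `infoEnabled`, `sink`, and `built` as stand-ins for the real logger state:

```scala
// Sketch of by-name (msg: => String) evaluation semantics.
// infoEnabled/sink/built are hypothetical stand-ins, not Spark code.
var infoEnabled = false
var sink = ""
var built = 0

def logInfo(msg: => String): Unit = {
  if (infoEnabled) sink = msg  // msg is only evaluated on this line
}

logInfo { built += 1; "expensive message" }  // level off: string never built
infoEnabled = true
logInfo { built += 1; "expensive message" }  // level on: built exactly once
```

This is why the reviewer's suggestion to wrap the call in `if (log.isInfoEnabled)` adds nothing: the concatenation cost is already deferred.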


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-09 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r223918256
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.util.Utils
+
+object ResourceRequestTestHelper {
+  def initializeResourceTypes(resourceTypes: List[String]): Unit = {
+    if (!ResourceRequestHelper.isYarnResourceTypesAvailable()) {
+      throw new IllegalStateException("initializeResourceTypes() should not be invoked " +
+        "since YARN resource types is not available because of old Hadoop version!")
+    }
+
+    val allResourceTypes = new ListBuffer[AnyRef]
+    val defaultResourceTypes = List(
+      createResourceTypeInfo("memory-mb"),
+      createResourceTypeInfo("vcores"))
+    val customResourceTypes = resourceTypes.map(rt => createResourceTypeInfo(rt))
+    allResourceTypes ++= defaultResourceTypes
+    allResourceTypes ++= customResourceTypes
+
+    reinitializeResources(allResourceTypes)
+  }
+
+  private def createResourceTypeInfo(resourceName: String): AnyRef = {
+    val resTypeInfoClass = Utils.classForName("org.apache.hadoop.yarn.api.records.ResourceTypeInfo")
+    val resTypeInfoNewInstanceMethod = resTypeInfoClass.getMethod("newInstance", classOf[String])
+    resTypeInfoNewInstanceMethod.invoke(null, resourceName)
+  }
+
+  private def reinitializeResources(resourceTypes: ListBuffer[AnyRef]): Unit = {
+    val resourceUtilsClass =
+      Utils.classForName("org.apache.hadoop.yarn.util.resource.ResourceUtils")
+    val reinitializeResourcesMethod = resourceUtilsClass.getMethod("reinitializeResources",
+      classOf[java.util.List[AnyRef]])
+    reinitializeResourcesMethod.invoke(null, resourceTypes.asJava)
+  }
+
+  def getResourceTypeValue(res: Resource, name: String): AnyRef = {
+    val resourceInformation: AnyRef = getResourceInformation(res, name)
--- End diff --

removed type
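The helper quoted above resolves Hadoop 3-only classes such as `ResourceTypeInfo` by name and invokes their static factory methods reflectively, so the test code still compiles against Hadoop 2.x. The general pattern can be sketched with a stand-in JDK class (`invokeStaticFactory` is illustrative, not the PR's code):

```scala
// Sketch of the reflection pattern used by ResourceRequestTestHelper:
// resolve a class at runtime and call a static factory taking a String.
// java.lang.Integer stands in for the Hadoop 3-only ResourceTypeInfo.
def invokeStaticFactory(className: String, method: String, arg: String): AnyRef = {
  val clazz = Class.forName(className)
  val factory = clazz.getMethod(method, classOf[String])
  factory.invoke(null, arg)  // null receiver because the method is static
}

val result = invokeStaticFactory("java.lang.Integer", "valueOf", "42")
```

Because the class name is only a string, the lookup fails at runtime rather than at compile time when the class is absent, which is exactly why the suite guards every call with `isYarnResourceTypesAvailable()`.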


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-09 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r223918024
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
 ---
@@ -0,0 +1,90 @@
+  private def reinitializeResources(resourceTypes: ListBuffer[AnyRef]): Unit = {
--- End diff --

Inlined the method


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-09 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r223918077
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
 ---
@@ -0,0 +1,255 @@

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-09 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r223917264
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
 ---
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.apache.hadoop.yarn.api.records.Resource
+import org.apache.hadoop.yarn.util.Records
+import org.scalatest.{BeforeAndAfterAll, Matchers}
+
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import 
org.apache.spark.deploy.yarn.ResourceRequestTestHelper.ResourceInformation
+import org.apache.spark.deploy.yarn.config.{AM_CORES, AM_MEMORY, 
DRIVER_CORES, EXECUTOR_CORES, YARN_AM_RESOURCE_TYPES_PREFIX, 
YARN_DRIVER_RESOURCE_TYPES_PREFIX, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}
+import org.apache.spark.internal.config.{DRIVER_MEMORY, EXECUTOR_MEMORY}
+
+class ResourceRequestHelperSuite extends SparkFunSuite with Matchers with 
BeforeAndAfterAll {
+
+  private val CUSTOM_RES_1 = "custom-resource-type-1"
+  private val CUSTOM_RES_2 = "custom-resource-type-2"
+  private val MEMORY = "memory"
+  private val CORES = "cores"
+  private val NEW_CONFIG_EXECUTOR_MEMORY = YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + MEMORY
+  private val NEW_CONFIG_EXECUTOR_CORES = YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + CORES
+  private val NEW_CONFIG_AM_MEMORY = YARN_AM_RESOURCE_TYPES_PREFIX + MEMORY
+  private val NEW_CONFIG_AM_CORES = YARN_AM_RESOURCE_TYPES_PREFIX + CORES
+  private val NEW_CONFIG_DRIVER_MEMORY = YARN_DRIVER_RESOURCE_TYPES_PREFIX + MEMORY
+  private val NEW_CONFIG_DRIVER_CORES = YARN_DRIVER_RESOURCE_TYPES_PREFIX + CORES
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+  }
+
+  private def getExpectedUnmatchedErrorMessage(name: String, value: String): String = {
+s"Resource request for '$name' ('$value') does not match pattern ([0-9]+)([A-Za-z]*)."
+  }
+
+  test("resource type value does not match pattern") {
--- End diff --

fixed


---
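For context on the error message asserted above: the helper's `AMOUNT_AND_UNIT_REGEX` splits a value such as `2G` into an amount and a unit. A minimal, self-contained sketch of that split (the `AmountAndUnit` object and `parse` method are illustrative, not part of the PR):

```scala
// Illustrative sketch only: the pattern string is the one from
// ResourceRequestHelper, but the surrounding object is made up.
object AmountAndUnit {
  private val AmountAndUnitRegex = "([0-9]+)([A-Za-z]*)".r

  // Scala regex patterns are anchored, so the whole value must match.
  def parse(value: String): Option[(Long, String)] = value match {
    case AmountAndUnitRegex(amount, unit) => Some((amount.toLong, unit))
    case _ => None // e.g. "2.5G" or "G2" fall through to the error path
  }
}

// AmountAndUnit.parse("2G")  gives Some((2, "G"))
// AmountAndUnit.parse("123") gives Some((123, "")) -- the unit may be empty
// AmountAndUnit.parse("G2")  gives None, i.e. the "does not match pattern" error
```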

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-09 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r223916534
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
 ---
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.apache.hadoop.yarn.api.records.Resource
+import org.apache.hadoop.yarn.util.Records
+import org.scalatest.{BeforeAndAfterAll, Matchers}
+
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.deploy.yarn.ResourceRequestTestHelper.ResourceInformation
+import org.apache.spark.deploy.yarn.config.{AM_CORES, AM_MEMORY, DRIVER_CORES,
+  EXECUTOR_CORES, YARN_AM_RESOURCE_TYPES_PREFIX, YARN_DRIVER_RESOURCE_TYPES_PREFIX,
+  YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}
+import org.apache.spark.internal.config.{DRIVER_MEMORY, EXECUTOR_MEMORY}
+
+class ResourceRequestHelperSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
+
+  private val CUSTOM_RES_1 = "custom-resource-type-1"
+  private val CUSTOM_RES_2 = "custom-resource-type-2"
+  private val MEMORY = "memory"
+  private val CORES = "cores"
+  private val NEW_CONFIG_EXECUTOR_MEMORY = YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + MEMORY
+  private val NEW_CONFIG_EXECUTOR_CORES = YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + CORES
+  private val NEW_CONFIG_AM_MEMORY = YARN_AM_RESOURCE_TYPES_PREFIX + MEMORY
+  private val NEW_CONFIG_AM_CORES = YARN_AM_RESOURCE_TYPES_PREFIX + CORES
+  private val NEW_CONFIG_DRIVER_MEMORY = YARN_DRIVER_RESOURCE_TYPES_PREFIX + MEMORY
+  private val NEW_CONFIG_DRIVER_CORES = YARN_DRIVER_RESOURCE_TYPES_PREFIX + CORES
+
+  override def beforeAll(): Unit = {
--- End diff --

removed


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-09 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r223916183
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
 ---
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.apache.hadoop.yarn.api.records.Resource
+import org.apache.hadoop.yarn.util.Records
+import org.scalatest.{BeforeAndAfterAll, Matchers}
+
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.deploy.yarn.ResourceRequestTestHelper.ResourceInformation
+import org.apache.spark.deploy.yarn.config.{AM_CORES, AM_MEMORY, DRIVER_CORES,
+  EXECUTOR_CORES, YARN_AM_RESOURCE_TYPES_PREFIX, YARN_DRIVER_RESOURCE_TYPES_PREFIX,
+  YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}
--- End diff --

fixed


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-09 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r223915996
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 ---
@@ -35,7 +36,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.Records
 import org.mockito.Matchers.{eq => meq, _}
 import org.mockito.Mockito._
-import org.scalatest.Matchers
+import org.scalatest.{BeforeAndAfterAll, Matchers}
--- End diff --

yep, it was unused, removed it.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-09 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r223915882
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Integer => JInteger, Long => JLong}
+import java.lang.reflect.InvocationTargetException
+
+import scala.collection.mutable
+import scala.util.Try
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config.{AM_CORES, AM_MEMORY, DRIVER_CORES,
+  EXECUTOR_CORES, YARN_AM_RESOURCE_TYPES_PREFIX, YARN_DRIVER_RESOURCE_TYPES_PREFIX,
+  YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{DRIVER_MEMORY, EXECUTOR_MEMORY}
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of the Hadoop 3 methods from the YARN API,
+ * so we need to use reflection to avoid compile errors when building against Hadoop 2.x.
+ */
+private object ResourceRequestHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+  private val RESOURCE_INFO_CLASS = "org.apache.hadoop.yarn.api.records.ResourceInformation"
+  private val ERROR_PREFIX: String = "Error:"
+
+  /**
+   * Validates sparkConf and throws a SparkException if any of the standard resources
+   * (memory or cores) is defined with the property spark.yarn.x.resource.y
+   * @param sparkConf
+   */
+  def validateResources(sparkConf: SparkConf): Unit = {
+val resourceDefinitions = Seq[(String, String)](
+  (AM_MEMORY.key, YARN_AM_RESOURCE_TYPES_PREFIX + "memory"),
+  (AM_CORES.key, YARN_AM_RESOURCE_TYPES_PREFIX + "cores"),
+  (DRIVER_MEMORY.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "memory"),
+  (DRIVER_CORES.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "cores"),
+  (EXECUTOR_MEMORY.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory"),
+  (EXECUTOR_CORES.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "cores"))
+val errorMessage = new mutable.StringBuilder()
+
+resourceDefinitions.foreach { case (sparkName, resourceRequest) =>
+  if (sparkConf.contains(resourceRequest)) {
+errorMessage.append(s"$ERROR_PREFIX Do not use $resourceRequest, " +
+  s"please use $sparkName instead!\n")
+  }
+}
+
+if (errorMessage.nonEmpty) {
+  throw new SparkException(errorMessage.toString())
+}
+  }
+
+  /**
+   * Sets resource amount with the corresponding unit to the passed resource object.
+   * @param resources resource values to set
+   * @param resource resource object to update
+   */
+  def setResourceRequests(
+  resources: Map[String, String],
+  resource: Resource): Unit = {
+require(resource != null, "Resource parameter should not be null!")
+require(!resources.contains("memory"),
+  "This method is meant to set custom resources to the resource object, " +
+  "but memory is a standard resource!")
+require(!resources.contains("cores"),
+  "This method is meant to set custom resources to the resource object, " +
+  "but cores is a standard resource!")
+
+logDebug(s"Custom resources requested: $resources")
+if (!isYarnResourceTypesAvailable()) {
+  if (resources.nonEmpty) {
+logWarning("Ignoring updating resource with resource types because 
" +
--- End diff --

Fixed


---
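To make the validation flow above concrete, here is a hedged usage sketch: configuring a standard resource through the `spark.yarn.*.resource.*` namespace makes `validateResources` throw, and the aggregated message names every offending key. Only the config key and method names come from the diff; the snippet itself is illustrative:

```scala
import org.apache.spark.{SparkConf, SparkException}

// Deliberately misconfigured: executor memory must go through spark.executor.memory.
val conf = new SparkConf()
  .set("spark.yarn.executor.resource.memory", "30G")

try {
  ResourceRequestHelper.validateResources(conf)
} catch {
  case e: SparkException =>
    // The message lists each offending key, prefixed with "Error:".
    assert(e.getMessage.contains("spark.yarn.executor.resource.memory"))
}
```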




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-09 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r223915417
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Integer => JInteger, Long => JLong}
+import java.lang.reflect.InvocationTargetException
+
+import scala.collection.mutable
+import scala.util.Try
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config.{AM_CORES, AM_MEMORY, DRIVER_CORES,
+  EXECUTOR_CORES, YARN_AM_RESOURCE_TYPES_PREFIX, YARN_DRIVER_RESOURCE_TYPES_PREFIX,
+  YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{DRIVER_MEMORY, EXECUTOR_MEMORY}
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of the Hadoop 3 methods from the YARN API,
+ * so we need to use reflection to avoid compile errors when building against Hadoop 2.x.
+ */
+private object ResourceRequestHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+  private val RESOURCE_INFO_CLASS = "org.apache.hadoop.yarn.api.records.ResourceInformation"
+  private val ERROR_PREFIX: String = "Error:"
--- End diff --

Fixed


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-09 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r223915451
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Integer => JInteger, Long => JLong}
+import java.lang.reflect.InvocationTargetException
+
+import scala.collection.mutable
+import scala.util.Try
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config.{AM_CORES, AM_MEMORY, DRIVER_CORES,
+  EXECUTOR_CORES, YARN_AM_RESOURCE_TYPES_PREFIX, YARN_DRIVER_RESOURCE_TYPES_PREFIX,
+  YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{DRIVER_MEMORY, EXECUTOR_MEMORY}
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of the Hadoop 3 methods from the YARN API,
+ * so we need to use reflection to avoid compile errors when building against Hadoop 2.x.
+ */
+private object ResourceRequestHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+  private val RESOURCE_INFO_CLASS = "org.apache.hadoop.yarn.api.records.ResourceInformation"
+  private val ERROR_PREFIX: String = "Error:"
+
+  /**
+   * Validates sparkConf and throws a SparkException if any of the standard resources
+   * (memory or cores) is defined with the property spark.yarn.x.resource.y
+   * @param sparkConf
--- End diff --

fixed


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-05 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r223159523
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
 ---
@@ -134,6 +166,42 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
 size should be (0)
   }
 
+  test("custom resource type requested from yarn") {
+assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+TestYarnResourceRequestHelper.initializeResourceTypes(List("gpu"))
+
+// request a single container and receive it
+val handler = createAllocatorWithAdditionalConfigs(1, Map(
+  YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "gpu" -> "2G",
+  YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory" -> "1G"
+))
+handler.updateResourceRequests()
+handler.getNumExecutorsRunning should be (0)
+handler.getPendingAllocate.size should be (1)
+
+val resource = Resource.newInstance(3072, 6)
+val resourceTypes = Map("gpu" -> "2G")
+ResourceRequestHelper.setResourceRequests(resourceTypes, resource)
+
+val container = createContainerWithResource("host1", resource)
+handler.handleAllocatedContainers(Array(container))
+
+// verify custom resource type is part of rmClient.ask set
+val askField = rmClient.getClass.getDeclaredField("ask")
--- End diff --

Good point.
I found a better way to test what I wanted and it also covers verifying 
whether the `resource` field is set correctly.
Please check the new testcase's implementation.


---
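The replaced test read the allocator client's private `ask` field reflectively. As a generic sketch of that technique (the `Holder` class and `secret` field below are made-up names, not Spark's):

```scala
// Made-up Holder/secret names; the point is getDeclaredField plus
// setAccessible(true), which is how the original test reached `ask`.
class Holder {
  private val secret: Int = 42
}

val holder = new Holder
val field = holder.getClass.getDeclaredField("secret")
field.setAccessible(true) // bypass the private modifier for inspection
val value = field.get(holder).asInstanceOf[Int] // yields 42
```

As the reviewer's reply notes, asserting on public behavior is preferable to this kind of field poking, which is why the test was rewritten.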




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-05 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r223150095
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/TestYarnResourceRequestHelper.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+object TestYarnResourceRequestHelper extends Logging {
+  def initializeResourceTypes(resourceTypes: List[String]): Unit = {
+if (!ResourceRequestHelper.isYarnResourceTypesAvailable()) {
+  throw new IllegalStateException("initializeResourceTypes() should not be invoked " +
+    "since YARN resource types is not available because of old Hadoop version!")
+}
+
+val allResourceTypes = new ListBuffer[AnyRef]
+val defaultResourceTypes = List(
+  createResourceTypeInfo("memory-mb"),
+  createResourceTypeInfo("vcores"))
+val customResourceTypes = resourceTypes.map(rt => createResourceTypeInfo(rt))
+allResourceTypes ++= defaultResourceTypes
+allResourceTypes ++= customResourceTypes
+
+reinitializeResources(allResourceTypes)
+  }
+
+  private def createResourceTypeInfo(resourceName: String): AnyRef = {
+val resTypeInfoClass = Utils.classForName("org.apache.hadoop.yarn.api.records.ResourceTypeInfo")
+val resTypeInfoNewInstanceMethod = resTypeInfoClass.getMethod("newInstance", classOf[String])
+resTypeInfoNewInstanceMethod.invoke(null, resourceName)
+  }
+
+  private def reinitializeResources(resourceTypes: ListBuffer[AnyRef]): Unit = {
+val resourceUtilsClass =
+  Utils.classForName("org.apache.hadoop.yarn.util.resource.ResourceUtils")
+val reinitializeResourcesMethod = resourceUtilsClass.getMethod("reinitializeResources",
+  classOf[java.util.List[AnyRef]])
+reinitializeResourcesMethod.invoke(null, resourceTypes.asJava)
+  }
+
+  def getResourceTypeValue(res: Resource, name: String): AnyRef = {
+val resourceInformation: AnyRef = getResourceInformation(res, name)
+invokeMethod(resourceInformation, "getValue")
+  }
+
+  def getResourceInformationByName(res: Resource, nameParam: String): ResourceInformation = {
+val resourceInformation: AnyRef = getResourceInformation(res, nameParam)
+val name = invokeMethod(resourceInformation, "getName").asInstanceOf[String]
+val value = invokeMethod(resourceInformation, "getValue").asInstanceOf[Long]
+val units = invokeMethod(resourceInformation, "getUnits").asInstanceOf[String]
+new ResourceInformation(name, value, units)
+  }
+
+  private def getResourceInformation(res: Resource, name: String) = {
--- End diff --

Fixed with AnyRef


---
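The helper above leans on one small reflection idiom: look a public no-argument method up by name and invoke it on the target, so Hadoop 3's `ResourceInformation` never has to be on the compile classpath. A standalone sketch of the idiom (the `invokeNoArg` name is illustrative):

```scala
// Mirrors invokeMethod(resourceInformation, "getValue") from the diff,
// but accepts any AnyRef so it can be shown without Hadoop classes.
def invokeNoArg(target: AnyRef, methodName: String): AnyRef = {
  val method = target.getClass.getMethod(methodName) // public methods only
  method.invoke(target) // primitive results come back boxed
}

// e.g. invokeNoArg("spark", "length") returns the boxed Int 5
```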




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-05 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r223147289
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestValidatorSuite.scala
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.scalatest.{BeforeAndAfterAll, Matchers}
+
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+
+class ResourceRequestValidatorSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+  }
+
+  test("empty SparkConf should be valid") {
+val sparkConf = new SparkConf()
+ResourceRequestValidator.validateResources(sparkConf)
+  }
+
+  test("just normal resources are defined") {
+val sparkConf = new SparkConf()
+sparkConf.set("spark.driver.memory", "3G")
+sparkConf.set("spark.driver.cores", "4")
+sparkConf.set("spark.executor.memory", "4G")
+sparkConf.set("spark.executor.cores", "2")
+
+ResourceRequestValidator.validateResources(sparkConf)
+  }
+
+  test("Memory defined with new config for executor") {
+val sparkConf = new SparkConf()
+sparkConf.set("spark.yarn.executor.resource.memory", "30G")
+
+val thrown = intercept[SparkException] {
+  ResourceRequestValidator.validateResources(sparkConf)
+}
+thrown.getMessage should include ("spark.yarn.executor.resource.memory")
+  }
+
+  test("Cores defined with new config for executor") {
+val sparkConf = new SparkConf()
+sparkConf.set("spark.yarn.executor.resource.cores", "5")
+
+val thrown = intercept[SparkException] {
+  ResourceRequestValidator.validateResources(sparkConf)
+}
+thrown.getMessage should include ("spark.yarn.executor.resource.cores")
+  }
+
+  test("Memory defined with new config, client mode") {
+val sparkConf = new SparkConf()
+sparkConf.set("spark.yarn.am.resource.memory", "1G")
+
+val thrown = intercept[SparkException] {
+  ResourceRequestValidator.validateResources(sparkConf)
+}
+thrown.getMessage should include ("spark.yarn.am.resource.memory")
+  }
+
+  test("Memory defined with new config for driver, cluster mode") {
+val sparkConf = new SparkConf()
+sparkConf.set("spark.yarn.driver.resource.memory", "1G")
+
+val thrown = intercept[SparkException] {
+  ResourceRequestValidator.validateResources(sparkConf)
+}
+thrown.getMessage should include ("spark.yarn.driver.resource.memory")
+  }
+
+  test("Cores defined with new config, client mode") {
+val sparkConf = new SparkConf()
+sparkConf.set("spark.driver.memory", "2G")
+sparkConf.set("spark.driver.cores", "4")
+sparkConf.set("spark.executor.memory", "4G")
+sparkConf.set("spark.yarn.am.cores", "2")
+
+sparkConf.set("spark.yarn.am.resource.cores", "3")
+
+val thrown = intercept[SparkException] {
+  ResourceRequestValidator.validateResources(sparkConf)
+}
+thrown.getMessage should include ("spark.yarn.am.resource.cores")
+  }
+
+  test("Cores defined with new config for driver, cluster mode") {
+val sparkConf = new SparkConf()
+sparkConf.set("spark.yarn.driver.resource.cores", "1G")
+
+val thrown 

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-05 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r223142574
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/TestYarnResourceTypeHelper.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.yarn.api.records.Resource
+import org.junit.Assert
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+object TestYarnResourceTypeHelper extends Logging {
--- End diff --

Renamed as you suggested in a later comment.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-05 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r223142490
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceTypeValidatorSuite.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.scalatest.{BeforeAndAfterAll, Matchers}
+
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+
+
+class ResourceTypeValidatorSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+  }
+
+  private def getExpectedErrorMessage(
--- End diff --

Fixed


---
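The suites in this thread all follow the same ScalaTest pattern: run the code expected to fail inside `intercept`, then assert on the captured message. A minimal self-contained sketch (the exception type and message here are stand-ins, not the PR's validator):

```scala
import org.scalatest.{Assertions, Matchers}

object InterceptSketch extends Assertions with Matchers {
  def demo(): Unit = {
    // intercept returns the thrown exception, or fails the test if nothing is thrown
    val thrown = intercept[IllegalArgumentException] {
      require(false, "spark.yarn.driver.resource.cores must not be set")
    }
    thrown.getMessage should include ("spark.yarn.driver.resource.cores")
  }
}
```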




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-05 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r223139838
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.util.Utils
+
+object ResourceRequestTestHelper {
+  def initializeResourceTypes(resourceTypes: List[String]): Unit = {
+if (!ResourceRequestHelper.isYarnResourceTypesAvailable()) {
+  throw new IllegalStateException("initializeResourceTypes() should not be invoked " +
+    "since YARN resource types is not available because of old Hadoop version!")
+}
+
+val allResourceTypes = new ListBuffer[AnyRef]
+val defaultResourceTypes = List(
+  createResourceTypeInfo("memory-mb"),
+  createResourceTypeInfo("vcores"))
+val customResourceTypes = resourceTypes.map(rt => createResourceTypeInfo(rt))
+allResourceTypes ++= defaultResourceTypes
+allResourceTypes ++= customResourceTypes
+
+reinitializeResources(allResourceTypes)
+  }
+
+  private def createResourceTypeInfo(resourceName: String): AnyRef = {
+val resTypeInfoClass = Utils.classForName("org.apache.hadoop.yarn.api.records.ResourceTypeInfo")
+val resTypeInfoNewInstanceMethod = resTypeInfoClass.getMethod("newInstance", classOf[String])
+resTypeInfoNewInstanceMethod.invoke(null, resourceName)
+  }
+
+  private def reinitializeResources(resourceTypes: ListBuffer[AnyRef]): Unit = {
+val resourceUtilsClass =
+  Utils.classForName("org.apache.hadoop.yarn.util.resource.ResourceUtils")
+val reinitializeResourcesMethod = resourceUtilsClass.getMethod("reinitializeResources",
+  classOf[java.util.List[AnyRef]])
+reinitializeResourcesMethod.invoke(null, resourceTypes.asJava)
+  }
+
+  def getResourceTypeValue(res: Resource, name: String): AnyRef = {
+val resourceInformation: AnyRef = getResourceInformation(res, name)
+invokeMethod(resourceInformation, "getValue")
+  }
+
+  def getResourceInformationByName(res: Resource, nameParam: String): ResourceInformation = {
+val resourceInformation: AnyRef = getResourceInformation(res, nameParam)
+val name = invokeMethod(resourceInformation, "getName").asInstanceOf[String]
+val value = invokeMethod(resourceInformation, "getValue").asInstanceOf[Long]
+val units = invokeMethod(resourceInformation, "getUnits").asInstanceOf[String]
+ResourceInformation(name, value, units)
+  }
+
+  private def getResourceInformation(res: Resource, name: String) = {
+if (!ResourceRequestHelper.isYarnResourceTypesAvailable()) {
+  throw new IllegalStateException("assertResourceTypeValue() should 
not be invoked " +
+"since yarn resource types is not available because of old Hadoop 
version!")
+}
+
+val getResourceInformationMethod = 
res.getClass.getMethod("getResourceInformation",
+  classOf[String])
+var yarnResourceName = name
+if (name == "memory") {
+  yarnResourceName = "memory-mb"
+}
+val resourceInformation = getResourceInformationMethod.invoke(res, 
yarnResourceName)
+resourceInformation
+  }
+
+  private def invokeMethod(resourceInformation: AnyRef, methodName: 
String): AnyRef = {
+val getValueMethod = resourceInformation.getClass.getMethod(methodName)
+getValueMethod.invoke(resourceInformation)
+  }
+
+  case class ResourceInformation(name
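The reflective static-factory calls in the quoted helper above (`newInstance`, `reinitializeResources`) all follow the same pattern: look the class up by name, fetch the method, and pass `null` as the receiver because the method is static. A minimal standalone sketch of that pattern, using `java.lang.Long` instead of the Hadoop classes so it runs without YARN on the classpath:

```java
import java.lang.reflect.Method;

public class StaticInvokeSketch {
    // Look up a class by name and invoke one of its static methods reflectively.
    // For static methods, Method.invoke takes null as the receiver object.
    public static Object invokeStatic(String className, String methodName, String arg) {
        try {
            Class<?> clazz = Class.forName(className);
            Method m = clazz.getMethod(methodName, String.class);
            return m.invoke(null, arg);  // null receiver => static call
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for ResourceTypeInfo.newInstance(...): a static factory on a
        // class that is always on the classpath.
        System.out.println(invokeStatic("java.lang.Long", "valueOf", "42"));  // prints 42
    }
}
```

This is why the helper can compile against Hadoop 2.x: no Hadoop 3 type appears in a signature, only in strings.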

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-05 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r223139747
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
 ---
@@ -63,6 +63,10 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
 
   var containerNum = 0
 
+  override def beforeAll(): Unit = {
--- End diff --

removed


---

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-05 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r223139279
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeHelper.scala
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.reflect.InvocationTargetException
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[yarn] class ResourceTypeHelper {
+}
+
+object ResourceTypeHelper extends Logging {
+  private val resourceTypesNotAvailableErrorMessage =
+    "Ignoring updating resource with resource types because " +
+    "the version of YARN does not support it!"
+
+  def setResourceInfoFromResourceTypes(
+      resourceTypesParam: Map[String, String],
+      resource: Resource): Resource = {
+    if (resource == null) {
+      throw new IllegalArgumentException("Resource parameter should not be null!")
+    }
+
+    if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) {
+      logWarning(resourceTypesNotAvailableErrorMessage)
+      return resource
+    }
+
+    val resourceTypes = resourceTypesParam.map { case (k, v) => (
+      if (k.equals("memory")) {
--- End diff --

This was fixed before.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-05 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r223139202
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 ---
@@ -199,6 +200,92 @@ class ClientSuite extends SparkFunSuite with Matchers {
 appContext.getMaxAppAttempts should be (42)
   }
 
+  test("Resource type args propagate, resource type not defined") {
+    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    val sparkConf = new SparkConf()
+      .set(YARN_AM_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", "121m")
+    val args = new ClientArguments(Array())
+
+    val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
+    val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
+    val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
+
+    val client = new Client(args, sparkConf)
+
+    try {
+      client.createApplicationSubmissionContext(
+        new YarnClientApplication(getNewApplicationResponse, appContext),
+        containerLaunchContext)
+    } catch {
+      case NonFatal(e) =>
+        val expectedExceptionClass = "org.apache.hadoop.yarn.exceptions.ResourceNotFoundException"
+        if (e.getClass.getName != expectedExceptionClass) {
+          fail(s"Exception caught: $e is not an instance of $expectedExceptionClass!")
+        }
+    }
+  }
+
+  test("Resource type args propagate (client mode)") {
+    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    TestYarnResourceRequestHelper.initializeResourceTypes(List("gpu", "fpga"))
+
+    val sparkConf = new SparkConf()
+      .set(YARN_AM_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", "121m")
--- End diff --

Never mind what I commented about memory-mb; it was fixed with the latest commit, pushed moments ago. The translation was only required because the test was wrong. We agreed before that we don't need any translation in Spark code from memory to memory-mb, since memory is a standard resource and should be used as before, with the standard Spark config.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222897804
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
 ---
@@ -134,6 +166,42 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
 size should be (0)
   }
 
+  test("custom resource type requested from yarn") {
+    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    TestYarnResourceRequestHelper.initializeResourceTypes(List("gpu"))
+
+    // request a single container and receive it
+    val handler = createAllocatorWithAdditionalConfigs(1, Map(
+      YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "gpu" -> "2G",
+      YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory" -> "1G"
+    ))
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getPendingAllocate.size should be (1)
+
+    val resource = Resource.newInstance(3072, 6)
+    val resourceTypes = Map("gpu" -> "2G")
+    ResourceRequestHelper.setResourceRequests(resourceTypes, resource)
+
+    val container = createContainerWithResource("host1", resource)
+    handler.handleAllocatedContainers(Array(container))
+
+    // verify custom resource type is part of rmClient.ask set
+    val askField = rmClient.getClass.getDeclaredField("ask")
--- End diff --

Will look into this tomorrow morning
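Reading the `rmClient`'s private `ask` field via `getDeclaredField`, as the quoted test does, additionally requires `setAccessible(true)` before the field value can be read. A hedged standalone sketch of the pattern (the class and field below are made up to stand in for the AMRMClient):

```java
import java.lang.reflect.Field;

public class PrivateFieldSketch {
    // Stand-in for the AMRMClient with its private "ask" collection.
    static class FakeRmClient {
        private final java.util.Set<String> ask = java.util.Set.of("gpu", "memory-mb");
    }

    public static Object readPrivateField(Object target, String fieldName) {
        try {
            Field f = target.getClass().getDeclaredField(fieldName);
            f.setAccessible(true);  // private fields are inaccessible by default
            return f.get(target);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        @SuppressWarnings("unchecked")
        java.util.Set<String> ask =
            (java.util.Set<String>) readPrivateField(new FakeRmClient(), "ask");
        System.out.println(ask.contains("gpu"));  // prints true
    }
}
```

Such field-poking is brittle across library versions, which is presumably why reviewers flagged it here.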


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222895698
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 ---
@@ -199,6 +200,92 @@ class ClientSuite extends SparkFunSuite with Matchers {
 appContext.getMaxAppAttempts should be (42)
   }
 
+  test("Resource type args propagate, resource type not defined") {
+    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    val sparkConf = new SparkConf()
+      .set(YARN_AM_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", "121m")
+    val args = new ClientArguments(Array())
+
+    val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
+    val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
+    val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
+
+    val client = new Client(args, sparkConf)
+
+    try {
+      client.createApplicationSubmissionContext(
+        new YarnClientApplication(getNewApplicationResponse, appContext),
+        containerLaunchContext)
+    } catch {
+      case NonFatal(e) =>
+        val expectedExceptionClass = "org.apache.hadoop.yarn.exceptions.ResourceNotFoundException"
+        if (e.getClass.getName != expectedExceptionClass) {
+          fail(s"Exception caught: $e is not an instance of $expectedExceptionClass!")
+        }
+    }
+  }
+
+  test("Resource type args propagate (client mode)") {
+    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    TestYarnResourceRequestHelper.initializeResourceTypes(List("gpu", "fpga"))
+
+    val sparkConf = new SparkConf()
+      .set(YARN_AM_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", "121m")
+      .set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", "122m")
+      .set(YARN_AM_RESOURCE_TYPES_PREFIX + "fpga", "222m")
+      .set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + "fpga", "223m")
+      .set(YARN_AM_RESOURCE_TYPES_PREFIX + "memory", "1G")
+      .set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + "memory", "2G")
+    val args = new ClientArguments(Array())
+
+    val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
+    val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
+    val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
+
+    val client = new Client(args, sparkConf)
+    client.createApplicationSubmissionContext(
+      new YarnClientApplication(getNewApplicationResponse, appContext),
+      containerLaunchContext)
+
+    appContext.getAMContainerSpec should be (containerLaunchContext)
+    appContext.getApplicationType should be ("SPARK")
+    TestYarnResourceRequestHelper.getResourceTypeValue(appContext.getResource,
+      "some_resource_with_units_1") should be (121)
+    TestYarnResourceRequestHelper
+      .getResourceTypeValue(appContext.getResource, "fpga") should be (222)
+  }
+
+  test("configuration and resource type args propagate (cluster mode)") {
--- End diff --

Indeed, extracted a common method, passing the expected values as a 
sequence of tuples.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222894767
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 ---
@@ -199,6 +200,92 @@ class ClientSuite extends SparkFunSuite with Matchers {
 appContext.getMaxAppAttempts should be (42)
   }
 
+  test("Resource type args propagate, resource type not defined") {
+    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    val sparkConf = new SparkConf()
+      .set(YARN_AM_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", "121m")
+    val args = new ClientArguments(Array())
+
+    val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
+    val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
+    val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
+
+    val client = new Client(args, sparkConf)
+
+    try {
+      client.createApplicationSubmissionContext(
+        new YarnClientApplication(getNewApplicationResponse, appContext),
+        containerLaunchContext)
+    } catch {
+      case NonFatal(e) =>
+        val expectedExceptionClass = "org.apache.hadoop.yarn.exceptions.ResourceNotFoundException"
+        if (e.getClass.getName != expectedExceptionClass) {
+          fail(s"Exception caught: $e is not an instance of $expectedExceptionClass!")
+        }
+    }
+  }
+
+  test("Resource type args propagate (client mode)") {
+    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    TestYarnResourceRequestHelper.initializeResourceTypes(List("gpu", "fpga"))
+
+    val sparkConf = new SparkConf()
+      .set(YARN_AM_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", "121m")
+      .set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", "122m")
+      .set(YARN_AM_RESOURCE_TYPES_PREFIX + "fpga", "222m")
+      .set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + "fpga", "223m")
+      .set(YARN_AM_RESOURCE_TYPES_PREFIX + "memory", "1G")
--- End diff --

Exactly, see my comment on the other test case.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222894728
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 ---
@@ -199,6 +200,92 @@ class ClientSuite extends SparkFunSuite with Matchers {
 appContext.getMaxAppAttempts should be (42)
   }
 
+  test("Resource type args propagate, resource type not defined") {
+    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    val sparkConf = new SparkConf()
+      .set(YARN_AM_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", "121m")
+    val args = new ClientArguments(Array())
+
+    val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
+    val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
+    val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
+
+    val client = new Client(args, sparkConf)
+
+    try {
+      client.createApplicationSubmissionContext(
+        new YarnClientApplication(getNewApplicationResponse, appContext),
+        containerLaunchContext)
+    } catch {
+      case NonFatal(e) =>
+        val expectedExceptionClass = "org.apache.hadoop.yarn.exceptions.ResourceNotFoundException"
+        if (e.getClass.getName != expectedExceptionClass) {
+          fail(s"Exception caught: $e is not an instance of $expectedExceptionClass!")
+        }
+    }
+  }
+
+  test("Resource type args propagate (client mode)") {
+    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    TestYarnResourceRequestHelper.initializeResourceTypes(List("gpu", "fpga"))
+
+    val sparkConf = new SparkConf()
+      .set(YARN_AM_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", "121m")
--- End diff --

Actually, these two tests were indeed failing. I forgot to run them lately, and Jenkins does not run them because its Hadoop version is not > 3.1.
This also pointed to another issue: in `ResourceRequestHelper.setResourceRequests`, the `setResourceInformationMethod` was invoked with the value `"memory"`, which is not accepted by the YARN API, so I had to translate it to `"memory-mb"` in order for it to work correctly.
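The translation described here, from Spark's `memory` key to YARN's canonical `memory-mb` resource name, can be sketched as a small lookup. This is illustrative only; the actual mapping lives inside `ResourceRequestHelper.setResourceRequests`, and the class name below is invented:

```java
public class ResourceNameSketch {
    // Illustrative mapping from a Spark-facing resource key to the name the
    // YARN API accepts; plain "memory" is rejected by setResourceInformation.
    public static String toYarnResourceName(String sparkName) {
        if (sparkName.equals("memory")) {
            return "memory-mb";
        }
        // Custom resource types (gpu, fpga, ...) pass through unchanged.
        return sparkName;
    }

    public static void main(String[] args) {
        System.out.println(toYarnResourceName("memory"));  // prints memory-mb
    }
}
```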


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222890990
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Integer => JInteger, Long => JLong}
+import java.lang.reflect.InvocationTargetException
+
+import scala.util.Try
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of Hadoop 3 methods from the YARN API,
+ * so we need to use reflection to avoid compile error when building 
against Hadoop 2.x
+ */
+private object ResourceRequestHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+
+  def setResourceRequests(
--- End diff --

added doc


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222890547
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/TestYarnResourceRequestHelper.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+object TestYarnResourceRequestHelper extends Logging {
+  def initializeResourceTypes(resourceTypes: List[String]): Unit = {
+    if (!ResourceRequestHelper.isYarnResourceTypesAvailable()) {
+      throw new IllegalStateException("initializeResourceTypes() should not be invoked " +
+        "since YARN resource types are not available because of old Hadoop version!")
+    }
+
+    val allResourceTypes = new ListBuffer[AnyRef]
+    val defaultResourceTypes = List(
+      createResourceTypeInfo("memory-mb"),
+      createResourceTypeInfo("vcores"))
+    val customResourceTypes = resourceTypes.map(rt => createResourceTypeInfo(rt))
+    allResourceTypes ++= defaultResourceTypes
+    allResourceTypes ++= customResourceTypes
+
+    reinitializeResources(allResourceTypes)
+  }
+
+  private def createResourceTypeInfo(resourceName: String): AnyRef = {
+    val resTypeInfoClass = Utils.classForName("org.apache.hadoop.yarn.api.records.ResourceTypeInfo")
+    val resTypeInfoNewInstanceMethod = resTypeInfoClass.getMethod("newInstance", classOf[String])
+    resTypeInfoNewInstanceMethod.invoke(null, resourceName)
+  }
+
+  private def reinitializeResources(resourceTypes: ListBuffer[AnyRef]): Unit = {
+    val resourceUtilsClass =
+      Utils.classForName("org.apache.hadoop.yarn.util.resource.ResourceUtils")
+    val reinitializeResourcesMethod = resourceUtilsClass.getMethod("reinitializeResources",
+      classOf[java.util.List[AnyRef]])
+    reinitializeResourcesMethod.invoke(null, resourceTypes.asJava)
+  }
+
+  def getResourceTypeValue(res: Resource, name: String): AnyRef = {
+    val resourceInformation: AnyRef = getResourceInformation(res, name)
+    invokeMethod(resourceInformation, "getValue")
+  }
+
+  def getResourceInformationByName(res: Resource, nameParam: String): ResourceInformation = {
+    val resourceInformation: AnyRef = getResourceInformation(res, nameParam)
+    val name = invokeMethod(resourceInformation, "getName").asInstanceOf[String]
+    val value = invokeMethod(resourceInformation, "getValue").asInstanceOf[Long]
+    val units = invokeMethod(resourceInformation, "getUnits").asInstanceOf[String]
+    new ResourceInformation(name, value, units)
+  }
+
+  private def getResourceInformation(res: Resource, name: String) = {
+    if (!ResourceRequestHelper.isYarnResourceTypesAvailable()) {
+      throw new IllegalStateException("getResourceInformation() should not be invoked " +
+        "since YARN resource types are not available because of old Hadoop version!")
+    }
+
+    val getResourceInformationMethod = res.getClass.getMethod("getResourceInformation",
+      classOf[String])
+    val resourceInformation = getResourceInformationMethod.invoke(res, name)
+    resourceInformation
+  }
+
+  private def invokeMethod(resourceInformation: AnyRef, methodName: String): AnyRef = {
+    val getValueMethod = resourceInformation.getClass.getMethod(methodName)
+    getValueMethod.invoke(resourceInformation)
+  }
+
+  class ResourceInformation(val name: String, val value: Long, val units: String) {
--- End diff --

Than

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222889602
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestValidatorSuite.scala
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.scalatest.{BeforeAndAfterAll, Matchers}
+
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+
+class ResourceRequestValidatorSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+  }
+
+  test("empty SparkConf should be valid") {
+    val sparkConf = new SparkConf()
+    ResourceRequestValidator.validateResources(sparkConf)
+  }
+
+  test("just normal resources are defined") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.driver.memory", "3G")
+    sparkConf.set("spark.driver.cores", "4")
+    sparkConf.set("spark.executor.memory", "4G")
+    sparkConf.set("spark.executor.cores", "2")
+
+    ResourceRequestValidator.validateResources(sparkConf)
+  }
+
+  test("Memory defined with new config for executor") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.yarn.executor.resource.memory", "30G")
+
+    val thrown = intercept[SparkException] {
+      ResourceRequestValidator.validateResources(sparkConf)
+    }
+    thrown.getMessage should include ("spark.yarn.executor.resource.memory")
+  }
+
+  test("Cores defined with new config for executor") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.yarn.executor.resource.cores", "5")
+
+    val thrown = intercept[SparkException] {
+      ResourceRequestValidator.validateResources(sparkConf)
+    }
+    thrown.getMessage should include ("spark.yarn.executor.resource.cores")
+  }
+
+  test("Memory defined with new config, client mode") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.yarn.am.resource.memory", "1G")
+
+    val thrown = intercept[SparkException] {
+      ResourceRequestValidator.validateResources(sparkConf)
+    }
+    thrown.getMessage should include ("spark.yarn.am.resource.memory")
+  }
+
+  test("Memory defined with new config for driver, cluster mode") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.yarn.driver.resource.memory", "1G")
+
+    val thrown = intercept[SparkException] {
+      ResourceRequestValidator.validateResources(sparkConf)
+    }
+    thrown.getMessage should include ("spark.yarn.driver.resource.memory")
+  }
+
+  test("Cores defined with new config, client mode") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.driver.memory", "2G")
+    sparkConf.set("spark.driver.cores", "4")
+    sparkConf.set("spark.executor.memory", "4G")
+    sparkConf.set("spark.yarn.am.cores", "2")
+
+    sparkConf.set("spark.yarn.am.resource.cores", "3")
+
+    val thrown = intercept[SparkException] {
+      ResourceRequestValidator.validateResources(sparkConf)
+    }
+    thrown.getMessage should include ("spark.yarn.am.resource.cores")
+  }
+
+  test("Cores defined with new config for driver, cluster mode") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.yarn.driver.resource.cores", "1G")
+
+    val thrown = intercept[SparkException] {
+      ResourceRequestValidator.validateRes

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222889144
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/TestYarnResourceRequestHelper.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+object TestYarnResourceRequestHelper extends Logging {
+  def initializeResourceTypes(resourceTypes: List[String]): Unit = {
+    if (!ResourceRequestHelper.isYarnResourceTypesAvailable()) {
+      throw new IllegalStateException("initializeResourceTypes() should not be invoked " +
+        "since YARN resource types are not available because of old Hadoop version!")
+    }
+
+    val allResourceTypes = new ListBuffer[AnyRef]
+    val defaultResourceTypes = List(
+      createResourceTypeInfo("memory-mb"),
+      createResourceTypeInfo("vcores"))
+    val customResourceTypes = resourceTypes.map(rt => createResourceTypeInfo(rt))
+    allResourceTypes ++= defaultResourceTypes
+    allResourceTypes ++= customResourceTypes
+
+    reinitializeResources(allResourceTypes)
+  }
+
+  private def createResourceTypeInfo(resourceName: String): AnyRef = {
+    val resTypeInfoClass = Utils.classForName("org.apache.hadoop.yarn.api.records.ResourceTypeInfo")
+    val resTypeInfoNewInstanceMethod = resTypeInfoClass.getMethod("newInstance", classOf[String])
+    resTypeInfoNewInstanceMethod.invoke(null, resourceName)
+  }
+
+  private def reinitializeResources(resourceTypes: ListBuffer[AnyRef]): Unit = {
+    val resourceUtilsClass =
+      Utils.classForName("org.apache.hadoop.yarn.util.resource.ResourceUtils")
+    val reinitializeResourcesMethod = resourceUtilsClass.getMethod("reinitializeResources",
+      classOf[java.util.List[AnyRef]])
+    reinitializeResourcesMethod.invoke(null, resourceTypes.asJava)
+  }
+
+  def getResourceTypeValue(res: Resource, name: String): AnyRef = {
+    val resourceInformation: AnyRef = getResourceInformation(res, name)
+    invokeMethod(resourceInformation, "getValue")
+  }
+
+  def getResourceInformationByName(res: Resource, nameParam: String): ResourceInformation = {
+    val resourceInformation: AnyRef = getResourceInformation(res, nameParam)
+    val name = invokeMethod(resourceInformation, "getName").asInstanceOf[String]
+    val value = invokeMethod(resourceInformation, "getValue").asInstanceOf[Long]
+    val units = invokeMethod(resourceInformation, "getUnits").asInstanceOf[String]
+    new ResourceInformation(name, value, units)
+  }
+
+  private def getResourceInformation(res: Resource, name: String) = {
--- End diff --

In what way could I refer to the `ResourceInformation` class?
Remember, that class is either present or absent on the classpath, depending on which version of Hadoop Spark depends on.
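Because `ResourceInformation` may or may not be loadable depending on the Hadoop version, such code can only probe for it by name at runtime. A minimal sketch of that kind of probe (mirroring what a check like `isYarnResourceTypesAvailable()` presumably does; the gate shown here is illustrative, not the exact Spark code):

```java
public class ClassProbe {
    // Returns true if the named class can be loaded on the current classpath.
    // Reflection-based helpers gate themselves on checks of this shape before
    // touching Hadoop 3-only APIs.
    public static boolean isClassAvailable(String name) {
        try {
            Class.forName(name);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isClassAvailable("java.lang.String"));  // prints true
        System.out.println(
            isClassAvailable("org.apache.hadoop.yarn.api.records.ResourceInformation"));
    }
}
```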


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222888986
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/TestYarnResourceRequestHelper.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+object TestYarnResourceRequestHelper extends Logging {
--- End diff --

Fixed the name and removed Logging.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222888518
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 ---
@@ -199,6 +200,92 @@ class ClientSuite extends SparkFunSuite with Matchers {
 appContext.getMaxAppAttempts should be (42)
   }
 
+  test("Resource type args propagate, resource type not defined") {
+assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+val sparkConf = new SparkConf()
+  .set(YARN_AM_RESOURCE_TYPES_PREFIX + "some_resource_with_units_1", "121m")
+val args = new ClientArguments(Array())
+
+val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
+val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
+val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
+
+val client = new Client(args, sparkConf)
+
+try {
+  client.createApplicationSubmissionContext(
+new YarnClientApplication(getNewApplicationResponse, appContext),
+containerLaunchContext)
+} catch {
+  case NonFatal(e) =>
+val expectedExceptionClass = "org.apache.hadoop.yarn.exceptions.ResourceNotFoundException"
+if (e.getClass.getName != expectedExceptionClass) {
--- End diff --

fixed
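Since ResourceNotFoundException only exists on Hadoop 3, the test in the diff above cannot name it in a catch clause when compiling against Hadoop 2.x, so it compares the caught exception's class name instead. A minimal standalone sketch of that pattern (IllegalStateException stands in here for the YARN exception, which may not be on the classpath):

```scala
import scala.util.control.NonFatal

// Compare by fully-qualified class name so the expected exception type
// does not need to be present at compile time.
def isExpected(e: Throwable, expectedClassName: String): Boolean =
  e.getClass.getName == expectedClassName

var matched = false
try {
  // Stand-in for the call that fails on an unknown resource type.
  throw new IllegalStateException("resource type 'gpu' not found")
} catch {
  case NonFatal(e) =>
    matched = isExpected(e, "java.lang.IllegalStateException")
}
```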


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222888391
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 ---
@@ -199,6 +200,92 @@ class ClientSuite extends SparkFunSuite with Matchers {
 appContext.getMaxAppAttempts should be (42)
   }
 
+  test("Resource type args propagate, resource type not defined") {
--- End diff --

Fixed the naming in this class.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222887537
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 ---
@@ -35,13 +36,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.Records
 import org.mockito.Matchers.{eq => meq, _}
 import org.mockito.Mockito._
-import org.scalatest.Matchers
+import org.scalatest.{BeforeAndAfterAll, Matchers}
 
 import org.apache.spark.{SparkConf, SparkFunSuite, TestUtils}
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.util.{SparkConfWithEnv, Utils}
 
-class ClientSuite extends SparkFunSuite with Matchers {
+class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
--- End diff --

I guess I wanted to use it earlier and then forgot to remove it later. Fixed.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222887404
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestValidator.scala
 ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config._
+
+private object ResourceRequestValidator {
+  private val ERROR_PREFIX: String = "Error:"
+
+  /**
+   * Validates sparkConf and throws a SparkException if a standard resource (memory or cores)
+   * is defined with the property spark.yarn.x.resource.y
+   *
+   * Example of an invalid config:
+   * - spark.yarn.driver.resource.memory=2g
+   *
+   * Please note that if multiple resources are defined like described above,
+   * the error messages will be concatenated.
+   * Example of such a config:
+   * - spark.yarn.driver.resource.memory=2g
+   * - spark.yarn.executor.resource.cores=2
+   * Then the following two error messages will be printed:
+   * - "memory cannot be requested with config spark.yarn.driver.resource.memory,
+   * please use config spark.driver.memory instead!
+   * - "cores cannot be requested with config spark.yarn.executor.resource.cores,
+   * please use config spark.executor.cores instead!
+   *
+   * @param sparkConf
+   */
+  def validateResources(sparkConf: SparkConf): Unit = {
+val resourceDefinitions = Seq[(String, String)](
+  (AM_MEMORY.key, YARN_AM_RESOURCE_TYPES_PREFIX + "memory"),
+  (AM_CORES.key, YARN_AM_RESOURCE_TYPES_PREFIX + "cores"),
+  ("spark.driver.memory", YARN_DRIVER_RESOURCE_TYPES_PREFIX + "memory"),
--- End diff --

Fixed


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222887435
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestValidator.scala
 ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config._
+
+private object ResourceRequestValidator {
+  private val ERROR_PREFIX: String = "Error:"
+
+  /**
+   * Validates sparkConf and throws a SparkException if a standard resource (memory or cores)
+   * is defined with the property spark.yarn.x.resource.y
+   *
+   * Example of an invalid config:
+   * - spark.yarn.driver.resource.memory=2g
+   *
+   * Please note that if multiple resources are defined like described above,
+   * the error messages will be concatenated.
+   * Example of such a config:
+   * - spark.yarn.driver.resource.memory=2g
+   * - spark.yarn.executor.resource.cores=2
+   * Then the following two error messages will be printed:
+   * - "memory cannot be requested with config spark.yarn.driver.resource.memory,
+   * please use config spark.driver.memory instead!
+   * - "cores cannot be requested with config spark.yarn.executor.resource.cores,
+   * please use config spark.executor.cores instead!
+   *
+   * @param sparkConf
+   */
+  def validateResources(sparkConf: SparkConf): Unit = {
+val resourceDefinitions = Seq[(String, String)](
+  (AM_MEMORY.key, YARN_AM_RESOURCE_TYPES_PREFIX + "memory"),
+  (AM_CORES.key, YARN_AM_RESOURCE_TYPES_PREFIX + "cores"),
+  ("spark.driver.memory", YARN_DRIVER_RESOURCE_TYPES_PREFIX + "memory"),
+  (DRIVER_CORES.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "cores"),
+  ("spark.executor.memory", YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory"),
+  (EXECUTOR_CORES.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "cores"))
+val errorMessage = new mutable.StringBuilder()
+
+resourceDefinitions.foreach { case (sparkName, resourceRequest) =>
+  if (sparkConf.contains(resourceRequest)) {
+errorMessage.append(s"$ERROR_PREFIX Do not use $resourceRequest, " +
+s"please use $sparkName instead!\n")
+  }
+}
+
+// throw exception after loop
--- End diff --

removed
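For reference, the accumulate-then-throw pattern discussed in this diff can be sketched standalone (the config keys are illustrative, and a plain Map plus IllegalArgumentException stand in for SparkConf and SparkException):

```scala
// Collect every offending config key first, then fail once with the
// concatenated message instead of throwing on the first hit.
def validateResources(conf: Map[String, String]): Unit = {
  val resourceDefinitions = Seq(
    ("spark.driver.memory", "spark.yarn.driver.resource.memory"),
    ("spark.executor.cores", "spark.yarn.executor.resource.cores"))
  val errorMessage = new StringBuilder()
  resourceDefinitions.foreach { case (sparkName, resourceRequest) =>
    if (conf.contains(resourceRequest)) {
      errorMessage.append(s"Error: Do not use $resourceRequest, please use $sparkName instead!\n")
    }
  }
  if (errorMessage.nonEmpty) {
    throw new IllegalArgumentException(errorMessage.toString.trim)
  }
}
```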


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222886705
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestValidator.scala
 ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config._
+
+private object ResourceRequestValidator {
+  private val ERROR_PREFIX: String = "Error:"
+
+  /**
+   * Validates sparkConf and throws a SparkException if a standard resource (memory or cores)
+   * is defined with the property spark.yarn.x.resource.y
+   *
+   * Example of an invalid config:
--- End diff --

fixed


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222886619
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestValidator.scala
 ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config._
+
+private object ResourceRequestValidator {
+  private val ERROR_PREFIX: String = "Error:"
+
+  /**
+   * Validates sparkConf and throws a SparkException if a standard resource (memory or cores)
+   * is defined with the property spark.yarn.x.resource.y
--- End diff --

fixed


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222886571
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Integer => JInteger, Long => JLong}
+import java.lang.reflect.InvocationTargetException
+
+import scala.util.Try
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of Hadoop 3 methods from the YARN API,
+ * so we need to use reflection to avoid compile error when building against Hadoop 2.x
+ */
+private object ResourceRequestHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+
+  def setResourceRequests(
+  resources: Map[String, String],
+  resource: Resource): Unit = {
+require(resource != null, "Resource parameter should not be null!")
+
+logDebug(s"Custom resource types requested: $resources")
+if (!isYarnResourceTypesAvailable()) {
+  if (resources.nonEmpty) {
+logWarning("Ignoring updating resource with resource types because " +
+"the version of YARN does not support it!")
+  }
+  return
+}
+
+val resInfoClass = Utils.classForName(
+  "org.apache.hadoop.yarn.api.records.ResourceInformation")
+val setResourceInformationMethod =
+  resource.getClass.getMethod("setResourceInformation", classOf[String],
+resInfoClass)
+resources.foreach { case (name, rawAmount) =>
+  try {
+val AMOUNT_AND_UNIT_REGEX(amountPart, unitPart) = rawAmount
+val amount = amountPart.toLong
+val unit = unitPart match {
+  case "g" => "G"
+  case "t" => "T"
+  case "p" => "P"
+  case _ => unitPart
+}
+logDebug(s"Registering resource with name: $name, amount: $amount, unit: $unit")
+val resourceInformation =
+  createResourceInformation(name, amount, unit, resInfoClass)
+setResourceInformationMethod.invoke(
+  resource, name, resourceInformation.asInstanceOf[AnyRef])
+  } catch {
+case _: MatchError => throw new IllegalArgumentException(
--- End diff --

fixed
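The amount/unit handling shown in the diff above can be exercised in isolation. A minimal sketch of the same regex and unit normalization (extractor name shortened for illustration):

```scala
// "([0-9]+)([A-Za-z]*)" splits e.g. "121m" into amount 121 and unit "m";
// lowercase g/t/p are normalized to the uppercase units YARN expects.
val AmountAndUnit = "([0-9]+)([A-Za-z]*)".r

def parse(rawAmount: String): (Long, String) = rawAmount match {
  case AmountAndUnit(amountPart, unitPart) =>
    val unit = unitPart match {
      case "g" => "G"
      case "t" => "T"
      case "p" => "P"
      case other => other
    }
    (amountPart.toLong, unit)
  case _ =>
    // Mirrors the MatchError -> IllegalArgumentException translation in the diff.
    throw new IllegalArgumentException(s"Invalid resource amount: $rawAmount")
}
```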


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222809051
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestValidator.scala
 ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config._
+
+private object ResourceRequestValidator {
+  private val ERROR_PREFIX: String = "Error:"
+
+  /**
+   * Validates sparkConf and throws a SparkException if a standard resource (memory or cores)
+   * is defined with the property spark.yarn.x.resource.y
+   *
+   * Example of an invalid config:
+   * - spark.yarn.driver.resource.memory=2g
+   *
+   * Please note that if multiple resources are defined like described above,
+   * the error messages will be concatenated.
+   * Example of such a config:
+   * - spark.yarn.driver.resource.memory=2g
+   * - spark.yarn.executor.resource.cores=2
+   * Then the following two error messages will be printed:
+   * - "memory cannot be requested with config spark.yarn.driver.resource.memory,
+   * please use config spark.driver.memory instead!
+   * - "cores cannot be requested with config spark.yarn.executor.resource.cores,
+   * please use config spark.executor.cores instead!
+   *
+   * @param sparkConf
+   */
+  def validateResources(sparkConf: SparkConf): Unit = {
--- End diff --

fixed


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222808823
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Integer => JInteger, Long => JLong}
+import java.lang.reflect.InvocationTargetException
+
+import scala.util.Try
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of Hadoop 3 methods from the YARN API,
+ * so we need to use reflection to avoid compile error when building against Hadoop 2.x
+ */
+private object ResourceRequestHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+
+  def setResourceRequests(
+  resources: Map[String, String],
+  resource: Resource): Unit = {
+require(resource != null, "Resource parameter should not be null!")
+
+logDebug(s"Custom resource types requested: $resources")
+if (!isYarnResourceTypesAvailable()) {
+  if (resources.nonEmpty) {
+logWarning("Ignoring updating resource with resource types because " +
+"the version of YARN does not support it!")
+  }
+  return
+}
+
+val resInfoClass = Utils.classForName(
+  "org.apache.hadoop.yarn.api.records.ResourceInformation")
+val setResourceInformationMethod =
+  resource.getClass.getMethod("setResourceInformation", classOf[String],
+resInfoClass)
+resources.foreach { case (name, rawAmount) =>
+  try {
+val AMOUNT_AND_UNIT_REGEX(amountPart, unitPart) = rawAmount
+val amount = amountPart.toLong
+val unit = unitPart match {
+  case "g" => "G"
+  case "t" => "T"
+  case "p" => "P"
+  case _ => unitPart
+}
+logDebug(s"Registering resource with name: $name, amount: $amount, unit: $unit")
+val resourceInformation =
+  createResourceInformation(name, amount, unit, resInfoClass)
--- End diff --

fixed
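The getMethod/invoke pattern used throughout this diff can be demonstrated without Hadoop on the classpath; in this sketch java.lang.StringBuilder stands in for the YARN Resource class:

```scala
// Look up a method by name and parameter types, then invoke it reflectively,
// just as the diff does for Resource.setResourceInformation.
val sb = new java.lang.StringBuilder()
val appendMethod = sb.getClass.getMethod("append", classOf[String])
appendMethod.invoke(sb, "gpu")
```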


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222808683
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Integer => JInteger, Long => JLong}
+import java.lang.reflect.InvocationTargetException
+
+import scala.util.Try
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of Hadoop 3 methods from the YARN API,
+ * so we need to use reflection to avoid compile error when building against Hadoop 2.x
+ */
+private object ResourceRequestHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+
+  def setResourceRequests(
+  resources: Map[String, String],
+  resource: Resource): Unit = {
+require(resource != null, "Resource parameter should not be null!")
+
+logDebug(s"Custom resource types requested: $resources")
+if (!isYarnResourceTypesAvailable()) {
+  if (resources.nonEmpty) {
+logWarning("Ignoring updating resource with resource types because " +
+"the version of YARN does not support it!")
+  }
+  return
+}
+
+val resInfoClass = Utils.classForName(
+  "org.apache.hadoop.yarn.api.records.ResourceInformation")
--- End diff --

Indeed. Fixed
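The isYarnResourceTypesAvailable() check referenced in this diff boils down to probing for the Hadoop 3-only class at runtime. A hedged standalone sketch, using plain Class.forName in place of Spark's Utils.classForName:

```scala
import scala.util.Try

// ResourceInformation only exists on Hadoop 3, so probe for it reflectively
// instead of referencing it at compile time.
def isClassAvailable(className: String): Boolean =
  Try(Class.forName(className)).isSuccess
```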


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-04 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r222808201
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Integer => JInteger, Long => JLong}
+import java.lang.reflect.InvocationTargetException
+
+import scala.util.Try
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of Hadoop 3 methods from the YARN API,
+ * so we need to use reflection to avoid compile error when building against Hadoop 2.x
+ */
+private object ResourceRequestHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+
+  def setResourceRequests(
+  resources: Map[String, String],
+  resource: Resource): Unit = {
+require(resource != null, "Resource parameter should not be null!")
+
+logDebug(s"Custom resource types requested: $resources")
+if (!isYarnResourceTypesAvailable()) {
+  if (resources.nonEmpty) {
+logWarning("Ignoring updating resource with resource types because " +
+"the version of YARN does not support it!")
+  }
+  return
+}
+
+val resInfoClass = Utils.classForName(
+  "org.apache.hadoop.yarn.api.records.ResourceInformation")
+val setResourceInformationMethod =
+  resource.getClass.getMethod("setResourceInformation", classOf[String],
+resInfoClass)
--- End diff --

fixed


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-01 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r221821930
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeValidator.scala
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config._
+
+private object ResourceTypeValidator {
+  private val ERROR_PREFIX: String = "Error: "
+  private val POSSIBLE_RESOURCE_DEFINITIONS = Seq[(String, String)](
+("spark.yarn.am.memory", YARN_AM_RESOURCE_TYPES_PREFIX + "memory"),
+("spark.yarn.am.cores", YARN_AM_RESOURCE_TYPES_PREFIX + "cores"),
+("spark.driver.memory", YARN_DRIVER_RESOURCE_TYPES_PREFIX + "memory"),
+("spark.driver.cores", YARN_DRIVER_RESOURCE_TYPES_PREFIX + "cores"),
+("spark.executor.memory", YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory"),
+("spark.executor.cores", YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "cores"))
+
+  /**
+   * Validates sparkConf and throws a SparkException if a standard resource (memory or cores)
+   * is defined with the property spark.yarn.x.resource.y
+   *
+   * Example of an invalid config:
+   * - spark.yarn.driver.resource.memory=2g
+   *
+   * Please note that if multiple resources are defined like described above,
+   * the error messages will be concatenated.
+   * Example of such a config:
+   * - spark.yarn.driver.resource.memory=2g
+   * - spark.yarn.executor.resource.cores=2
+   * Then the following two error messages will be printed:
+   * - "memory cannot be requested with config spark.yarn.driver.resource.memory,
+   * please use config spark.driver.memory instead!
+   * - "cores cannot be requested with config spark.yarn.executor.resource.cores,
+   * please use config spark.executor.cores instead!
+   *
+   * @param sparkConf
+   */
+  def validateResources(sparkConf: SparkConf): Unit = {
+val overallErrorMessage = new mutable.StringBuilder()
+POSSIBLE_RESOURCE_DEFINITIONS.foreach { resdef =>
+  val customResources: Map[String, String] =
--- End diff --

Yeah, you are right, actually.
Since we have gone through many iterations of changes to this class, it
finally became clear to me that I can simplify this even more, as you suggested.



---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-01 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r221820245
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/TestYarnResourceTypeHelper.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.yarn.api.records.Resource
+import org.junit.Assert
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+object TestYarnResourceTypeHelper extends Logging {
--- End diff --

This is intended to be a test helper class, used by many test classes.
Can you suggest any better name? Thanks!


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-01 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r221818418
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeHelper.scala ---
@@ -0,0 +1,114 @@
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Integer => JInteger, Long => JLong}
+import java.lang.reflect.InvocationTargetException
+
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of Hadoop 3 methods from the YARN API,
+ * so we need to use reflection to avoid compile error when building against Hadoop 2.x
+ */
+private object ResourceTypeHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+
+  def setResourceInfoFromResourceTypes(
+  resourceTypes: Map[String, String],
+  resource: Resource): Resource = {
+require(resource != null, "Resource parameter should not be null!")
+
+logDebug(s"Custom resource types requested: $resourceTypes")
+if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) {
--- End diff --

Oh I see, that makes sense :)
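The guard discussed here (`isYarnResourceTypesAvailable()`) comes down to probing the classpath for a Hadoop 3-only class, such as the `ResourceInformation` class quoted in the diff. A minimal self-contained sketch of that pattern — the object and method names below are illustrative, not Spark's:

```scala
import scala.util.Try

object ClassAvailability {
  // Returns true when the named class can be loaded, i.e. the dependency
  // providing it (here: Hadoop 3's YARN resource-type API) is on the classpath.
  def isClassAvailable(fqcn: String): Boolean =
    Try(Class.forName(fqcn)).isSuccess
}
```

Callers can then skip any reflective resource-type work (and at most log a warning) when the probe fails, which is exactly the behavior shown in the diff.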


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-01 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r221814183
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
@@ -140,10 +140,19 @@ private[yarn] class YarnAllocator(
   }
   // Number of cores per executor.
   protected val executorCores = sparkConf.get(EXECUTOR_CORES)
-  // Resource capability requested for each executors
-  private[yarn] val resource = Resource.newInstance(
-executorMemory + memoryOverhead + pysparkWorkerMemory,
-executorCores)
+
+  private val executorResourceTypes =
--- End diff --

Actually yes, fixed.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-01 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r221813983
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeValidator.scala ---
@@ -0,0 +1,106 @@
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config._
+
+private object ResourceTypeValidator {
+  private val ERROR_PREFIX: String = "Error: "
+  private val POSSIBLE_RESOURCE_DEFINITIONS = Seq[(String, String)](
--- End diff --

Fixed.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-01 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r221813856
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeValidator.scala ---
@@ -0,0 +1,106 @@
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config._
+
+private object ResourceTypeValidator {
+  private val ERROR_PREFIX: String = "Error: "
+  private val POSSIBLE_RESOURCE_DEFINITIONS = Seq[(String, String)](
+("spark.yarn.am.memory", YARN_AM_RESOURCE_TYPES_PREFIX + "memory"),
+("spark.yarn.am.cores", YARN_AM_RESOURCE_TYPES_PREFIX + "cores"),
+("spark.driver.memory", YARN_DRIVER_RESOURCE_TYPES_PREFIX + "memory"),
+("spark.driver.cores", YARN_DRIVER_RESOURCE_TYPES_PREFIX + "cores"),
+("spark.executor.memory", YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory"),
+("spark.executor.cores", YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "cores"))
+
+  /**
+   * Validates sparkConf and throws a SparkException if a standard resource (memory or cores)
+   * is defined with the property spark.yarn.x.resource.y
+   *
+   * Example of an invalid config:
+   * - spark.yarn.driver.resource.memory=2g
+   *
+   * Please note that if multiple resources are defined like described above,
+   * the error messages will be concatenated.
+   * Example of such a config:
+   * - spark.yarn.driver.resource.memory=2g
+   * - spark.yarn.executor.resource.cores=2
+   * Then the following two error messages will be printed:
+   * - "memory cannot be requested with config spark.yarn.driver.resource.memory,
+   * please use config spark.driver.memory instead!
+   * - "cores cannot be requested with config spark.yarn.executor.resource.cores,
+   * please use config spark.executor.cores instead!
+   *
+   * @param sparkConf
+   */
+  def validateResources(sparkConf: SparkConf): Unit = {
+val overallErrorMessage = new mutable.StringBuilder()
+POSSIBLE_RESOURCE_DEFINITIONS.foreach { resdef =>
--- End diff --

Fixed
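The behavior described by the Scaladoc above — collect one message per disallowed `spark.yarn.*.resource.*` key and report them together — can be sketched independently of `SparkConf`. `ResourceCheckSketch` and `collectErrors` are illustrative names only, and only a subset of the config pairs is shown:

```scala
object ResourceCheckSketch {
  // Pairs of (standard config, disallowed custom-resource config), modeled
  // after POSSIBLE_RESOURCE_DEFINITIONS in the diff (subset shown).
  private val pairs = Seq(
    ("spark.driver.memory", "spark.yarn.driver.resource.memory"),
    ("spark.executor.cores", "spark.yarn.executor.resource.cores"))

  // One message per offending key; a caller would concatenate these and
  // throw when the result is non-empty.
  def collectErrors(conf: Map[String, String]): Seq[String] =
    pairs.collect {
      case (standard, custom) if conf.contains(custom) =>
        s"${custom.split('.').last} cannot be requested with config $custom, " +
          s"please use config $standard instead!"
    }
}
```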


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-01 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r221812604
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeValidator.scala ---
@@ -0,0 +1,106 @@
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config._
+
+private object ResourceTypeValidator {
+  private val ERROR_PREFIX: String = "Error: "
+  private val POSSIBLE_RESOURCE_DEFINITIONS = Seq[(String, String)](
+("spark.yarn.am.memory", YARN_AM_RESOURCE_TYPES_PREFIX + "memory"),
--- End diff --

fixed


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-01 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r221812022
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeHelper.scala ---
@@ -0,0 +1,114 @@
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Integer => JInteger, Long => JLong}
+import java.lang.reflect.InvocationTargetException
+
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of Hadoop 3 methods from the YARN API,
+ * so we need to use reflection to avoid compile error when building against Hadoop 2.x
+ */
+private object ResourceTypeHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+
+  def setResourceInfoFromResourceTypes(
+  resourceTypes: Map[String, String],
+  resource: Resource): Resource = {
+require(resource != null, "Resource parameter should not be null!")
+
+logDebug(s"Custom resource types requested: $resourceTypes")
+if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) {
+  if (resourceTypes.nonEmpty) {
+logWarning("Ignoring updating resource with resource types because 
" +
+"the version of YARN does not support it!")
+  }
+  return resource
+}
+
+val resInfoClass = Utils.classForName(
+  "org.apache.hadoop.yarn.api.records.ResourceInformation")
+val setResourceInformationMethod =
+  resource.getClass.getMethod("setResourceInformation", 
classOf[String],
+resInfoClass)
+resourceTypes.foreach { case (name, rawAmount) =>
+  try {
+val AMOUNT_AND_UNIT_REGEX(amountPart, unitPart) = rawAmount
+val amount = amountPart.toLong
+val unit = unitPart match {
+  case "g" => "G"
+  case "t" => "T"
+  case "p" => "P"
+  case _ => unitPart
+}
+logDebug(s"Registering resource with name: $name, amount: $amount, 
unit: $unit")
+val resourceInformation =
+  createResourceInformation(name, amount, unit, resInfoClass)
+setResourceInformationMethod.invoke(resource, name,
+  resourceInformation.asInstanceOf[AnyRef])
+  } catch {
+case _: MatchError => throw new IllegalArgumentException(
+  s"Resource request for '$name' ('$rawAmount') " +
+  s"does not match pattern $AMOUNT_AND_UNIT_REGEX.")
+case e: InvocationTargetException if e.getCause != null => throw 
e.getCause
+  }
+}
+resource
+  }
+
+  private def createResourceInformation(
+  resourceName: String,
+  amount: Long,
+  unit: String,
+  resInfoClass: Class[_]): Any = {
+val resourceInformation =
+  if (unit.nonEmpty) {
+val resInfoNewInstanceMethod = 
resInfoClass.getMethod("newInstance",
+  classOf[String], classOf[String], JLong.TYPE)
+resInfoNewInstanceMethod.invoke(null, resourceName, unit, 
amount.asInstanceOf[JLong])
+  } else {
+val resInfoNewInstanceMethod = 
resInfoClass.getMethod("newInstance",
+  classOf[String], JLong.TYPE)
+resInfoNewInstanceMethod.invoke(null, resourceName, 
amount.asInstanceOf[JLong])
+  }
+resourceInformation
+  }
+
+  /**
+   * Checks whether Hadoop 2.x or 3 is used as a dependency.
+   * In case of Hadoop 3 and later,

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-01 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r221811247
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeHelper.scala ---
@@ -0,0 +1,114 @@
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Integer => JInteger, Long => JLong}
+import java.lang.reflect.InvocationTargetException
+
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of Hadoop 3 methods from the YARN API,
+ * so we need to use reflection to avoid compile error when building against Hadoop 2.x
+ */
+private object ResourceTypeHelper extends Logging {
--- End diff --

I agree, fixed


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-01 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r221811171
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeHelper.scala ---
@@ -0,0 +1,114 @@
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Integer => JInteger, Long => JLong}
+import java.lang.reflect.InvocationTargetException
+
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of Hadoop 3 methods from the YARN API,
+ * so we need to use reflection to avoid compile error when building against Hadoop 2.x
+ */
+private object ResourceTypeHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+
+  def setResourceInfoFromResourceTypes(
--- End diff --

fixed


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-01 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r221811135
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeHelper.scala ---
@@ -0,0 +1,114 @@
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Integer => JInteger, Long => JLong}
+import java.lang.reflect.InvocationTargetException
+
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of Hadoop 3 methods from the YARN API,
+ * so we need to use reflection to avoid compile error when building against Hadoop 2.x
+ */
+private object ResourceTypeHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+
+  def setResourceInfoFromResourceTypes(
+  resourceTypes: Map[String, String],
--- End diff --

I agree, fixed.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-01 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r221810924
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeHelper.scala ---
@@ -0,0 +1,114 @@
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Integer => JInteger, Long => JLong}
+import java.lang.reflect.InvocationTargetException
+
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of Hadoop 3 methods from the YARN API,
+ * so we need to use reflection to avoid compile error when building against Hadoop 2.x
+ */
+private object ResourceTypeHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+
+  def setResourceInfoFromResourceTypes(
+  resourceTypes: Map[String, String],
+  resource: Resource): Resource = {
--- End diff --

Very good point, thanks! Fixed this.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-01 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r221810447
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeHelper.scala ---
@@ -0,0 +1,114 @@
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Integer => JInteger, Long => JLong}
+import java.lang.reflect.InvocationTargetException
+
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of Hadoop 3 methods from the YARN API,
+ * so we need to use reflection to avoid compile error when building against Hadoop 2.x
+ */
+private object ResourceTypeHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+
+  def setResourceInfoFromResourceTypes(
+  resourceTypes: Map[String, String],
+  resource: Resource): Resource = {
+require(resource != null, "Resource parameter should not be null!")
+
+logDebug(s"Custom resource types requested: $resourceTypes")
+if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) {
+  if (resourceTypes.nonEmpty) {
+logWarning("Ignoring updating resource with resource types because 
" +
+"the version of YARN does not support it!")
+  }
+  return resource
+}
+
+val resInfoClass = Utils.classForName(
+  "org.apache.hadoop.yarn.api.records.ResourceInformation")
+val setResourceInformationMethod =
+  resource.getClass.getMethod("setResourceInformation", 
classOf[String],
+resInfoClass)
+resourceTypes.foreach { case (name, rawAmount) =>
+  try {
+val AMOUNT_AND_UNIT_REGEX(amountPart, unitPart) = rawAmount
+val amount = amountPart.toLong
+val unit = unitPart match {
+  case "g" => "G"
+  case "t" => "T"
+  case "p" => "P"
+  case _ => unitPart
+}
+logDebug(s"Registering resource with name: $name, amount: $amount, 
unit: $unit")
+val resourceInformation =
+  createResourceInformation(name, amount, unit, resInfoClass)
+setResourceInformationMethod.invoke(resource, name,
+  resourceInformation.asInstanceOf[AnyRef])
+  } catch {
+case _: MatchError => throw new IllegalArgumentException(
--- End diff --

Fixed.
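The fix under discussion turns the pattern-match failure (`MatchError`) into an `IllegalArgumentException`. A runnable distillation of just the parsing logic shown in the diff — `AmountUnitParser` is a made-up name:

```scala
object AmountUnitParser {
  private val AmountAndUnit = "([0-9]+)([A-Za-z]*)".r

  // "2g" -> (2, "G"); "123" -> (123, ""); anything else is rejected with a
  // descriptive error instead of a raw MatchError.
  def parse(raw: String): (Long, String) = raw match {
    case AmountAndUnit(amountPart, unitPart) =>
      // Normalize the lowercase suffixes the way the diff does.
      val unit = unitPart match {
        case "g" => "G"
        case "t" => "T"
        case "p" => "P"
        case other => other
      }
      (amountPart.toLong, unit)
    case _ =>
      throw new IllegalArgumentException(
        s"Resource amount '$raw' does not match pattern $AmountAndUnit.")
  }
}
```

Note that a `Regex` used in pattern position matches anchored, so partial matches like `"2g extra"` also fall through to the error case.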


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-01 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r221810320
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -86,6 +86,13 @@ private[spark] class Client(
 sparkConf.get(AM_CORES)
   }
 
+  private val amResources =
--- End diff --

Fixed.
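The `amResources` value being discussed gathers all settings under a prefix such as `spark.yarn.am.resource.`. That lookup can be sketched over a plain `Map` (in real Spark code, `SparkConf` offers equivalent prefix-based retrieval; `PrefixLookup` is an illustrative name):

```scala
object PrefixLookup {
  // Collect entries under `prefix`, stripping it from the key:
  // "spark.yarn.am.resource.gpu" -> "gpu".
  def byPrefix(conf: Map[String, String], prefix: String): Map[String, String] =
    conf.collect {
      case (k, v) if k.startsWith(prefix) => k.stripPrefix(prefix) -> v
    }
}
```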


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-10-01 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r221809062
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeHelper.scala ---
@@ -0,0 +1,114 @@
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Integer => JInteger, Long => JLong}
+import java.lang.reflect.InvocationTargetException
+
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.yarn.api.records.Resource
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * This helper class uses some of Hadoop 3 methods from the YARN API,
+ * so we need to use reflection to avoid compile error when building against Hadoop 2.x
+ */
+private object ResourceTypeHelper extends Logging {
+  private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
+
+  def setResourceInfoFromResourceTypes(
+  resourceTypes: Map[String, String],
+  resource: Resource): Resource = {
+require(resource != null, "Resource parameter should not be null!")
+
+logDebug(s"Custom resource types requested: $resourceTypes")
+if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) {
--- End diff --

The purpose of this class is to hide the implementation details of the resource-type-related functionality. For example, setResourceInfoFromResourceTypes is called from many places, so with the current implementation the code is not duplicated among classes. What would be your suggestion instead of this class?
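The reflection pattern this helper class encapsulates — look a method up by name and parameter types, then invoke it on an instance — can be shown on any JDK class. Here `java.lang.StringBuilder` stands in for YARN's `Resource`, and `appendViaReflection` is a hypothetical name:

```scala
object ReflectiveInvoke {
  // Mirrors resource.getClass.getMethod("setResourceInformation", ...)
  // followed by .invoke(resource, ...) from the diff.
  def appendViaReflection(sb: java.lang.StringBuilder, s: String): String = {
    val append = sb.getClass.getMethod("append", classOf[String])
    append.invoke(sb, s) // instance call: the receiver is the first argument
    sb.toString
  }
}
```

Keeping all such lookups in one object means the method names and parameter lists appear exactly once, which is the deduplication argument made above.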


---




[GitHub] spark issue #20761: [SPARK-20327][CORE][YARN] Add CLI support for YARN custo...

2018-09-19 Thread szyszy
Github user szyszy commented on the issue:

https://github.com/apache/spark/pull/20761
  
@srowen: Once again, thanks for the review.
I fixed most of what you suggested, if not then I made a comment.
Please check the updated code!
Thanks!


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-09-19 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r218766937
  
--- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/TestYarnResourceTypeHelper.scala ---
@@ -0,0 +1,103 @@
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.yarn.api.records.Resource
+import org.junit.Assert
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+object TestYarnResourceTypeHelper extends Logging {
+  def initializeResourceTypes(resourceTypes: List[String]): Unit = {
+    if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) {
+      throw new IllegalStateException("initializeResourceTypes() should not be invoked " +
+        "since YARN resource types is not available because of old Hadoop version!")
+    }
+
+    val allResourceTypes = new ListBuffer[AnyRef]
+    val defaultResourceTypes = List(
+      createResourceTypeInfo("memory-mb"),
+      createResourceTypeInfo("vcores"))
+    val customResourceTypes = resourceTypes.map(rt => createResourceTypeInfo(rt))
+
+    allResourceTypes.++=(defaultResourceTypes)
+    allResourceTypes.++=(customResourceTypes)
+
+    reinitializeResources(allResourceTypes)
+  }
+
+  private def createResourceTypeInfo(resourceName: String): AnyRef = {
+    val resTypeInfoClass = Utils.classForName("org.apache.hadoop.yarn.api.records.ResourceTypeInfo")
+    val resTypeInfoNewInstanceMethod = resTypeInfoClass.getMethod("newInstance", classOf[String])
+    resTypeInfoNewInstanceMethod.invoke(null, resourceName)
+  }
+
+  private def reinitializeResources(resourceTypes: ListBuffer[AnyRef]): Unit = {
+    val resourceUtilsClass =
+      Utils.classForName("org.apache.hadoop.yarn.util.resource.ResourceUtils")
+    val reinitializeResourcesMethod = resourceUtilsClass.getMethod("reinitializeResources",
+      classOf[java.util.List[AnyRef]])
+    try {
+      reinitializeResourcesMethod.invoke(null, resourceTypes.asJava)
+    } catch {
+      case e: Exception =>
+        e.printStackTrace()
+        Assert.fail("resource map initialization failed")
+    }
+  }
+
+  def getResourceTypeValue(res: Resource, name: String): AnyRef = {
+    val resourceInformation: AnyRef = getResourceInformation(res, name)
+    invokeMethod(resourceInformation, "getValue")
+  }
+
+  def getResourceInformationByName(res: Resource, nameParam: String): ResourceInformation = {
+    val resourceInformation: AnyRef = getResourceInformation(res, nameParam)
+    val name = invokeMethod(resourceInformation, "getName")
+    val value = invokeMethod(resourceInformation, "getValue")
+    val units = invokeMethod(resourceInformation, "getUnits")
+    new ResourceInformation(
+      name.asInstanceOf[String],
--- End diff --

Good point, fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-09-19 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r218766068
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/TestYarnResourceTypeHelper.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.yarn.api.records.Resource
+import org.junit.Assert
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+object TestYarnResourceTypeHelper extends Logging {
+  def initializeResourceTypes(resourceTypes: List[String]): Unit = {
+    if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) {
+      throw new IllegalStateException("initializeResourceTypes() should not be invoked " +
+        "since YARN resource types is not available because of old Hadoop version!")
+    }
+
+    val allResourceTypes = new ListBuffer[AnyRef]
+    val defaultResourceTypes = List(
+      createResourceTypeInfo("memory-mb"),
+      createResourceTypeInfo("vcores"))
+    val customResourceTypes = resourceTypes.map(rt => createResourceTypeInfo(rt))
+
+    allResourceTypes.++=(defaultResourceTypes)
+    allResourceTypes.++=(customResourceTypes)
+
+    reinitializeResources(allResourceTypes)
+  }
+
+  private def createResourceTypeInfo(resourceName: String): AnyRef = {
+    val resTypeInfoClass = Utils.classForName("org.apache.hadoop.yarn.api.records.ResourceTypeInfo")
+    val resTypeInfoNewInstanceMethod = resTypeInfoClass.getMethod("newInstance", classOf[String])
+    resTypeInfoNewInstanceMethod.invoke(null, resourceName)
+  }
+
+  private def reinitializeResources(resourceTypes: ListBuffer[AnyRef]): Unit = {
+    val resourceUtilsClass =
+      Utils.classForName("org.apache.hadoop.yarn.util.resource.ResourceUtils")
+    val reinitializeResourcesMethod = resourceUtilsClass.getMethod("reinitializeResources",
+      classOf[java.util.List[AnyRef]])
+    try {
+      reinitializeResourcesMethod.invoke(null, resourceTypes.asJava)
+    } catch {
+      case e: Exception =>
--- End diff --

Good point, fixed!


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-09-19 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r218765708
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/TestYarnResourceTypeHelper.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.yarn.api.records.Resource
+import org.junit.Assert
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+object TestYarnResourceTypeHelper extends Logging {
+  def initializeResourceTypes(resourceTypes: List[String]): Unit = {
+    if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) {
+      throw new IllegalStateException("initializeResourceTypes() should not be invoked " +
+        "since YARN resource types is not available because of old Hadoop version!")
+    }
+
+    val allResourceTypes = new ListBuffer[AnyRef]
+    val defaultResourceTypes = List(
+      createResourceTypeInfo("memory-mb"),
+      createResourceTypeInfo("vcores"))
+    val customResourceTypes = resourceTypes.map(rt => createResourceTypeInfo(rt))
+
+    allResourceTypes.++=(defaultResourceTypes)
+    allResourceTypes.++=(customResourceTypes)
+
+    reinitializeResources(allResourceTypes)
+  }
+
+  private def createResourceTypeInfo(resourceName: String): AnyRef = {
+    val resTypeInfoClass = Utils.classForName("org.apache.hadoop.yarn.api.records.ResourceTypeInfo")
+    val resTypeInfoNewInstanceMethod = resTypeInfoClass.getMethod("newInstance", classOf[String])
+    resTypeInfoNewInstanceMethod.invoke(null, resourceName)
+  }
+
+  private def reinitializeResources(resourceTypes: ListBuffer[AnyRef]): Unit = {
+    val resourceUtilsClass =
+      Utils.classForName("org.apache.hadoop.yarn.util.resource.ResourceUtils")
+    val reinitializeResourcesMethod = resourceUtilsClass.getMethod("reinitializeResources",
+      classOf[java.util.List[AnyRef]])
--- End diff --

As I remember, this was the only way to properly dereference the method via 
reflection, so I would keep the type as it is now.
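For readers following the thread, the lookup pattern under discussion can be sketched against a plain JDK method (this is illustrative code, not part of the patch): because type parameters are erased on the JVM, `getMethod` has to be given the raw `java.util.List` class, which is exactly what `classOf[java.util.List[AnyRef]]` supplies.

```scala
import java.util.{ArrayList, Arrays, Collections, List => JList}

// Illustrative only: look up a static method taking a java.util.List via reflection,
// the same shape as the ResourceUtils.reinitializeResources lookup discussed above.
object ReflectionLookupSketch {
  def sortViaReflection(values: JList[Integer]): JList[Integer] = {
    // Generic type parameters are erased, so the lookup uses the raw java.util.List class.
    val sortMethod = classOf[Collections].getMethod("sort", classOf[JList[_]])
    sortMethod.invoke(null, values) // null receiver: the method is static
    values
  }

  def main(args: Array[String]): Unit = {
    println(sortViaReflection(new ArrayList[Integer](Arrays.asList(3, 1, 2)))) // [1, 2, 3]
  }
}
```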


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-09-19 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r218765124
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/TestYarnResourceTypeHelper.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.yarn.api.records.Resource
+import org.junit.Assert
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+object TestYarnResourceTypeHelper extends Logging {
+  def initializeResourceTypes(resourceTypes: List[String]): Unit = {
+    if (!ResourceTypeHelper.isYarnResourceTypesAvailable()) {
+      throw new IllegalStateException("initializeResourceTypes() should not be invoked " +
+        "since YARN resource types is not available because of old Hadoop version!")
+    }
+
+    val allResourceTypes = new ListBuffer[AnyRef]
+    val defaultResourceTypes = List(
+      createResourceTypeInfo("memory-mb"),
+      createResourceTypeInfo("vcores"))
+    val customResourceTypes = resourceTypes.map(rt => createResourceTypeInfo(rt))
+
+    allResourceTypes.++=(defaultResourceTypes)
--- End diff --

fixed


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-09-19 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r218764502
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceTypeValidatorSuite.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.scalatest.{BeforeAndAfterAll, Matchers}
+
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+
+
--- End diff --

Removed line but kept one between the last import and the class.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-09-19 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r218763818
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceTypeValidatorSuite.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.scalatest.{BeforeAndAfterAll, Matchers}
+
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+
+
+class ResourceTypeValidatorSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+  }
+
+  private def getExpectedErrorMessage(
--- End diff --

I agree this way the tests are more fragile, but at least we know the whole 
message still looks good when someone changes the code in the future.
I would leave this as it is now.
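The trade-off being debated can be illustrated with a small, self-contained sketch (the message builder below is hypothetical, not the suite's actual helper): a full-message assertion catches any wording change, while a substring assertion survives rewording but verifies less.

```scala
object MessageAssertionSketch {
  // Hypothetical builder standing in for the validator's error text.
  def errorMessage(resourceType: String, customKey: String, standardKey: String): String =
    s"$resourceType cannot be requested with config $customKey, " +
      s"please use config $standardKey instead!"

  def main(args: Array[String]): Unit = {
    val msg = errorMessage("memory", "spark.yarn.driver.resource.memory", "spark.driver.memory")
    // Full-message assertion: fragile against rewording, but verifies the whole text.
    assert(msg == "memory cannot be requested with config spark.yarn.driver.resource.memory, " +
      "please use config spark.driver.memory instead!")
    // Substring assertion: robust against rewording, but checks less.
    assert(msg.contains("please use config spark.driver.memory"))
  }
}
```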


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-09-19 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r218762926
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceTypeHelperSuite.scala
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.apache.hadoop.yarn.api.records.Resource
+import org.apache.hadoop.yarn.util.Records
+import org.scalatest.{BeforeAndAfterAll, Matchers}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.yarn.TestYarnResourceTypeHelper.ResourceInformation
+
+class ResourceTypeHelperSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
+
+  private val CUSTOM_RES_1 = "custom-resource-type-1"
+  private val CUSTOM_RES_2 = "custom-resource-type-2"
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+  }
+
+  private def getExpectedUnmatchedErrorMessage(name: String, value: String): String = {
+    s"Resource request for '$name' ('$value') does not match pattern ([0-9]+)([A-Za-z]*)."
+  }
+
+  test("resource type value does not match pattern") {
+    assume(ResourceTypeHelper.isYarnResourceTypesAvailable())
+    TestYarnResourceTypeHelper.initializeResourceTypes(List(CUSTOM_RES_1))
+
+    val resourceTypes = Map(CUSTOM_RES_1 -> "**@#")
+
+    val thrown = intercept[IllegalArgumentException] {
+      ResourceTypeHelper.setResourceInfoFromResourceTypes(resourceTypes, createAResource)
+    }
+    thrown.getMessage should equal (getExpectedUnmatchedErrorMessage(CUSTOM_RES_1, "**@#"))
+  }
+
+  test("resource type just unit defined") {
+    assume(ResourceTypeHelper.isYarnResourceTypesAvailable())
+    TestYarnResourceTypeHelper.initializeResourceTypes(List())
+
+    val resourceTypes = Map(CUSTOM_RES_1 -> "m")
+
+    val thrown = intercept[IllegalArgumentException] {
+      ResourceTypeHelper.setResourceInfoFromResourceTypes(resourceTypes, createAResource)
+    }
+    thrown.getMessage should equal (getExpectedUnmatchedErrorMessage(CUSTOM_RES_1, "m"))
+  }
+
+  test("resource type with null value should not be allowed") {
+    assume(ResourceTypeHelper.isYarnResourceTypesAvailable())
+    TestYarnResourceTypeHelper.initializeResourceTypes(List())
+
+    val resourceTypes = Map(CUSTOM_RES_1 -> "123")
+
+    val thrown = intercept[IllegalArgumentException] {
+      ResourceTypeHelper.setResourceInfoFromResourceTypes(resourceTypes, null)
+    }
+    thrown.getMessage should equal ("requirement failed: Resource parameter should not be null!")
+  }
+
+  test("resource type with valid value and invalid unit") {
+    assume(ResourceTypeHelper.isYarnResourceTypesAvailable())
+    TestYarnResourceTypeHelper.initializeResourceTypes(List(CUSTOM_RES_1))
+
+    val resourceTypes = Map(CUSTOM_RES_1 -> "123ppp")
+    val resource = createAResource
+
+    val thrown = intercept[IllegalArgumentException] {
+      ResourceTypeHelper.setResourceInfoFromResourceTypes(resourceTypes, resource)
+    }
+    thrown.getMessage should fullyMatch regex
+      """Unknown unit 'ppp'\. Known units are \[.*\]"""
+  }
+
+  test("resource type with valid value and without unit") {
+    assume(ResourceTypeHelper.isYarnResourceTypesAvailable())
+    TestYarnResourceTypeHelper.initializeResourceTypes(List(CUSTOM_RES_1))
+
+    val resourceTypes = Map(CUSTOM_RES_1 -> "123")
+    val resource = createAResource
+
+    ResourceTypeHelper.setResourceInfoFromResourceTypes(resourceTypes, resource)
+    val customResource: ResourceInformation = TestYarnResourceTypeHelper
+      .ge

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-09-19 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r218762664
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 ---
@@ -35,18 +36,22 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.Records
 import org.mockito.Matchers.{eq => meq, _}
 import org.mockito.Mockito._
-import org.scalatest.Matchers
+import org.scalatest.{BeforeAndAfterAll, Matchers}
 
 import org.apache.spark.{SparkConf, SparkFunSuite, TestUtils}
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.util.{SparkConfWithEnv, Utils}
 
-class ClientSuite extends SparkFunSuite with Matchers {
+class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
 
   import Client._
 
   var oldSystemProperties: Properties = null
 
+  override def beforeAll(): Unit = {
+    super.beforeAll()
--- End diff --

Good point, removed it.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-09-19 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r218762120
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
 ---
@@ -140,10 +140,19 @@ private[yarn] class YarnAllocator(
   }
   // Number of cores per executor.
   protected val executorCores = sparkConf.get(EXECUTOR_CORES)
-  // Resource capability requested for each executors
-  private[yarn] val resource = Resource.newInstance(
-executorMemory + memoryOverhead + pysparkWorkerMemory,
-executorCores)
+
+  private val executorResourceTypes: collection.immutable.Map[String, String] =
--- End diff --

Removed the type declaration.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-09-19 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r218761991
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeValidator.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config._
+
+private object ResourceTypeValidator {
+  private val ERROR_PREFIX: String = "Error: "
+  private val POSSIBLE_RESOURCE_DEFINITIONS = Seq[(String, String)](
+    ("spark.yarn.am.memory", YARN_AM_RESOURCE_TYPES_PREFIX + "memory"),
+    ("spark.yarn.am.cores", YARN_AM_RESOURCE_TYPES_PREFIX + "cores"),
+    ("spark.driver.memory", YARN_DRIVER_RESOURCE_TYPES_PREFIX + "memory"),
+    ("spark.driver.cores", YARN_DRIVER_RESOURCE_TYPES_PREFIX + "cores"),
+    ("spark.executor.memory", YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory"),
+    ("spark.executor.cores", YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "cores"))
+
+  /**
+   * Validates sparkConf and throws a SparkException if a standard resource (memory or cores)
+   * is defined with the property spark.yarn.x.resource.y
+   *
+   * Example of an invalid config:
+   * - spark.yarn.driver.resource.memory=2g
+   *
+   * Please note that if multiple resources are defined like described above,
+   * the error messages will be concatenated.
+   * Example of such a config:
+   * - spark.yarn.driver.resource.memory=2g
+   * - spark.yarn.executor.resource.cores=2
+   * Then the following two error messages will be printed:
+   * - "memory cannot be requested with config spark.yarn.driver.resource.memory,
+   * please use config spark.driver.memory instead!
+   * - "cores cannot be requested with config spark.yarn.executor.resource.cores,
+   * please use config spark.executor.cores instead!
+   *
+   * @param sparkConf
+   */
+  def validateResources(sparkConf: SparkConf): Unit = {
+    val overallErrorMessage = new mutable.StringBuilder()
+    POSSIBLE_RESOURCE_DEFINITIONS.foreach { resdef =>
+      val customResources: Map[String, String] =
+        getCustomResourceValuesForKey(sparkConf, resdef._1)
+
+      val errorMessage = getErrorMessage(customResources, resdef._1, resdef._2)
+      if (errorMessage.nonEmpty) {
+        overallErrorMessage.append(s"$ERROR_PREFIX$errorMessage\n")
+      }
+    }
+
+    // throw exception after loop
+    if (overallErrorMessage.nonEmpty) {
+      throw new SparkException(overallErrorMessage.toString())
+    }
+  }
+
+  def getCustomResourceValuesForKey(sparkConf: SparkConf, key: String): Map[String, String] = {
+    if (key contains "am") {
+      extractCustomResources(sparkConf, YARN_AM_RESOURCE_TYPES_PREFIX)
+    } else if (key contains "driver") {
+      extractCustomResources(sparkConf, YARN_DRIVER_RESOURCE_TYPES_PREFIX)
+    } else if (key contains "executor") {
+      extractCustomResources(sparkConf, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX)
+    } else Map.empty[String, String]
+  }
+
+  def getErrorMessage(
+      customResources: Map[String, String],
+      standardResourceConfigKey: String,
+      customResourceConfigKey: String): String = {
+    if (customResources.contains(customResourceConfigKey)) {
+      val idx = standardResourceConfigKey.lastIndexOf(".")
+      val resourceType = standardResourceConfigKey.substring(idx + 1)
+      s"$resourceType cannot be requested with config $customResourceConfigKey, " +
+        s"please use config $standardResourceConfigKey i

[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-09-19 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r218760548
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceTypeValidator.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config._
+
+private object ResourceTypeValidator {
+  private val ERROR_PREFIX: String = "Error: "
+  private val POSSIBLE_RESOURCE_DEFINITIONS = Seq[(String, String)](
+    ("spark.yarn.am.memory", YARN_AM_RESOURCE_TYPES_PREFIX + "memory"),
+    ("spark.yarn.am.cores", YARN_AM_RESOURCE_TYPES_PREFIX + "cores"),
+    ("spark.driver.memory", YARN_DRIVER_RESOURCE_TYPES_PREFIX + "memory"),
+    ("spark.driver.cores", YARN_DRIVER_RESOURCE_TYPES_PREFIX + "cores"),
+    ("spark.executor.memory", YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory"),
+    ("spark.executor.cores", YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "cores"))
+
+  /**
+   * Validates sparkConf and throws a SparkException if a standard resource (memory or cores)
+   * is defined with the property spark.yarn.x.resource.y
+   *
+   * Example of an invalid config:
+   * - spark.yarn.driver.resource.memory=2g
+   *
+   * Please note that if multiple resources are defined like described above,
+   * the error messages will be concatenated.
+   * Example of such a config:
+   * - spark.yarn.driver.resource.memory=2g
+   * - spark.yarn.executor.resource.cores=2
+   * Then the following two error messages will be printed:
+   * - "memory cannot be requested with config spark.yarn.driver.resource.memory,
+   * please use config spark.driver.memory instead!
+   * - "cores cannot be requested with config spark.yarn.executor.resource.cores,
+   * please use config spark.executor.cores instead!
+   *
+   * @param sparkConf
+   */
+  def validateResources(sparkConf: SparkConf): Unit = {
+    val overallErrorMessage = new mutable.StringBuilder()
+    POSSIBLE_RESOURCE_DEFINITIONS.foreach { resdef =>
+      val customResources: Map[String, String] =
+        getCustomResourceValuesForKey(sparkConf, resdef._1)
+
+      val errorMessage = getErrorMessage(customResources, resdef._1, resdef._2)
+      if (errorMessage.nonEmpty) {
+        overallErrorMessage.append(s"$ERROR_PREFIX$errorMessage\n")
+      }
+    }
+
+    // throw exception after loop
+    if (overallErrorMessage.nonEmpty) {
+      throw new SparkException(overallErrorMessage.toString())
+    }
+  }
+
+  def getCustomResourceValuesForKey(sparkConf: SparkConf, key: String): Map[String, String] = {
+    if (key contains "am") {
--- End diff --

Thanks for pointing this out.
Actually, I searched in the code before using this syntax and I realized it 
is used, but you are right, it's getting used in fewer places than the syntax 
you suggested.
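For context, the two syntaxes under discussion are interchangeable; this tiny sketch (illustrative only) shows both compile to the same call, so the preference is purely stylistic:

```scala
object InfixVsDotSketch {
  def main(args: Array[String]): Unit = {
    val key = "spark.yarn.am.memory"
    // Infix (operator) notation, as written in the original patch:
    assert(key contains "am")
    // Dot notation, the style more commonly used for alphabetic method names:
    assert(key.contains("am"))
  }
}
```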


---




[GitHub] spark issue #20761: [SPARK-20327][CORE][YARN] Add CLI support for YARN custo...

2018-09-18 Thread szyszy
Github user szyszy commented on the issue:

https://github.com/apache/spark/pull/20761
  
Hi @squito, @srowen and @vanzin !
Thanks for the reviews so far!
Please see the updated code; I hope everything that was commented on has been fixed.
@vanzin: Please also check the simplified ResourceTypeValidator, I think 
it's more concise and readable than the previous version.


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-09-17 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r218087990
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
 ---
@@ -275,9 +287,13 @@ private[yarn] class YarnAllocator(
   s"executorsStarting: ${numExecutorsStarting.get}")
 
 if (missing > 0) {
-      logInfo(s"Will request $missing executor container(s), each with " +
-        s"${resource.getVirtualCores} core(s) and " +
-        s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)")
+      var requestContainerMessage = s"Will request $missing executor container(s), each with " +
+          s"${resource.getVirtualCores} core(s) and " +
+          s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)"
+      if (ResourceTypeHelper.isYarnResourceTypesAvailable()) {
+        requestContainerMessage ++= s" with custom resources: " + resource.toString
--- End diff --

Yes, the nonEmpty check was missing. Thanks!
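The guard being discussed can be sketched in isolation (the function below uses hypothetical stand-ins for the allocator's fields; only the nonEmpty check matters):

```scala
object RequestMessageSketch {
  // Hypothetical stand-ins for the allocator's fields, to show the guard in isolation.
  def requestMessage(
      missing: Int, cores: Int, memoryMb: Int,
      customResources: Map[String, String]): String = {
    var msg = s"Will request $missing executor container(s), each with " +
      s"$cores core(s) and $memoryMb MB memory"
    // The nonEmpty guard keeps "with custom resources:" out of the log
    // when no custom resource types were requested.
    if (customResources.nonEmpty) {
      msg += s" with custom resources: $customResources"
    }
    msg
  }

  def main(args: Array[String]): Unit = {
    println(requestMessage(2, 4, 4096, Map.empty))
    println(requestMessage(2, 4, 4096, Map("gpu" -> "2")))
  }
}
```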


---




[GitHub] spark pull request #20761: [SPARK-20327][CORE][YARN] Add CLI support for YAR...

2018-09-17 Thread szyszy
Github user szyszy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20761#discussion_r218086888
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 ---
@@ -35,18 +36,22 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.Records
 import org.mockito.Matchers.{eq => meq, _}
 import org.mockito.Mockito._
-import org.scalatest.Matchers
+import org.scalatest.{BeforeAndAfterAll, Matchers}
 
 import org.apache.spark.{SparkConf, SparkFunSuite, TestUtils}
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.util.{SparkConfWithEnv, Utils}
 
-class ClientSuite extends SparkFunSuite with Matchers {
+class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
 
   import Client._
 
   var oldSystemProperties: Properties = null
 
+  override def beforeAll(): Unit = {
+    super.beforeAll()
--- End diff --

I don't quite remember why I needed this, but it was related to the call to 
System.setProperty("spark.testing", "true") in SparkFunSuite.beforeAll.
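For readers unfamiliar with the pattern: the idea behind setting a property like `spark.testing` in `beforeAll` is save/mutate/restore of JVM system properties around a test. A minimal sketch of that idea (not SparkFunSuite's actual implementation):

```scala
object SystemPropertySketch {
  // Remember the old value of a system property, set the new one for the
  // duration of `body`, and restore the original value afterwards.
  def withProperty[T](key: String, value: String)(body: => T): T = {
    val old = Option(System.getProperty(key))
    System.setProperty(key, value)
    try body
    finally old match {
      case Some(v) => System.setProperty(key, v)
      case None => System.clearProperty(key)
    }
  }

  def main(args: Array[String]): Unit = {
    withProperty("spark.testing", "true") {
      assert(System.getProperty("spark.testing") == "true")
    }
  }
}
```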


---



