tgravescs commented on a change in pull request #32804:
URL: https://github.com/apache/spark/pull/32804#discussion_r656210527



##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
##########
@@ -27,6 +27,24 @@ import org.apache.spark.network.util.ByteUnit
 package object config extends Logging {
 
   /* Common app configuration. */
+  private[spark] val SCHEDULING_REQUEST_ENABLED =
+    ConfigBuilder("spark.yarn.schedulingRequestEnabled")

Review comment:
       I think we need to come up with a better name and more description. From just reading this, I have no idea what it does. Spark always requests containers from YARN to schedule on, so why is this off by default? (i.e. that is what I read from the config name)
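
For example, something along these lines. This is only a hypothetical sketch to illustrate "better name and more description": the config name, doc wording, and version here are placeholders, not a concrete proposal.

```scala
// Hypothetical sketch only: the name, doc text, and version are placeholders
// meant to show the level of description a reader needs, not a real proposal.
private[spark] val SCHEDULING_REQUEST_ENABLED =
  ConfigBuilder("spark.yarn.allocator.schedulingRequests.enabled")
    .doc("When true, the YARN allocator asks YARN for containers via the " +
      "SchedulingRequest API instead of plain resource requests. " +
      "SchedulingRequest supports richer placement constraints.")
    .version("3.2.0")
    .booleanConf
    .createWithDefault(false)
```
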

##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredSchedulingRequestContainerPlacementStrategy.scala
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable
+import scala.collection.mutable.{ArrayBuffer, HashMap, Set}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.records.{ContainerId, SchedulingRequest}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.resource.ResourceProfile
+
+
+/**
+ * This strategy calculates the optimal locality preferences of YARN containers by considering

Review comment:
       this looks like a direct copy of the ContainerLocalityPreferences javadoc. I'm assuming this is different, so please update the description; if I missed the difference, I think we need to pull it up to the top to be clear how it's different

##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredSchedulingRequestContainerPlacementStrategy.scala
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable
+import scala.collection.mutable.{ArrayBuffer, HashMap, Set}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.records.{ContainerId, SchedulingRequest}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.resource.ResourceProfile
+
+
+/**
+ * This strategy calculates the optimal locality preferences of YARN containers by considering
+ * the node ratio of pending tasks, the number of required cores/containers and the locality of
+ * currently existing and pending allocated containers. The goal of this algorithm is to maximize
+ * the number of tasks that would run locally.
+ *
+ * Consider a situation in which we have 20 tasks that require (host1, host2, host3)
+ * and 10 tasks that require (host1, host2, host4). Each container has 2 cores
+ * and each task needs 1 cpu, so the required container number is 15,
+ * and the host ratio is (host1: 30, host2: 30, host3: 20, host4: 10).
+ *
+ * 1. If the requested container number (18) is more than the required container number (15):
+ *
+ * requests for 5 containers with nodes: (host1, host2, host3, host4)
+ * requests for 5 containers with nodes: (host1, host2, host3)
+ * requests for 5 containers with nodes: (host1, host2)
+ * requests for 3 containers with no locality preferences.
+ *
+ * The placement ratio is 3 : 3 : 2 : 1, and the additional containers are requested with no
+ * locality preferences.
+ *
+ * 2. If the requested container number (10) is less than or equal to the required container
+ * number (15):
+ *
+ * requests for 4 containers with nodes: (host1, host2, host3, host4)
+ * requests for 3 containers with nodes: (host1, host2, host3)
+ * requests for 3 containers with nodes: (host1, host2)
+ *
+ * The placement ratio is 10 : 10 : 7 : 4, close to the expected ratio (3 : 3 : 2 : 1)
+ *
+ * 3. If containers exist but none of them can match the requested localities,
+ * follow the method of 1 and 2.
+ *
+ * 4. If containers exist and some of them can match the requested localities:
+ * for example, if we have 1 container on each node (host1: 1, host2: 1, host3: 1, host4: 1),
+ * and the expected containers on each node would be (host1: 5, host2: 5, host3: 4, host4: 2),
+ * then the newly requested containers on each node would be updated to (host1: 4, host2: 4,
+ * host3: 3, host4: 1), 12 containers in total.
+ *
+ *   4.1 If the requested container number (18) is more than the newly required containers (12),
+ *   follow method 1 with the updated ratio 4 : 4 : 3 : 1.
+ *
+ *   4.2 If the requested container number (10) is less than the newly required containers (12),
+ *   follow method 2 with the updated ratio 4 : 4 : 3 : 1.
+ *
+ * 5. If containers exist and the existing localities can fully cover the requested localities:
+ * for example, if we have 5 containers on each node (host1: 5, host2: 5, host3: 5, host4: 5),
+ * which covers the current requested localities, this algorithm will allocate all the
+ * requested containers with no localities.
+ */
+private[yarn] class LocalityPreferredSchedulingRequestContainerPlacementStrategy(
+    val sparkConf: SparkConf,
+    val yarnConf: Configuration,
+    resolver: SparkRackResolver) {
+
+  /**
+   * Calculate each container's node locality and rack locality.
+   * @param numContainer number of containers to calculate
+   * @param numLocalityAwareTasks number of locality required tasks
+   * @param hostToLocalTaskCount a map to store the preferred hostname and possible task
+   *                             numbers running on it, used as hints for container allocation
+   * @param allocatedHostToContainersMap host to allocated containers map, used to calculate the
+   *                                     expected locality preference by considering the existing
+   *                                     containers
+   * @param localityMatchedPendingAllocations a sequence of pending container requests which
+   *                                          match the localities of currently required tasks
+   * @param rp the ResourceProfile associated with this container
+   * @return node localities and rack localities; each locality is an array of strings,
+   *         and the length of localities is the same as the number of containers
+   */
+  def localityOfRequestedContainers(
+      numContainer: Int,
+      numLocalityAwareTasks: Int,
+      hostToLocalTaskCount: Map[String, Int],
+      allocatedHostToContainersMap: HashMap[String, Set[ContainerId]],
+      localityMatchedPendingAllocations: Seq[SchedulingRequest],
+      schedulingRequestToNodes: mutable.HashMap[Long, Array[String]],

Review comment:
       `schedulingRequestToNodes` is not documented; please add an `@param` entry for it in the scaladoc above.
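
As an aside, the difference-based grouping described in the class scaladoc above can be sketched as a standalone toy (this is not the PR's code, just an illustration of the ratio arithmetic): given the expected per-host container counts for case 2, (host1: 10, host2: 10, host3: 7, host4: 4), it emits the groups 4 x (host1..host4), 3 x (host1..host3), 3 x (host1, host2).

```scala
// Toy sketch of the grouping logic from the scaladoc; not the PR's actual
// implementation. hostCounts maps host -> expected number of containers that
// should list that host as a locality preference.
object PlacementSketch {
  def group(hostCounts: Map[String, Int]): Seq[(Int, Seq[String])] = {
    // Sort hosts by expected count, largest first, so every prefix of the
    // sorted list is a valid "preferred nodes" set for one request group.
    val sorted = hostCounts.toSeq.sortBy(-_._2)
    val groups = Seq.newBuilder[(Int, Seq[String])]
    var covered = 0
    // Walk from the smallest count upward: the difference at each step is the
    // number of container requests that prefer only the larger-count hosts.
    for (i <- sorted.indices.reverse) {
      val n = sorted(i)._2 - covered
      if (n > 0) {
        groups += ((n, sorted.take(i + 1).map(_._1)))
        covered = sorted(i)._2
      }
    }
    groups.result()
  }
}
```

Running it on case 1's expected counts (host1: 15, host2: 15, host3: 10, host4: 5) reproduces the 5 / 5 / 5 grouping from the scaladoc, with the remaining 3 of the 18 requested containers left unconstrained.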




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
