Ngone51 closed pull request #23368: [SPARK-26269][YARN][BRANCH-2.4]
YarnAllocator should have same blacklist behaviour with YARN to maximize use of
cluster resources
URL: https://github.com/apache/spark/pull/23368
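
For context, the behavioural change in YarnAllocator is: container exit statuses that indicate a deliberate kill or a non-node fault (killed by the ResourceManager or ApplicationMaster, killed after application completion, aborted, or failed disks) are no longer treated as failures caused by the application and no longer cause the allocator to blacklist the host; the remaining unclassified exit statuses still do, following YARN's own classification in Apps.java. A minimal, illustrative Scala sketch of that classification (the set mirrors the one added in the diff below; shouldBlacklistHost is a hypothetical helper, not Spark's actual method):

import org.apache.hadoop.yarn.api.records.ContainerExitStatus

// Illustrative only: mirrors the classification the diff below adds to YarnAllocator.
object ExitStatusClassification {

  // Exit statuses that are neither the application's nor the node's fault,
  // so they should not lead to blacklisting the host (same set as in the diff).
  val NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS: Set[Int] = Set(
    ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
    ContainerExitStatus.KILLED_BY_APPMASTER,
    ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
    ContainerExitStatus.ABORTED,
    ContainerExitStatus.DISKS_FAILED)

  // Hypothetical helper (not a Spark API): a completed container should only
  // cause its host to be blacklisted when its exit status is outside the set above.
  def shouldBlacklistHost(exitStatus: Int): Boolean =
    !NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS.contains(exitStatus)
}
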
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index f4dc80ad4a627..3357084146e87 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -578,13 +578,23 @@ private[yarn] class YarnAllocator(
(true, memLimitExceededLogMessage(
completedContainer.getDiagnostics,
PMEM_EXCEEDED_PATTERN))
- case _ =>
- // all the failures which not covered above, like:
- // disk failure, kill by app master or resource manager, ...
- allocatorBlacklistTracker.handleResourceAllocationFailure(hostOpt)
- (true, "Container marked as failed: " + containerId + onHostStr +
- ". Exit status: " + completedContainer.getExitStatus +
- ". Diagnostics: " + completedContainer.getDiagnostics)
+ case other_exit_status =>
+ // SPARK-26269: follow YARN's blacklisting behaviour(see https://github
+ // .com/apache/hadoop/blob/228156cfd1b474988bc4fedfbf7edddc87db41e3/had
+ // oop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/ap
+ // ache/hadoop/yarn/util/Apps.java#L273 for details)
+ if (NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS.contains(other_exit_status)) {
+ (false, s"Container marked as failed: $containerId$onHostStr" +
+ s". Exit status: ${completedContainer.getExitStatus}" +
+ s". Diagnostics: ${completedContainer.getDiagnostics}.")
+ } else {
+ // completed container from a bad node
+ allocatorBlacklistTracker.handleResourceAllocationFailure(hostOpt)
+ (true, s"Container from a bad node: $containerId$onHostStr" +
+ s". Exit status: ${completedContainer.getExitStatus}" +
+ s". Diagnostics: ${completedContainer.getDiagnostics}.")
+ }
+
}
if (exitCausedByApp) {
@@ -722,4 +732,11 @@ private object YarnAllocator {
"Consider boosting spark.yarn.executor.memoryOverhead or " +
"disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714."
}
+ val NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS = Set(
+ ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
+ ContainerExitStatus.KILLED_BY_APPMASTER,
+ ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
+ ContainerExitStatus.ABORTED,
+ ContainerExitStatus.DISKS_FAILED
+ )
}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
index ceac7cda5f8be..268976b629507 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
@@ -120,7 +120,9 @@ private[spark] class YarnAllocatorBlacklistTracker(
if (removals.nonEmpty) {
logInfo(s"removing nodes from YARN application master's blacklist:
$removals")
}
- amClient.updateBlacklist(additions.asJava, removals.asJava)
+ if (additions.nonEmpty || removals.nonEmpty) {
+ amClient.updateBlacklist(additions.asJava, removals.asJava)
+ }
currentBlacklistedYarnNodes = nodesToBlacklist
}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
index aeac68e6ed330..201910731e934 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
@@ -87,7 +87,7 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers
// expired blacklisted nodes (simulating a resource request)
yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host1", "host2"))
// no change is communicated to YARN regarding the blacklisting
- verify(amClientMock).updateBlacklist(Collections.emptyList(), Collections.emptyList())
+ verify(amClientMock, times(0)).updateBlacklist(Collections.emptyList(), Collections.emptyList())
}
test("combining scheduler and allocation blacklist") {
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 3f783baed110d..2fb892ecbc33a 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.deploy.yarn
+import java.util.Collections
+
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
@@ -24,6 +26,7 @@ import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.mockito.ArgumentCaptor
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterEach, Matchers}
@@ -86,7 +89,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
def createAllocator(
maxExecutors: Int = 5,
- rmClient: AMRMClient[ContainerRequest] = rmClient): YarnAllocator = {
+ rmClient: AMRMClient[ContainerRequest] = rmClient,
+ additionalConfigs: Map[String, String] = Map()): YarnAllocator = {
val args = Array(
"--jar", "somejar.jar",
"--class", "SomeClass")
@@ -95,6 +99,11 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
.set("spark.executor.instances", maxExecutors.toString)
.set("spark.executor.cores", "5")
.set("spark.executor.memory", "2048")
+
+ for ((name, value) <- additionalConfigs) {
+ sparkConfClone.set(name, value)
+ }
+
new YarnAllocator(
"not used",
mock(classOf[RpcEndpointRef]),
@@ -108,14 +117,29 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
clock)
}
- def createContainer(host: String): Container = {
- // When YARN 2.6+ is required, avoid deprecation by using version with long second arg
- val containerId = ContainerId.newInstance(appAttemptId, containerNum)
+ def createContainer(
+ host: String,
+ containerNumber: Int = containerNum,
+ resource: Resource = containerResource): Container = {
+ val containerId: ContainerId = ContainerId.newContainerId(appAttemptId, containerNum)
containerNum += 1
val nodeId = NodeId.newInstance(host, 1000)
Container.newInstance(containerId, nodeId, "", containerResource, RM_REQUEST_PRIORITY, null)
}
+ def createContainers(hosts: Seq[String], containerIds: Seq[Int]): Seq[Container] = {
+ hosts.zip(containerIds).map{case (host, id) => createContainer(host, id)}
+ }
+
+ def createContainerStatus(
+ containerId: ContainerId,
+ exitStatus: Int,
+ containerState: ContainerState = ContainerState.COMPLETE,
+ diagnostics: String = "diagnostics"): ContainerStatus = {
+ ContainerStatus.newInstance(containerId, containerState, diagnostics, exitStatus)
+ }
+
+
test("single container allocated") {
// request a single container and receive it
val handler = createAllocator(1)
@@ -400,4 +424,55 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
clock.advance(50 * 1000L)
handler.getNumExecutorsFailed should be (0)
}
+
+ test("SPARK-26269: YarnAllocator should have same blacklist behaviour with
YARN") {
+ val rmClientSpy = spy(rmClient)
+ val maxExecutors = 11
+
+ val handler = createAllocator(
+ maxExecutors,
+ rmClientSpy,
+ Map(
+ "spark.yarn.blacklist.executor.launch.blacklisting.enabled" -> "true",
+ "spark.blacklist.application.maxFailedExecutorsPerNode" -> "0"))
+ handler.updateResourceRequests()
+
+ val hosts = (0 until maxExecutors).map(i => s"host$i")
+ val ids = 0 to maxExecutors
+ val containers = createContainers(hosts, ids)
+
+ val nonBlacklistedStatuses = Seq(
+ ContainerExitStatus.SUCCESS,
+ ContainerExitStatus.PREEMPTED,
+ ContainerExitStatus.KILLED_EXCEEDED_VMEM,
+ ContainerExitStatus.KILLED_EXCEEDED_PMEM,
+ ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
+ ContainerExitStatus.KILLED_BY_APPMASTER,
+ ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
+ ContainerExitStatus.ABORTED,
+ ContainerExitStatus.DISKS_FAILED)
+
+ val nonBlacklistedContainerStatuses = nonBlacklistedStatuses.zipWithIndex.map {
+ case (exitStatus, idx) => createContainerStatus(containers(idx).getId, exitStatus)
+ }
+
+ val BLACKLISTED_EXIT_CODE = 1
+ val blacklistedStatuses = Seq(ContainerExitStatus.INVALID, BLACKLISTED_EXIT_CODE)
+
+ val blacklistedContainerStatuses = blacklistedStatuses.zip(9 until maxExecutors).map {
+ case (exitStatus, idx) => createContainerStatus(containers(idx).getId, exitStatus)
+ }
+
+ handler.handleAllocatedContainers(containers.slice(0, 9))
+ handler.processCompletedContainers(nonBlacklistedContainerStatuses)
+ verify(rmClientSpy, never())
+ .updateBlacklist(hosts.slice(0, 9).asJava, Collections.emptyList())
+
+ handler.handleAllocatedContainers(containers.slice(9, 11))
+ handler.processCompletedContainers(blacklistedContainerStatuses)
+ verify(rmClientSpy)
+ .updateBlacklist(hosts.slice(9, 10).asJava, Collections.emptyList())
+ verify(rmClientSpy)
+ .updateBlacklist(hosts.slice(10, 11).asJava, Collections.emptyList())
+ }
}
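
The YarnAllocatorBlacklistTracker change above is a small guard: the blacklist update is only pushed to the AMRMClient when there is actually something to add or remove, which is why the tracker suite now verifies updateBlacklist with times(0). A simplified, hypothetical sketch of that guard (refreshBlacklist and its parameter names are illustrative, not the tracker's real internals):

import scala.collection.JavaConverters._

import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

object BlacklistRefreshSketch {
  // Push blacklist changes to YARN only when the computed additions or
  // removals are non-empty, so no-op updateBlacklist calls are avoided.
  def refreshBlacklist(
      amClient: AMRMClient[ContainerRequest],
      currentlyBlacklisted: Set[String],
      nodesToBlacklist: Set[String]): Set[String] = {
    val additions = (nodesToBlacklist -- currentlyBlacklisted).toList
    val removals = (currentlyBlacklisted -- nodesToBlacklist).toList
    if (additions.nonEmpty || removals.nonEmpty) {
      amClient.updateBlacklist(additions.asJava, removals.asJava)
    }
    nodesToBlacklist
  }
}
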
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]