This is an automated email from the ASF dual-hosted git repository. tgraves pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 89d44d092af [SPARK-43510][YARN] Fix YarnAllocator internal state when adding running executor after processing completed containers 89d44d092af is described below commit 89d44d092af4ae53fec296ca6569e240ad4c2bc5 Author: manuzhang <owenzhang1...@gmail.com> AuthorDate: Tue Jun 6 08:28:52 2023 -0500 [SPARK-43510][YARN] Fix YarnAllocator internal state when adding running executor after processing completed containers ### What changes were proposed in this pull request? Keep track of completed container ids in YarnAllocator and don't update the internal state of a container if it's already completed. ### Why are the changes needed? YarnAllocator updates internal state by adding running executors after executor launch in a separate thread. That can happen after the containers are already completed (e.g. preempted) and processed by YarnAllocator. Then YarnAllocator mistakenly thinks there are still running executors which are already lost. As a result, the application hangs without any running executors. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added UT. Closes #41173 from manuzhang/spark-43510. 
Authored-by: manuzhang <owenzhang1...@gmail.com> Signed-off-by: Thomas Graves <tgra...@apache.org> --- .../apache/spark/deploy/yarn/YarnAllocator.scala | 42 ++++++++++++++-------- .../spark/deploy/yarn/YarnAllocatorSuite.scala | 28 ++++++++++++++- 2 files changed, 54 insertions(+), 16 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index b6ee21ed817..19c06f95731 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -91,6 +91,9 @@ private[yarn] class YarnAllocator( @GuardedBy("this") private val releasedContainers = collection.mutable.HashSet[ContainerId]() + @GuardedBy("this") + private val launchingExecutorContainerIds = collection.mutable.HashSet[ContainerId]() + @GuardedBy("this") private val runningExecutorsPerResourceProfileId = new HashMap[Int, mutable.Set[String]]() @@ -738,19 +741,6 @@ private[yarn] class YarnAllocator( logInfo(s"Launching container $containerId on host $executorHostname " + s"for executor with ID $executorId for ResourceProfile Id $rpId") - def updateInternalState(): Unit = synchronized { - getOrUpdateRunningExecutorForRPId(rpId).add(executorId) - getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet() - executorIdToContainer(executorId) = container - containerIdToExecutorIdAndResourceProfileId(container.getId) = (executorId, rpId) - - val localallocatedHostToContainersMap = getOrUpdateAllocatedHostToContainersMapForRPId(rpId) - val containerSet = localallocatedHostToContainersMap.getOrElseUpdate(executorHostname, - new HashSet[ContainerId]) - containerSet += containerId - allocatedContainerToHostMap.put(containerId, executorHostname) - } - val rp = rpIdToResourceProfile(rpId) val defaultResources = 
ResourceProfile.getDefaultProfileExecutorResources(sparkConf) val containerMem = rp.executorResources.get(ResourceProfile.MEMORY). @@ -763,6 +753,7 @@ private[yarn] class YarnAllocator( val rpRunningExecs = getOrUpdateRunningExecutorForRPId(rpId).size if (rpRunningExecs < getOrUpdateTargetNumExecutorsForRPId(rpId)) { getOrUpdateNumExecutorsStartingForRPId(rpId).incrementAndGet() + launchingExecutorContainerIds.add(containerId) if (launchContainers) { launcherPool.execute(() => { try { @@ -780,10 +771,11 @@ private[yarn] class YarnAllocator( localResources, rp.id ).run() - updateInternalState() + updateInternalState(rpId, executorId, container) } catch { case e: Throwable => getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet() + launchingExecutorContainerIds.remove(containerId) if (NonFatal(e)) { logError(s"Failed to launch executor $executorId on container $containerId", e) // Assigned container should be released immediately @@ -796,7 +788,7 @@ private[yarn] class YarnAllocator( }) } else { // For test only - updateInternalState() + updateInternalState(rpId, executorId, container) } } else { logInfo(("Skip launching executorRunnable as running executors count: %d " + @@ -806,11 +798,31 @@ private[yarn] class YarnAllocator( } } + private def updateInternalState(rpId: Int, executorId: String, + container: Container): Unit = synchronized { + val containerId = container.getId + if (launchingExecutorContainerIds.contains(containerId)) { + getOrUpdateRunningExecutorForRPId(rpId).add(executorId) + executorIdToContainer(executorId) = container + containerIdToExecutorIdAndResourceProfileId(containerId) = (executorId, rpId) + + val localallocatedHostToContainersMap = getOrUpdateAllocatedHostToContainersMapForRPId(rpId) + val executorHostname = container.getNodeId.getHost + val containerSet = localallocatedHostToContainersMap.getOrElseUpdate(executorHostname, + new HashSet[ContainerId]) + containerSet += containerId + allocatedContainerToHostMap.put(containerId, 
executorHostname) + launchingExecutorContainerIds.remove(containerId) + } + getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet() + } + // Visible for testing. private[yarn] def processCompletedContainers( completedContainers: Seq[ContainerStatus]): Unit = synchronized { for (completedContainer <- completedContainers) { val containerId = completedContainer.getContainerId + launchingExecutorContainerIds.remove(containerId) val (_, rpId) = containerIdToExecutorIdAndResourceProfileId.getOrElse(containerId, ("", DEFAULT_RESOURCE_PROFILE_ID)) val alreadyReleased = releasedContainers.remove(containerId) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index ed591fd9e36..055edfbf767 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn import java.util import java.util.Collections +import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConverters._ import scala.collection.mutable @@ -33,6 +34,7 @@ import org.mockito.ArgumentCaptor import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer +import org.scalatest.PrivateMethodTester import org.scalatest.matchers.must.Matchers import org.scalatest.matchers.should.Matchers._ @@ -60,7 +62,7 @@ class MockResolver extends SparkRackResolver(SparkHadoopUtil.get.conf) { } -class YarnAllocatorSuite extends SparkFunSuite with Matchers { +class YarnAllocatorSuite extends SparkFunSuite with Matchers with PrivateMethodTester { val conf = new YarnConfiguration() val sparkConf = new SparkConf() sparkConf.set(DRIVER_HOST_ADDRESS, "localhost") @@ -832,4 +834,28 @@ class YarnAllocatorSuite extends SparkFunSuite 
with Matchers { verify(rpcEndPoint, times(1)). send(DecommissionExecutorsOnHost(org.mockito.ArgumentMatchers.any())) } + + test("SPARK-43510: Running executors should be none when YarnAllocator adds running executors " + + "after processing completed containers") { + val (handler, _) = createAllocator(1) + handler.updateResourceRequests() + handler.getNumExecutorsRunning should be(0) + handler.getNumContainersPendingAllocate should be(1) + + val container = createContainer("host1") + handler.handleAllocatedContainers(Array(container)) + handler.getNumExecutorsRunning should be(1) + handler.getNumContainersPendingAllocate should be(0) + + val status = ContainerStatus.newInstance( + container.getId, ContainerState.COMPLETE, "Finished", 0) + val getOrUpdateNumExecutorsStartingForRPId = PrivateMethod[AtomicInteger]( + Symbol("getOrUpdateNumExecutorsStartingForRPId")) + handler.invokePrivate(getOrUpdateNumExecutorsStartingForRPId(0)).incrementAndGet() + handler.processCompletedContainers(Seq(status)) + val updateInternalState = PrivateMethod[Unit](Symbol("updateInternalState")) + handler.invokePrivate(updateInternalState(0, "1", container)) + handler.getNumExecutorsRunning should be(0) + handler.getNumExecutorsStarting should be(0) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org