GitHub user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/7786#discussion_r40031705
--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
@@ -238,6 +257,33 @@ private[yarn] class YarnAllocator(
}
}
+ /** Get a list of preemption containers through AM-RM heartbeat. */
+ private def preemptResources(allocateResponse: AllocateResponse): Unit = {
+ val preemptMessageOpt = Option(allocateResponse.getPreemptionMessage)
+ val strictContractOpt = preemptMessageOpt.map(p => Option(p.getStrictContract)).flatten
+ val contractOpt = preemptMessageOpt.map(p => Option(p.getContract)).flatten
+
+ // We need to forget all the already preempted executors at this time,
+ // because preemption information will be changed time to time.
+ preemptionExecutors.clear()
--- End diff --
This is not thread-safe. The `ApplicationMaster` may be iterating over the
list when this code runs, and clearing it mid-iteration might cause exceptions.
Instead, I'd recommend returning the set of preempted executors from
`updateResources`, always creating a new set when this method is called. Then
you need neither the class field nor the synchronized getter.
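
A minimal sketch of that suggestion, under stated assumptions: `PreemptionInfo`,
`PreemptionSketch`, `preemptedExecutors` and the container-to-executor map are
hypothetical stand-ins for illustration, not the real YARN or Spark types. The
point is only that each call builds and returns a fresh immutable set, so the
caller never observes a collection that is being cleared underneath it.

```scala
// Hypothetical stand-in for the data carried by a YARN preemption message.
// This is NOT the real Hadoop API; it only models "strict" and "negotiable"
// container ids as plain strings.
final case class PreemptionInfo(
    strictContainers: Set[String],
    negotiableContainers: Set[String])

object PreemptionSketch {

  /**
   * Returns the executors that are about to be preempted.
   * A new immutable Set is created on every call, so there is no class field
   * to clear and nothing to synchronize on.
   */
  def preemptedExecutors(
      info: Option[PreemptionInfo],
      containerToExecutor: Map[String, String]): Set[String] = {
    // An absent preemption message means no containers are being preempted.
    val containers: Set[String] = info
      .map(p => p.strictContainers ++ p.negotiableContainers)
      .getOrElse(Set.empty[String])

    // Keep only containers known to host an executor, mapped to executor ids.
    containers.flatMap(id => containerToExecutor.get(id).toList)
  }
}
```

The caller would then use the returned value directly, for example
`val preempted = PreemptionSketch.preemptedExecutors(info, mapping)`, instead
of reading a shared field through a synchronized getter.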