hvanhovell commented on code in PR #53914:
URL: https://github.com/apache/spark/pull/53914#discussion_r2730977380
##########
core/src/main/scala/org/apache/spark/executor/Executor.scala:
##########
@@ -628,15 +677,37 @@ private[spark] class Executor(
     override def run(): Unit = {
       // Classloader isolation
+      // We need to retry the cache lookup if the session was evicted between get() and acquire().
+      // This can happen when the cache is full and another task triggers eviction.
       val isolatedSession = taskDescription.artifacts.state match {
         case Some(jobArtifactState) =>
-          isolatedSessionCache.get(jobArtifactState.uuid, () => newSessionState(jobArtifactState))
-        case _ => defaultSessionState
+          var session: IsolatedSessionState = null
+          var acquired = false
+          while (!acquired) {
+            // Get or create session. The loader uses sessions map as the authoritative store.
+            // This ensures there's only one IsolatedSessionState per UUID at any time.
+            session = isolatedSessionCache.get(jobArtifactState.uuid, () => {
+              // Check the authoritative sessions map first. tryUnEvict() will block if
+              // cleanup is in progress, so when it returns false, the session is already
+              // removed from the map and it's safe to create a new one.
+              val existingSession = IsolatedSessionState.sessions.get(jobArtifactState.uuid)
+              if (existingSession != null && existingSession.tryUnEvict()) {
##########
Review Comment:
   When we un-evict, we are not re-adding it to the cache, right?
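
   For reference, a minimal, self-contained sketch of the get()/acquire() retry race the new loop guards against. SessionState, SessionRegistry, getOrCreate and the refCount convention below are hypothetical stand-ins for illustration only, not the actual IsolatedSessionState API, and the un-evict path and its interaction with the LRU cache are deliberately left out.

   import java.util.concurrent.ConcurrentHashMap
   import java.util.concurrent.atomic.AtomicInteger

   // Simplified stand-in for a per-UUID session state:
   // refCount >= 0 means live, -1 means evicted and eligible for cleanup.
   final class SessionState(val uuid: String) {
     private val refCount = new AtomicInteger(0)

     // Pin the state for use; fails if it was evicted after the cache lookup.
     def acquire(): Boolean = {
       var current = refCount.get()
       while (current >= 0) {
         if (refCount.compareAndSet(current, current + 1)) return true
         current = refCount.get()
       }
       false
     }

     def release(): Unit = refCount.decrementAndGet()

     // Eviction only succeeds when no task currently holds the state.
     def evict(): Boolean = refCount.compareAndSet(0, -1)
   }

   object SessionRegistry {
     // Authoritative map: at most one SessionState per UUID at any time.
     private val sessions = new ConcurrentHashMap[String, SessionState]()

     def getOrCreate(uuid: String): SessionState = {
       var session: SessionState = null
       var acquired = false
       while (!acquired) {
         // computeIfAbsent plays the role of the loading cache's get(uuid, loader).
         session = sessions.computeIfAbsent(uuid, (k: String) => new SessionState(k))
         // The state may have been evicted between the lookup and this acquire();
         // if so, drop the stale entry and retry with a fresh one.
         acquired = session.acquire()
         if (!acquired) sessions.remove(uuid, session)
       }
       session
     }
   }

   Under these assumptions the retry only fires when an eviction lands in the narrow window between the lookup and acquire(), so the common path stays lock-free.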
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]