beliefer commented on code in PR #50020:
URL: https://github.com/apache/spark/pull/50020#discussion_r1984387678
##########
core/src/main/scala/org/apache/spark/BarrierCoordinator.scala:
##########
@@ -132,13 +136,15 @@ private[spark] class BarrierCoordinator(
}
}
- // Cancel the current active TimerTask and release resources.
+ /* Cancel the tasks scheduled to run inside the ScheduledExecutor Threadpool
+ * The original implementation was clearing java.util.Timer and java.util.TimerTasks
+ * This became a no-op when java.util.Timer was replaced with ScheduledThreadPoolExecutor
+ * Each time all the Timertasks are cancelled inside the timer, clear the corresponding entries
+ * to reduce the memory requirements
+ */
private def cancelTimerTask(): Unit = {
- if (timerTask != null) {
- timerTask.cancel()
- timer.purge()
Review Comment:
`ScheduledThreadPoolExecutor` is also a `ThreadPoolExecutor`, so it has a
`purge` method too:
```
public void purge() {
    final BlockingQueue<Runnable> q = workQueue;
    try {
        Iterator<Runnable> it = q.iterator();
        while (it.hasNext()) {
            Runnable r = it.next();
            if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                it.remove();
        }
    } catch (ConcurrentModificationException fallThrough) {
        // Take slow path if we encounter interference during traversal.
        // Make copy for traversal and call remove for cancelled entries.
        // The slow path is more likely to be O(N*N).
        for (Object r : q.toArray())
            if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                q.remove(r);
    }
    tryTerminate(); // In case SHUTDOWN and now empty
}
```
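To illustrate the point, here is a minimal standalone sketch (not the code from this PR) of cancelling a scheduled task and then calling `purge()` on a `ScheduledThreadPoolExecutor`. The class name `PurgeSketch` and the method `cancelThenPurge` are hypothetical names chosen for the example; it assumes the executor keeps its default remove-on-cancel policy, under which a cancelled task stays in the work queue until it is purged or its delay elapses.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of Spark or of this PR.
final class PurgeSketch {

    // Schedules one far-future task, cancels it, then purges the executor.
    // Returns the work-queue size observed {after cancel, after purge}.
    static int[] cancelThenPurge() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        try {
            // A long delay keeps the task in the queue for the whole demo.
            ScheduledFuture<?> future = executor.schedule(() -> { }, 1, TimeUnit.HOURS);

            // With the default policy, cancel() marks the task as cancelled
            // but leaves it sitting in the queue.
            future.cancel(false);
            int afterCancel = executor.getQueue().size();

            // purge() removes cancelled Future entries from the queue.
            executor.purge();
            int afterPurge = executor.getQueue().size();

            return new int[] { afterCancel, afterPurge };
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] sizes = cancelThenPurge();
        System.out.println("queue size after cancel: " + sizes[0]);
        System.out.println("queue size after purge:  " + sizes[1]);
    }
}
```

This mirrors the old `timerTask.cancel()` followed by `timer.purge()` pattern. Another option might be `setRemoveOnCancelPolicy(true)` on the executor, which removes tasks from the queue at cancellation time; whether that fits `BarrierCoordinator` better is a design choice for the PR author.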
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]