LuciferYang commented on code in PR #46726:
URL: https://github.com/apache/spark/pull/46726#discussion_r1612670412
##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/SessionCleaner.scala:
##########
@@ -17,130 +17,33 @@
 package org.apache.spark.sql.internal
 
-import java.lang.ref.{ReferenceQueue, WeakReference}
-import java.util.Collections
-import java.util.concurrent.ConcurrentHashMap
+import java.lang.ref.Cleaner
 
 import org.apache.spark.connect.proto
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 
-/**
- * Classes that represent cleaning tasks.
- */
-private sealed trait CleanupTask
-private case class CleanupCachedRemoteRelation(dfID: String) extends CleanupTask
-
-/**
- * A WeakReference associated with a CleanupTask.
- *
- * When the referent object becomes only weakly reachable, the corresponding
- * CleanupTaskWeakReference is automatically added to the given reference queue.
- */
-private class CleanupTaskWeakReference(
-    val task: CleanupTask,
-    referent: AnyRef,
-    referenceQueue: ReferenceQueue[AnyRef])
-  extends WeakReference(referent, referenceQueue)
-
-/**
- * An asynchronous cleaner for objects.
- *
- * This maintains a weak reference for each CashRemoteRelation, etc. of interest, to be processed
- * when the associated object goes out of scope of the application. Actual cleanup is performed in
- * a separate daemon thread.
- */
 private[sql] class SessionCleaner(session: SparkSession) extends Logging {
-
-  /**
-   * How often (seconds) to trigger a garbage collection in this JVM. This context cleaner
-   * triggers cleanups only when weak references are garbage collected. In long-running
-   * applications with large driver JVMs, where there is little memory pressure on the driver,
-   * this may happen very occasionally or not at all. Not cleaning at all may lead to executors
-   * running out of disk space after a while.
-   */
-  private val refQueuePollTimeout: Long = 100
-
-  /**
-   * A buffer to ensure that `CleanupTaskWeakReference`s are not garbage collected as long as they
-   * have not been handled by the reference queue.
-   */
-  private val referenceBuffer =
-    Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap)
-
-  private val referenceQueue = new ReferenceQueue[AnyRef]
-
-  private val cleaningThread = new Thread() { override def run(): Unit = keepCleaning() }
-
-  @volatile private var started = false
-  @volatile private var stopped = false
-
-  /** Start the cleaner. */
-  def start(): Unit = {
-    cleaningThread.setDaemon(true)
-    cleaningThread.setName("Spark Connect Context Cleaner")
-    cleaningThread.start()
-  }
-
-  /**
-   * Stop the cleaning thread and wait until the thread has finished running its current task.
-   */
-  def stop(): Unit = {
-    stopped = true
-    // Interrupt the cleaning thread, but wait until the current task has finished before
-    // doing so. This guards against the race condition where a cleaning thread may
-    // potentially clean similarly named variables created by a different SparkSession.
-    synchronized {
-      cleaningThread.interrupt()
-    }
-    cleaningThread.join()
-  }
+  private val cleaner = Cleaner.create()
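For context, `java.lang.ref.Cleaner` (JDK 9+) bundles the reference queue, weak tracking, and daemon thread that the deleted code managed by hand. A minimal sketch of the pattern — names here are illustrative, not the PR's actual code:

```scala
import java.lang.ref.Cleaner
import java.util.concurrent.atomic.AtomicBoolean

// Illustrative sketch, not the PR's actual code: a single Cleaner owns the
// daemon thread and reference queue that the removed keepCleaning() loop
// managed by hand.
object CleanerSketch {
  val cleaner: Cleaner = Cleaner.create()
}

// Stand-in for a client-side object whose server-side state must be released
// once the client object becomes unreachable.
final class TrackedRelation(dfId: String, released: AtomicBoolean) {
  // Copy what the cleanup action needs into locals: if the lambda captured
  // `this`, the object would stay strongly reachable and never be cleaned.
  val cleanable: Cleaner.Cleanable = {
    val flag = released
    CleanerSketch.cleaner.register(this, () => flag.set(true))
  }
}
```

`register` returns a `Cleaner.Cleanable`, whose `clean()` can also be invoked eagerly (e.g. from a `close()` method); the registered action runs at most once either way.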
Review Comment:
It seems that the lifecycle of `SessionCleaner` matches that of `SparkSession`, so a client holding multiple `SparkSession`s will create multiple `java.lang.ref.Cleaner` instances, each with its own daemon thread. If `cleaner` were defined in the companion object of `SessionCleaner`, multiple `SessionCleaner`s could share a single `java.lang.ref.Cleaner` instance. Would that meet the requirements?
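The suggested shape could be sketched as follows (a hedged illustration; the names and delegation method are assumptions, not code from the PR). Hoisting the `Cleaner` into the companion object makes it a per-JVM singleton, so every `SparkSession`'s cleaner shares one daemon thread:

```scala
import java.lang.ref.Cleaner

// Illustrative sketch of the suggestion: the Cleaner lives in the companion
// object, so every SessionCleanerSketch instance (one per session) shares a
// single Cleaner, and hence a single cleaner daemon thread per JVM.
object SessionCleanerSketch {
  // Created lazily, once, on first registration from any session.
  private lazy val cleaner: Cleaner = Cleaner.create()
}

class SessionCleanerSketch {
  // Session-scoped API that delegates to the shared JVM-wide Cleaner.
  def register(obj: AnyRef, action: Runnable): Cleaner.Cleanable =
    SessionCleanerSketch.cleaner.register(obj, action)
}
```

One trade-off worth noting: with a shared `Cleaner`, all sessions' cleanup actions run serially on the one daemon thread, so a slow action from one session can delay cleanups for the others.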
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]