hvanhovell commented on code in PR #46726:
URL: https://github.com/apache/spark/pull/46726#discussion_r1616453140


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/SessionCleaner.scala:
##########
@@ -17,130 +17,33 @@
 
 package org.apache.spark.sql.internal
 
-import java.lang.ref.{ReferenceQueue, WeakReference}
-import java.util.Collections
-import java.util.concurrent.ConcurrentHashMap
+import java.lang.ref.Cleaner
 
 import org.apache.spark.connect.proto
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 
-/**
- * Classes that represent cleaning tasks.
- */
-private sealed trait CleanupTask
-private case class CleanupCachedRemoteRelation(dfID: String) extends CleanupTask
-
-/**
- * A WeakReference associated with a CleanupTask.
- *
- * When the referent object becomes only weakly reachable, the corresponding
- * CleanupTaskWeakReference is automatically added to the given reference queue.
- */
-private class CleanupTaskWeakReference(
-    val task: CleanupTask,
-    referent: AnyRef,
-    referenceQueue: ReferenceQueue[AnyRef])
-    extends WeakReference(referent, referenceQueue)
-
-/**
- * An asynchronous cleaner for objects.
- *
- * This maintains a weak reference for each CachedRemoteRelation, etc. of interest,
- * to be processed when the associated object goes out of scope of the application.
- * Actual cleanup is performed in a separate daemon thread.
- */
 private[sql] class SessionCleaner(session: SparkSession) extends Logging {
-
-  /**
-   * How often (seconds) to trigger a garbage collection in this JVM. This context cleaner
-   * triggers cleanups only when weak references are garbage collected. In long-running
-   * applications with large driver JVMs, where there is little memory pressure on the driver,
-   * this may happen very occasionally or not at all. Not cleaning at all may lead to
-   * executors running out of disk space after a while.
-   */
-  private val refQueuePollTimeout: Long = 100
-
-  /**
-   * A buffer to ensure that `CleanupTaskWeakReference`s are not garbage collected as long
-   * as they have not been handled by the reference queue.
-   */
-  private val referenceBuffer =
-    Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap)
-
-  private val referenceQueue = new ReferenceQueue[AnyRef]
-
-  private val cleaningThread = new Thread() { override def run(): Unit = keepCleaning() }
-
-  @volatile private var started = false
-  @volatile private var stopped = false
-
-  /** Start the cleaner. */
-  def start(): Unit = {
-    cleaningThread.setDaemon(true)
-    cleaningThread.setName("Spark Connect Context Cleaner")
-    cleaningThread.start()
-  }
-
-  /**
-   * Stop the cleaning thread and wait until the thread has finished running its current task.
-   */
-  def stop(): Unit = {
-    stopped = true
-    // Interrupt the cleaning thread, but wait until the current task has finished before
-    // doing so. This guards against the race condition where a cleaning thread may
-    // potentially clean similarly named variables created by a different SparkSession.
-    synchronized {
-      cleaningThread.interrupt()
-    }
-    cleaningThread.join()
-  }
+  private val cleaner = Cleaner.create()

Review Comment:
   We can do this in a follow-up? @LuciferYang is there any concrete concern here?
   Or are you just being tidy?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to