hvanhovell commented on code in PR #41701:
URL: https://github.com/apache/spark/pull/41701#discussion_r1243019808


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala:
##########
@@ -200,48 +225,80 @@ class SparkConnectArtifactManager private[connect] {
   }
 }
 
-object SparkConnectArtifactManager {
+object SparkConnectArtifactManager extends Logging {
 
   val forwardToFSPrefix = "forward_to_fs"
 
-  private var _activeArtifactManager: SparkConnectArtifactManager = _
+  private var currentArtifactRootUri: String = _
+  private var lastKnownSparkContextInstance: SparkContext = _
 
-  /**
-   * Obtain the active artifact manager or create a new artifact manager.
-   *
-   * @return
-   */
-  def getOrCreateArtifactManager: SparkConnectArtifactManager = {
-    if (_activeArtifactManager == null) {
-      _activeArtifactManager = new SparkConnectArtifactManager
-    }
-    _activeArtifactManager
+  private val ARTIFACT_DIRECTORY_PREFIX = "artifacts"
+
+  // The base directory where all artifacts are stored.
+  private[spark] lazy val artifactRootPath = {
+    Utils.createTempDir(ARTIFACT_DIRECTORY_PREFIX).toPath
   }
 
-  private lazy val artifactManager = getOrCreateArtifactManager
+  private[spark] def getArtifactDirectoryAndUriForSession(session: 
SparkSession): (Path, String) =
+    (
+      ArtifactUtils.concatenatePaths(artifactRootPath, session.sessionUUID),
+      s"$artifactRootURI/${session.sessionUUID}")
+
+  private[spark] def getArtifactDirectoryAndUriForSession(
+      sessionHolder: SessionHolder): (Path, String) =
+    getArtifactDirectoryAndUriForSession(sessionHolder.session)
+
+  private[spark] def getClassfileDirectoryAndUriForSession(
+      session: SparkSession): (Path, String) = {
+    val (artDir, artUri) = getArtifactDirectoryAndUriForSession(session)
+    (ArtifactUtils.concatenatePaths(artDir, "classes"), s"$artUri/classes/")
+  }
+
+  private[spark] def getClassfileDirectoryAndUriForSession(
+      sessionHolder: SessionHolder): (Path, String) =
+    getClassfileDirectoryAndUriForSession(sessionHolder.session)
 
   /**
-   * Obtain a classloader that contains jar and classfile artifacts on the 
classpath.
+   * Updates the URI for the artifact directory.
    *
-   * @return
+   * This is required if the SparkContext is restarted.
+   *
+   * Note: This logic is solely to handle testing where a [[SparkContext]] may 
be restarted
+   * several times in a single JVM lifetime. In a general Spark cluster, the 
[[SparkContext]] is
+   * not expected to be restarted at any point in time.
    */
-  def classLoaderWithArtifacts: ClassLoader = {
-    val urls = artifactManager.getSparkConnectAddedJars :+
-      artifactManager.classArtifactDir.toUri.toURL
-    new URLClassLoader(urls.toArray, Utils.getContextOrSparkClassLoader)
+  private def refreshArtifactUri(sc: SparkContext): Unit = synchronized {
+    // If a competing thread had updated the URI, we do not need to refresh 
the URI again.
+    if (sc eq lastKnownSparkContextInstance) {
+      return
+    }
+    val oldArtifactUri = currentArtifactRootUri
+    currentArtifactRootUri = SparkEnv.get.rpcEnv.fileServer
+      .addDirectoryIfAbsent(ARTIFACT_DIRECTORY_PREFIX, artifactRootPath.toFile)

Review Comment:
   Can we use `addDirectory` instead? The if-absent bit is pretty well 
protected by this object.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to