hvanhovell commented on code in PR #41701:
URL: https://github.com/apache/spark/pull/41701#discussion_r1243033235
##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/artifact/ArtifactManagerSuite.scala:
##########
@@ -155,10 +169,113 @@ class ArtifactManagerSuite extends SharedSparkSession
with ResourceHelper {
val stagingPath = copyDir.resolve("smallClassFile.class")
val remotePath = Paths.get("forward_to_fs", destFSDir.toString,
"smallClassFileCopied.class")
assert(stagingPath.toFile.exists())
- artifactManager.uploadArtifactToFs(sessionHolder, remotePath, stagingPath)
- artifactManager.addArtifact(sessionHolder, remotePath, stagingPath, None)
+ artifactManager.uploadArtifactToFs(remotePath, stagingPath)
+ artifactManager.addArtifact(remotePath, stagingPath, None)
val copiedClassFile = Paths.get(destFSDir.toString,
"smallClassFileCopied.class").toFile
assert(copiedClassFile.exists())
}
+
+ test("Removal of resources") {
+ withTempPath { path =>
+ // Setup cache
+ val stagingPath = path.toPath
+ Files.write(path.toPath, "test".getBytes(StandardCharsets.UTF_8))
+ val remotePath = Paths.get("cache/abc")
+ val session = sessionHolder()
+ val blockManager = spark.sparkContext.env.blockManager
+ val blockId = CacheId(session.userId, session.sessionId, "abc")
+ // Setup artifact dir
+ val copyDir = Utils.createTempDir().toPath
+ FileUtils.copyDirectory(artifactPath.toFile, copyDir.toFile)
+ try {
+ artifactManager.addArtifact(remotePath, stagingPath, None)
+ val stagingPathFile = copyDir.resolve("smallClassFile.class")
+ val remotePathFile = Paths.get("classes/smallClassFile.class")
+ artifactManager.addArtifact(remotePathFile, stagingPathFile, None)
+
+ // Verify resources exist
+ val bytes = blockManager.getLocalBytes(blockId)
+ assert(bytes.isDefined)
+ blockManager.releaseLock(blockId)
+ val expectedPath = SparkConnectArtifactManager.artifactRootPath
+ .resolve(s"$sessionUUID/classes/smallClassFile.class")
+ assert(expectedPath.toFile.exists())
+
+ // Remove resources
+ artifactManager.cleanUpResources()
+
+ assert(!blockManager.getLocalBytes(blockId).isDefined)
+ assert(!expectedPath.toFile.exists())
+ } finally {
+ try {
+ blockManager.releaseLock(blockId)
+ } catch {
+ case _: SparkException =>
+ case throwable: Throwable => throw throwable
+ } finally {
+ FileUtils.deleteDirectory(copyDir.toFile)
+ blockManager.removeCache(session.userId, session.sessionId)
+ }
+ }
+ }
+ }
+
+ test("Classloaders for spark sessions are isolated") {
+ val holder1 = SparkConnectService.getOrCreateIsolatedSession("c1",
"session1")
+ val holder2 = SparkConnectService.getOrCreateIsolatedSession("c2",
"session2")
+
+ def addHelloClass(holder: SessionHolder): Unit = {
+ val copyDir = Utils.createTempDir().toPath
+ FileUtils.copyDirectory(artifactPath.toFile, copyDir.toFile)
+ val stagingPath = copyDir.resolve("Hello.class")
+ val remotePath = Paths.get("classes/Hello.class")
+ assert(stagingPath.toFile.exists())
+ holder.addArtifact(remotePath, stagingPath, None)
+ }
+
+ // Add the classfile only for the first user
+ addHelloClass(holder1)
+
+ val classLoader1 = holder1.classloader
+ val instance1 = classLoader1
Review Comment:
You could add another session here where you can load Hello. In that cases
the classes for the different sessions should not be equal.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]