heyihong commented on code in PR #52073:
URL: https://github.com/apache/spark/pull/52073#discussion_r2299204538


##########
sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala:
##########
@@ -266,28 +267,41 @@ class ArtifactManager(session: SparkSession) extends AutoCloseable with Logging
    * they are from a permanent location.
    */
   private[sql] def addLocalArtifacts(artifacts: Seq[Artifact]): Unit = {
+    val failedArtifactExceptions = ListBuffer[SparkRuntimeException]()
+
     artifacts.foreach { artifact =>
-      artifact.storage match {
-        case d: Artifact.LocalFile =>
-          addArtifact(
-            artifact.path,
-            d.path,
-            fragment = None,
-            deleteStagedFile = false)
-        case d: Artifact.InMemory =>
-          val tempDir = Utils.createTempDir().toPath
-          val tempFile = tempDir.resolve(artifact.path.getFileName)
-          val outStream = Files.newOutputStream(tempFile)
-          Utils.tryWithSafeFinallyAndFailureCallbacks {
-            d.stream.transferTo(outStream)
-            addArtifact(artifact.path, tempFile, fragment = None)
-          }(finallyBlock = {
-            outStream.close()
-          })
-        case _ =>
-          throw SparkException.internalError(s"Unsupported artifact storage: ${artifact.storage}")
+      try {
+        artifact.storage match {
+          case d: Artifact.LocalFile =>
+            addArtifact(
+              artifact.path,
+              d.path,
+              fragment = None,
+              deleteStagedFile = false)
+          case d: Artifact.InMemory =>
+            val tempDir = Utils.createTempDir().toPath
+            val tempFile = tempDir.resolve(artifact.path.getFileName)
+            val outStream = Files.newOutputStream(tempFile)
+            Utils.tryWithSafeFinallyAndFailureCallbacks {
+              d.stream.transferTo(outStream)
+              addArtifact(artifact.path, tempFile, fragment = None)
+            }(finallyBlock = {
+              outStream.close()
+            })
+          case _ =>
+            throw SparkException.internalError(s"Unsupported artifact storage: ${artifact.storage}")
+        }
+      } catch {
+        case e: SparkRuntimeException =>
+          failedArtifactExceptions += e
       }
     }
+
+    if (failedArtifactExceptions.nonEmpty) {

Review Comment:
   The error handling and suppression logic seems to be duplicated in both ArtifactManager.scala and SparkConnectAddArtifactsHandler.scala.
   ```scala
    if (failedArtifactExceptions.nonEmpty) {
         val exception = failedArtifactExceptions.head
         failedArtifactExceptions.drop(1).foreach(exception.addSuppressed(_))
         throw exception
       }
   }
   ```
   I was wondering whether it makes sense to introduce a small utility that handles Seq[Try[...]] instead.
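   A minimal sketch of what such a utility could look like, assuming the caller wraps each per-artifact operation in a `Try`. The object name `Throwables` and the method `throwFirstWithSuppressed` are placeholders for illustration, not existing Spark APIs:
   ```scala
   import scala.util.{Failure, Try}

   object Throwables {
     // If any attempt failed, rethrow the first failure with the remaining
     // failures attached as suppressed exceptions; otherwise do nothing.
     def throwFirstWithSuppressed(attempts: Seq[Try[_]]): Unit = {
       val failures = attempts.collect { case Failure(e) => e }
       if (failures.nonEmpty) {
         val first = failures.head
         failures.tail.foreach(first.addSuppressed(_))
         throw first
       }
     }
   }

   // Hypothetical call site in addLocalArtifacts (addOne stands in for the
   // existing per-artifact handling shown in the diff above):
   //   val results = artifacts.map(artifact => Try(addOne(artifact)))
   //   Throwables.throwFirstWithSuppressed(results)
   ```
   Both ArtifactManager.addLocalArtifacts and SparkConnectAddArtifactsHandler could then map their per-artifact work into `Try` values and delegate the "first exception plus suppressed" behavior to a single helper, instead of each keeping its own ListBuffer bookkeeping.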



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

