heyihong commented on code in PR #52073: URL: https://github.com/apache/spark/pull/52073#discussion_r2299204538
########## sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala: ########## @@ -266,28 +267,41 @@ class ArtifactManager(session: SparkSession) extends AutoCloseable with Logging * they are from a permanent location. */ private[sql] def addLocalArtifacts(artifacts: Seq[Artifact]): Unit = { + val failedArtifactExceptions = ListBuffer[SparkRuntimeException]() + artifacts.foreach { artifact => - artifact.storage match { - case d: Artifact.LocalFile => - addArtifact( - artifact.path, - d.path, - fragment = None, - deleteStagedFile = false) - case d: Artifact.InMemory => - val tempDir = Utils.createTempDir().toPath - val tempFile = tempDir.resolve(artifact.path.getFileName) - val outStream = Files.newOutputStream(tempFile) - Utils.tryWithSafeFinallyAndFailureCallbacks { - d.stream.transferTo(outStream) - addArtifact(artifact.path, tempFile, fragment = None) - }(finallyBlock = { - outStream.close() - }) - case _ => - throw SparkException.internalError(s"Unsupported artifact storage: ${artifact.storage}") + try { + artifact.storage match { + case d: Artifact.LocalFile => + addArtifact( + artifact.path, + d.path, + fragment = None, + deleteStagedFile = false) + case d: Artifact.InMemory => + val tempDir = Utils.createTempDir().toPath + val tempFile = tempDir.resolve(artifact.path.getFileName) + val outStream = Files.newOutputStream(tempFile) + Utils.tryWithSafeFinallyAndFailureCallbacks { + d.stream.transferTo(outStream) + addArtifact(artifact.path, tempFile, fragment = None) + }(finallyBlock = { + outStream.close() + }) + case _ => + throw SparkException.internalError(s"Unsupported artifact storage: ${artifact.storage}") + } + } catch { + case e: SparkRuntimeException => + failedArtifactExceptions += e } } + + if (failedArtifactExceptions.nonEmpty) { Review Comment: The error handling and suppression logic seems to be duplicated in both ArtifactManager.scala and SparkConnectAddArtifactsHandler.scala. ```scala if (failedArtifactExceptions.nonEmpty) { val exception = failedArtifactExceptions.head failedArtifactExceptions.drop(1).foreach(exception.addSuppressed(_)) throw exception } } ``` I was wondering whether it makes sense to introduce a small utility to handle Seq[Try[...]] instead -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org