maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r529226242



##########
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##########
@@ -1920,17 +1920,23 @@ class SparkContext(config: SparkConf) extends Logging {
           case "file" => addLocalJarFile(new File(uri.getPath))
           // A JAR file which exists locally on every worker node
           case "local" => "file:" + uri.getPath
+          case "ivy" =>
+            // Since `new Path(path).toUri` will lose query information,
+            // so here we use `URI>create(path)`

Review comment:
       nit: `>` -> `.`?

##########
File path: core/src/test/scala/org/apache/spark/SparkContextSuite.scala
##########
@@ -955,6 +955,20 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
         .set(EXECUTOR_ALLOW_SPARK_CONTEXT, true)).stop()
     }
   }
+
+  test("SPARK-33084: Add jar support ivy url") {
+    sc = new SparkContext(new 
SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+    sc.addJar("ivy://org.scala-js:scalajs-test-interface_2.12:1.2.0")
+    
assert(sc.listJars().find(_.contains("scalajs-test-interface_2.12")).nonEmpty)

Review comment:
       nit: `find` -> `exists`

##########
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##########
@@ -1920,17 +1920,23 @@ class SparkContext(config: SparkConf) extends Logging {
           case "file" => addLocalJarFile(new File(uri.getPath))
           // A JAR file which exists locally on every worker node
           case "local" => "file:" + uri.getPath
+          case "ivy" =>
+            // Since `new Path(path).toUri` will lose query information,
+            // so here we use `URI>create(path)`
+            Utils.resolveMavenDependencies(URI.create(path))
           case _ => checkRemoteJarFile(path)
         }
       }
-      if (key != null) {
+      if (keys != null) {
         val timestamp = if (addedOnSubmit) startTime else 
System.currentTimeMillis
-        if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
-          logInfo(s"Added JAR $path at $key with timestamp $timestamp")
-          postEnvironmentUpdate()
-        } else {
-          logWarning(s"The jar $path has been added already. Overwriting of 
added jars " +
-            "is not supported in the current version.")
+        keys.split(",").foreach { key =>

Review comment:
       Why do we need to split it by `,` here? It seems this PR adds no test 
for this case though.

##########
File path: docs/sql-ref-syntax-aux-resource-mgmt-add-jar.md
##########
@@ -42,6 +42,10 @@ ADD JAR /tmp/test.jar;
 ADD JAR "/path/to/some.jar";
 ADD JAR '/some/other.jar';
 ADD JAR "/path with space/abc.jar";
+ADD JAR "ivy://group:module:version";
+ADD JAR "ivy://group:module:version?transitive=true";
+ADD JAR "ivy://group:module:version?exclusin=group:module,group:module";

Review comment:
       `exclusin` -> `exclusion`

##########
File path: docs/sql-ref-syntax-aux-resource-mgmt-add-jar.md
##########
@@ -42,6 +42,10 @@ ADD JAR /tmp/test.jar;
 ADD JAR "/path/to/some.jar";
 ADD JAR '/some/other.jar';
 ADD JAR "/path with space/abc.jar";
+ADD JAR "ivy://group:module:version";
+ADD JAR "ivy://group:module:version?transitive=true";
+ADD JAR "ivy://group:module:version?exclusin=group:module,group:module";
+ADD JAR 
"ivy://group:module:version?exclusin=group:module,group:module&transitive=false";

Review comment:
       Could you move the description of the syntaxes into the section 
`**file_name**` above? And, please put some **concrete** examples in this 
example section.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
##########
@@ -159,6 +161,17 @@ class SessionResourceLoader(session: SparkSession) extends 
FunctionResourceLoade
     }
   }
 
+  def resolveJars(path: String): List[String] = {
+    val uri = new Path(path).toUri
+    uri.getScheme match {
+      case "ivy" =>
+        Utils.resolveMavenDependencies(URI.create(path))
+          .split(",").toList
+      case _ =>
+        path :: Nil

Review comment:
       nit format:
   ```
     def resolveJars(path: String): List[String] = {
       new Path(path).toUri.getScheme match {
         case "ivy" => 
Utils.resolveMavenDependencies(URI.create(path)).split(",").toList
         case _ => path :: Nil
       }
     }
   ```

##########
File path: docs/sql-ref-syntax-aux-resource-mgmt-add-jar.md
##########
@@ -42,6 +42,10 @@ ADD JAR /tmp/test.jar;
 ADD JAR "/path/to/some.jar";
 ADD JAR '/some/other.jar';
 ADD JAR "/path with space/abc.jar";
+ADD JAR "ivy://group:module:version";
+ADD JAR "ivy://group:module:version?transitive=true";
+ADD JAR "ivy://group:module:version?exclusin=group:module,group:module";
+ADD JAR 
"ivy://group:module:version?exclusin=group:module,group:module&transitive=false";

Review comment:
       Also, I think we need an explanation about each param, e.g., `exclusion` 
and `transitive`.

##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -2980,6 +2980,75 @@ private[spark] object Utils extends Logging {
     metadata.toString
   }
 
+  /**
+   * Download Ivy URIs dependent jars.
+   *
+   * @param uri Ivy uri need to be downloaded.
+   * @return Comma separated string list of URIs of downloaded jars
+   */
+  def resolveMavenDependencies(uri: URI): String = {
+    val Seq(repositories, ivyRepoPath, ivySettingsPath) =
+      Seq(
+        "spark.jars.repositories",
+        "spark.jars.ivy",
+        "spark.jars.ivySettings"
+      ).map(sys.props.get(_).orNull)
+    // Create the IvySettings, either load from file or build defaults
+    val ivySettings = Option(ivySettingsPath) match {
+      case Some(path) =>
+        SparkSubmitUtils.loadIvySettings(path, Option(repositories), 
Option(ivyRepoPath))
+
+      case None =>
+        SparkSubmitUtils.buildIvySettings(Option(repositories), 
Option(ivyRepoPath))
+    }
+    SparkSubmitUtils.resolveMavenCoordinates(uri.getAuthority, ivySettings,
+      parseExcludeList(uri.getQuery), parseTransitive(uri.getQuery))
+  }
+
+  /**
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of exclude.
+   *         Example: Input:  
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   *         Output:  [org.mortbay.jetty:jetty, org.eclipse.jetty:jetty-http]
+   */
+  private def parseExcludeList(queryString: String): Array[String] = {
+    if (queryString == null || queryString.isEmpty) {
+      Array.empty[String]
+    } else {
+      val mapTokens: Array[String] = queryString.split("&")
+      assert(mapTokens.forall(_.split("=").length == 2), "Invalid query 
string: " + queryString)
+      mapTokens.map(_.split("=")).map(kv => (kv(0), kv(1))).filter(_._1 == 
"exclude")
+        .flatMap { case (_, excludeString) =>
+          val excludes: Array[String] = excludeString.split(",")
+          assert(excludes.forall(_.split(":").length == 2),
+            "Invalid exclude string: expected 'org:module,org:module,..', 
found " + excludeString)
+          excludes
+        }
+    }
+  }
+
+  /**
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of transitive.
+   *         Example: Input:  exclude=org.mortbay.jetty:jetty&transitive=true
+   *         Output:  true
+   */
+  private def parseTransitive(queryString: String): Boolean = {
+    if (queryString == null || queryString.isEmpty) {
+      false
+    } else {
+      val mapTokens: Array[String] = queryString.split("&")
+      assert(mapTokens.forall(_.split("=").length == 2), "Invalid query 
string: " + queryString)
+      val transitive = mapTokens.map(_.split("=")).map(kv => (kv(0), kv(1)))
+        .filter(_._1 == "transitive")

Review comment:
       Some code to parse params look duplicated, so could you share them 
between `parseTransitive` and `parseExcludeList `?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
##########
@@ -159,6 +161,17 @@ class SessionResourceLoader(session: SparkSession) extends 
FunctionResourceLoade
     }
   }
 
+  def resolveJars(path: String): List[String] = {

Review comment:
       protected

##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -2980,6 +2980,75 @@ private[spark] object Utils extends Logging {
     metadata.toString
   }
 
+  /**
+   * Download Ivy URIs dependent jars.
+   *
+   * @param uri Ivy uri need to be downloaded.
+   * @return Comma separated string list of URIs of downloaded jars
+   */
+  def resolveMavenDependencies(uri: URI): String = {
+    val Seq(repositories, ivyRepoPath, ivySettingsPath) =
+      Seq(
+        "spark.jars.repositories",
+        "spark.jars.ivy",
+        "spark.jars.ivySettings"
+      ).map(sys.props.get(_).orNull)
+    // Create the IvySettings, either load from file or build defaults
+    val ivySettings = Option(ivySettingsPath) match {
+      case Some(path) =>
+        SparkSubmitUtils.loadIvySettings(path, Option(repositories), 
Option(ivyRepoPath))
+
+      case None =>
+        SparkSubmitUtils.buildIvySettings(Option(repositories), 
Option(ivyRepoPath))
+    }
+    SparkSubmitUtils.resolveMavenCoordinates(uri.getAuthority, ivySettings,
+      parseExcludeList(uri.getQuery), parseTransitive(uri.getQuery))
+  }
+
+  /**
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of exclude.
+   *         Example: Input:  
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   *         Output:  [org.mortbay.jetty:jetty, org.eclipse.jetty:jetty-http]
+   */
+  private def parseExcludeList(queryString: String): Array[String] = {
+    if (queryString == null || queryString.isEmpty) {
+      Array.empty[String]
+    } else {
+      val mapTokens: Array[String] = queryString.split("&")

Review comment:
       nit: we don't need the type `: Array[String]`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to