maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r529226242
##########
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##########
@@ -1920,17 +1920,23 @@ class SparkContext(config: SparkConf) extends Logging {
case "file" => addLocalJarFile(new File(uri.getPath))
// A JAR file which exists locally on every worker node
case "local" => "file:" + uri.getPath
+ case "ivy" =>
+ // Since `new Path(path).toUri` will lose query information,
+ // so here we use `URI>create(path)`
Review comment:
nit: `>` -> `.`?
##########
File path: core/src/test/scala/org/apache/spark/SparkContextSuite.scala
##########
@@ -955,6 +955,20 @@ class SparkContextSuite extends SparkFunSuite with
LocalSparkContext with Eventu
.set(EXECUTOR_ALLOW_SPARK_CONTEXT, true)).stop()
}
}
+
+ test("SPARK-33084: Add jar support ivy url") {
+ sc = new SparkContext(new
SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+ sc.addJar("ivy://org.scala-js:scalajs-test-interface_2.12:1.2.0")
+
assert(sc.listJars().find(_.contains("scalajs-test-interface_2.12")).nonEmpty)
Review comment:
nit: `find(...).nonEmpty` -> `exists(...)`
##########
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##########
@@ -1920,17 +1920,23 @@ class SparkContext(config: SparkConf) extends Logging {
case "file" => addLocalJarFile(new File(uri.getPath))
// A JAR file which exists locally on every worker node
case "local" => "file:" + uri.getPath
+ case "ivy" =>
+ // Since `new Path(path).toUri` will lose query information,
+ // so here we use `URI>create(path)`
+ Utils.resolveMavenDependencies(URI.create(path))
case _ => checkRemoteJarFile(path)
}
}
- if (key != null) {
+ if (keys != null) {
val timestamp = if (addedOnSubmit) startTime else
System.currentTimeMillis
- if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
- logInfo(s"Added JAR $path at $key with timestamp $timestamp")
- postEnvironmentUpdate()
- } else {
- logWarning(s"The jar $path has been added already. Overwriting of
added jars " +
- "is not supported in the current version.")
+ keys.split(",").foreach { key =>
Review comment:
Why do we need to split it by `,` here? It seems this PR does not add a test
for this case, either.
##########
File path: docs/sql-ref-syntax-aux-resource-mgmt-add-jar.md
##########
@@ -42,6 +42,10 @@ ADD JAR /tmp/test.jar;
ADD JAR "/path/to/some.jar";
ADD JAR '/some/other.jar';
ADD JAR "/path with space/abc.jar";
+ADD JAR "ivy://group:module:version";
+ADD JAR "ivy://group:module:version?transitive=true";
+ADD JAR "ivy://group:module:version?exclusin=group:module,group:module";
Review comment:
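`exclusin` -> `exclusion`? Note, though, that `Utils.parseExcludeList` matches the query key `exclude`, so these examples should probably use `exclude=` to match the parser.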
##########
File path: docs/sql-ref-syntax-aux-resource-mgmt-add-jar.md
##########
@@ -42,6 +42,10 @@ ADD JAR /tmp/test.jar;
ADD JAR "/path/to/some.jar";
ADD JAR '/some/other.jar';
ADD JAR "/path with space/abc.jar";
+ADD JAR "ivy://group:module:version";
+ADD JAR "ivy://group:module:version?transitive=true";
+ADD JAR "ivy://group:module:version?exclusin=group:module,group:module";
+ADD JAR
"ivy://group:module:version?exclusin=group:module,group:module&transitive=false";
Review comment:
Could you move the descriptions of these syntaxes into the `**file_name**`
section above? Also, please put some **concrete** examples in this
example section.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
##########
@@ -159,6 +161,17 @@ class SessionResourceLoader(session: SparkSession) extends
FunctionResourceLoade
}
}
+ def resolveJars(path: String): List[String] = {
+ val uri = new Path(path).toUri
+ uri.getScheme match {
+ case "ivy" =>
+ Utils.resolveMavenDependencies(URI.create(path))
+ .split(",").toList
+ case _ =>
+ path :: Nil
Review comment:
nit format:
```
def resolveJars(path: String): List[String] = {
new Path(path).toUri.getScheme match {
case "ivy" =>
Utils.resolveMavenDependencies(URI.create(path)).split(",").toList
case _ => path :: Nil
}
}
```
##########
File path: docs/sql-ref-syntax-aux-resource-mgmt-add-jar.md
##########
@@ -42,6 +42,10 @@ ADD JAR /tmp/test.jar;
ADD JAR "/path/to/some.jar";
ADD JAR '/some/other.jar';
ADD JAR "/path with space/abc.jar";
+ADD JAR "ivy://group:module:version";
+ADD JAR "ivy://group:module:version?transitive=true";
+ADD JAR "ivy://group:module:version?exclusin=group:module,group:module";
+ADD JAR
"ivy://group:module:version?exclusin=group:module,group:module&transitive=false";
Review comment:
Also, I think we need an explanation of each parameter, e.g., `exclusion`
and `transitive`.
##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -2980,6 +2980,75 @@ private[spark] object Utils extends Logging {
metadata.toString
}
+ /**
+ * Download Ivy URIs dependent jars.
+ *
+ * @param uri Ivy uri need to be downloaded.
+ * @return Comma separated string list of URIs of downloaded jars
+ */
+ def resolveMavenDependencies(uri: URI): String = {
+ val Seq(repositories, ivyRepoPath, ivySettingsPath) =
+ Seq(
+ "spark.jars.repositories",
+ "spark.jars.ivy",
+ "spark.jars.ivySettings"
+ ).map(sys.props.get(_).orNull)
+ // Create the IvySettings, either load from file or build defaults
+ val ivySettings = Option(ivySettingsPath) match {
+ case Some(path) =>
+ SparkSubmitUtils.loadIvySettings(path, Option(repositories),
Option(ivyRepoPath))
+
+ case None =>
+ SparkSubmitUtils.buildIvySettings(Option(repositories),
Option(ivyRepoPath))
+ }
+ SparkSubmitUtils.resolveMavenCoordinates(uri.getAuthority, ivySettings,
+ parseExcludeList(uri.getQuery), parseTransitive(uri.getQuery))
+ }
+
+ /**
+ * @param queryString Ivy URI query part string.
+ * @return Exclude list which contains grape parameters of exclude.
+ * Example: Input:
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+ * Output: [org.mortbay.jetty:jetty, org.eclipse.jetty:jetty-http]
+ */
+ private def parseExcludeList(queryString: String): Array[String] = {
+ if (queryString == null || queryString.isEmpty) {
+ Array.empty[String]
+ } else {
+ val mapTokens: Array[String] = queryString.split("&")
+ assert(mapTokens.forall(_.split("=").length == 2), "Invalid query
string: " + queryString)
+ mapTokens.map(_.split("=")).map(kv => (kv(0), kv(1))).filter(_._1 ==
"exclude")
+ .flatMap { case (_, excludeString) =>
+ val excludes: Array[String] = excludeString.split(",")
+ assert(excludes.forall(_.split(":").length == 2),
+ "Invalid exclude string: expected 'org:module,org:module,..',
found " + excludeString)
+ excludes
+ }
+ }
+ }
+
+ /**
+ * @param queryString Ivy URI query part string.
+ * @return Exclude list which contains grape parameters of transitive.
+ * Example: Input: exclude=org.mortbay.jetty:jetty&transitive=true
+ * Output: true
+ */
+ private def parseTransitive(queryString: String): Boolean = {
+ if (queryString == null || queryString.isEmpty) {
+ false
+ } else {
+ val mapTokens: Array[String] = queryString.split("&")
+ assert(mapTokens.forall(_.split("=").length == 2), "Invalid query
string: " + queryString)
+ val transitive = mapTokens.map(_.split("=")).map(kv => (kv(0), kv(1)))
+ .filter(_._1 == "transitive")
Review comment:
Some of the param-parsing code looks duplicated; could you share it
between `parseTransitive` and `parseExcludeList`?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
##########
@@ -159,6 +161,17 @@ class SessionResourceLoader(session: SparkSession) extends
FunctionResourceLoade
}
}
+ def resolveJars(path: String): List[String] = {
Review comment:
nit: this method can be `protected`.
##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -2980,6 +2980,75 @@ private[spark] object Utils extends Logging {
metadata.toString
}
+ /**
+ * Download Ivy URIs dependent jars.
+ *
+ * @param uri Ivy uri need to be downloaded.
+ * @return Comma separated string list of URIs of downloaded jars
+ */
+ def resolveMavenDependencies(uri: URI): String = {
+ val Seq(repositories, ivyRepoPath, ivySettingsPath) =
+ Seq(
+ "spark.jars.repositories",
+ "spark.jars.ivy",
+ "spark.jars.ivySettings"
+ ).map(sys.props.get(_).orNull)
+ // Create the IvySettings, either load from file or build defaults
+ val ivySettings = Option(ivySettingsPath) match {
+ case Some(path) =>
+ SparkSubmitUtils.loadIvySettings(path, Option(repositories),
Option(ivyRepoPath))
+
+ case None =>
+ SparkSubmitUtils.buildIvySettings(Option(repositories),
Option(ivyRepoPath))
+ }
+ SparkSubmitUtils.resolveMavenCoordinates(uri.getAuthority, ivySettings,
+ parseExcludeList(uri.getQuery), parseTransitive(uri.getQuery))
+ }
+
+ /**
+ * @param queryString Ivy URI query part string.
+ * @return Exclude list which contains grape parameters of exclude.
+ * Example: Input:
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+ * Output: [org.mortbay.jetty:jetty, org.eclipse.jetty:jetty-http]
+ */
+ private def parseExcludeList(queryString: String): Array[String] = {
+ if (queryString == null || queryString.isEmpty) {
+ Array.empty[String]
+ } else {
+ val mapTokens: Array[String] = queryString.split("&")
Review comment:
nit: the `: Array[String]` type annotation is unnecessary here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]