maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533076491
##########
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##########
@@ -1860,7 +1860,7 @@ class SparkContext(config: SparkConf) extends Logging {
}
private def addJar(path: String, addedOnSubmit: Boolean): Unit = {
- def addLocalJarFile(file: File): String = {
+ def addLocalJarFile(file: File): Array[String] = {
Review comment:
`Array` -> `Seq`
##########
File path: core/src/test/scala/org/apache/spark/SparkContextSuite.scala
##########
@@ -366,6 +366,12 @@ class SparkContextSuite extends SparkFunSuite with
LocalSparkContext with Eventu
}
}
+ test("add jar local path with comma") {
+ sc = new SparkContext(new
SparkConf().setAppName("test").setMaster("local"))
Review comment:
Could you add tests for the other URI schemes?
##########
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##########
@@ -1869,15 +1869,15 @@ class SparkContext(config: SparkConf) extends Logging {
throw new IllegalArgumentException(
s"Directory ${file.getAbsoluteFile} is not allowed for addJar")
}
- env.rpcEnv.fileServer.addJar(file)
+ Array(env.rpcEnv.fileServer.addJar(file))
} catch {
case NonFatal(e) =>
logError(s"Failed to add $path to Spark environment", e)
- null
+ Array.empty
}
}
- def checkRemoteJarFile(path: String): String = {
+ def checkRemoteJarFile(path: String): Array[String] = {
Review comment:
ditto
##########
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##########
@@ -15,22 +15,122 @@
* limitations under the License.
*/
-package org.apache.spark.deploy
+package org.apache.spark.util
import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+ def getIvyProperties(): Seq[String] = {
+ Seq(
+ "spark.jars.excludes",
+ "spark.jars.packages",
+ "spark.jars.repositories",
+ "spark.jars.ivy",
+ "spark.jars.ivySettings"
+ ).map(sys.props.get(_).orNull)
+ }
+
+ /**
+ * Parse excluded list in ivy URL. When download ivy URL jar, Spark won't
download transitive jar
+ * in excluded list.
+ *
+ * @param queryString Ivy URI query part string.
+ * @return Exclude list which contains grape parameters of exclude.
+ * Example: Input:
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+ * Output: [org.mortbay.jetty:jetty, org.eclipse.jetty:jetty-http]
+ */
+ private def parseExcludeList(excludes: Array[String]): String = {
+ excludes.flatMap { excludeString =>
+ val excludes: Array[String] = excludeString.split(",")
+ if (excludes.exists(_.split(":").length != 2)) {
+ throw new URISyntaxException(excludeString,
+ "Invalid exclude string: expected 'org:module,org:module,..', found
" + excludeString)
+ }
+ excludes
+ }.mkString(":")
+ }
+
+ /**
+ * Parse transitive parameter in ivy URL, default value is false.
+ *
+ * @param queryString Ivy URI query part string.
+ * @return Exclude list which contains grape parameters of transitive.
+ * Example: Input: exclude=org.mortbay.jetty:jetty&transitive=true
+ * Output: true
+ */
+ private def parseTransitive(transitives: Array[String]): Boolean = {
+ if (transitives.isEmpty) {
+ false
+ } else {
+ if (transitives.length > 1) {
+ logWarning("It's best to specify `transitive` parameter in ivy URL
query only once." +
+ " If there are multiple `transitive` parameter, we will select the
last one")
+ }
+ transitives.last.toBoolean
Review comment:
What happens if `transitive=invalidStr` is given (e.g., via Hive's ADD JAR)? Could you add tests for that case?
##########
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##########
@@ -1869,15 +1869,15 @@ class SparkContext(config: SparkConf) extends Logging {
throw new IllegalArgumentException(
s"Directory ${file.getAbsoluteFile} is not allowed for addJar")
}
- env.rpcEnv.fileServer.addJar(file)
+ Array(env.rpcEnv.fileServer.addJar(file))
} catch {
case NonFatal(e) =>
logError(s"Failed to add $path to Spark environment", e)
- null
+ Array.empty
Review comment:
Then, return `Nil` here instead.
##########
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##########
@@ -15,22 +15,122 @@
* limitations under the License.
*/
-package org.apache.spark.deploy
+package org.apache.spark.util
import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+ def getIvyProperties(): Seq[String] = {
+ Seq(
+ "spark.jars.excludes",
+ "spark.jars.packages",
+ "spark.jars.repositories",
+ "spark.jars.ivy",
+ "spark.jars.ivySettings"
+ ).map(sys.props.get(_).orNull)
+ }
+
+ /**
+ * Parse excluded list in ivy URL. When download ivy URL jar, Spark won't
download transitive jar
+ * in excluded list.
+ *
+ * @param queryString Ivy URI query part string.
+ * @return Exclude list which contains grape parameters of exclude.
+ * Example: Input:
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+ * Output: [org.mortbay.jetty:jetty, org.eclipse.jetty:jetty-http]
+ */
+ private def parseExcludeList(excludes: Array[String]): String = {
+ excludes.flatMap { excludeString =>
+ val excludes: Array[String] = excludeString.split(",")
+ if (excludes.exists(_.split(":").length != 2)) {
+ throw new URISyntaxException(excludeString,
+ "Invalid exclude string: expected 'org:module,org:module,..', found
" + excludeString)
+ }
+ excludes
+ }.mkString(":")
+ }
+
+ /**
+ * Parse transitive parameter in ivy URL, default value is false.
+ *
+ * @param queryString Ivy URI query part string.
+ * @return Exclude list which contains grape parameters of transitive.
+ * Example: Input: exclude=org.mortbay.jetty:jetty&transitive=true
+ * Output: true
+ */
+ private def parseTransitive(transitives: Array[String]): Boolean = {
+ if (transitives.isEmpty) {
+ false
+ } else {
+ if (transitives.length > 1) {
+ logWarning("It's best to specify `transitive` parameter in ivy URL
query only once." +
+ " If there are multiple `transitive` parameter, we will select the
last one")
+ }
+ transitives.last.toBoolean
+ }
+ }
+
+ /**
+ * Download Ivy URIs dependency jars.
+ *
+ * @param uri Ivy uri need to be downloaded. The URI format should be:
+ * `ivy://group:module:version[?query]`
+ * Ivy URI query part format should be:
+ * `parameter=value&parameter=value...`
+ * Note that currently ivy URI query part support two parameters:
+ * 1. transitive: whether to download dependent jars related to
your ivy URL.
+ * transitive=false or `transitive=true`, if not set, the
default value is false.
+ * 2. exclude: exclusion list when download ivy URL jar and
dependency jars.
+ * The `exclude` parameter content is a ',' separated
`group:module` pair string :
+ * `exclude=group:module,group:module...`
+ * @return Comma separated string list of URIs of downloaded jars
+ */
+ def resolveMavenDependencies(uri: URI): String = {
+ val Seq(_, _, repositories, ivyRepoPath, ivySettingsPath) =
+ DependencyUtils.getIvyProperties()
+ val authority = uri.getAuthority
+ if (authority == null) {
+ throw new URISyntaxException(
+ authority, "Invalid url: Expected 'org:module:version', found null")
+ }
+ if (authority.split(":").length != 3) {
+ throw new URISyntaxException(
+ authority, "Invalid url: Expected 'org:module:version', found " +
authority)
+ }
+
+ val uriQuery = uri.getQuery
+ val queryParams: Array[(String, String)] = if (uriQuery == null) {
Review comment:
nit: `val queryParams = if (uriQuery == null) {`
##########
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##########
@@ -15,22 +15,122 @@
* limitations under the License.
*/
-package org.apache.spark.deploy
+package org.apache.spark.util
import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+ def getIvyProperties(): Seq[String] = {
+ Seq(
+ "spark.jars.excludes",
+ "spark.jars.packages",
+ "spark.jars.repositories",
+ "spark.jars.ivy",
+ "spark.jars.ivySettings"
+ ).map(sys.props.get(_).orNull)
+ }
+
+ /**
+ * Parse excluded list in ivy URL. When download ivy URL jar, Spark won't
download transitive jar
+ * in excluded list.
+ *
+ * @param queryString Ivy URI query part string.
+ * @return Exclude list which contains grape parameters of exclude.
+ * Example: Input:
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+ * Output: [org.mortbay.jetty:jetty, org.eclipse.jetty:jetty-http]
+ */
+ private def parseExcludeList(excludes: Array[String]): String = {
Review comment:
Since each piece of parsing logic is pretty simple, could you merge the two functions
into a single `parseQueryParams()`?
https://github.com/apache/spark/commit/c137f1578e18b371123f17f60abf075b97a8f21e#diff-3e9f71e7d80c1dc7d02b0edef611de280f219789f0d2b282887f07e999020024R43
##########
File path: core/src/test/scala/org/apache/spark/SparkContextSuite.scala
##########
@@ -366,6 +366,12 @@ class SparkContextSuite extends SparkFunSuite with
LocalSparkContext with Eventu
}
}
+ test("add jar local path with comma") {
+ sc = new SparkContext(new
SparkConf().setAppName("test").setMaster("local"))
+ sc.addJar("file://Test,UDTF.jar")
+ assert(!sc.listJars().exists(_.contains("UDTF.jar")))
Review comment:
What does this test check? Why not assert
`assert(sc.listJars().exists(_.contains("Test,UDTF.jar")))` instead?
##########
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##########
@@ -15,22 +15,122 @@
* limitations under the License.
*/
-package org.apache.spark.deploy
+package org.apache.spark.util
import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+ def getIvyProperties(): Seq[String] = {
+ Seq(
+ "spark.jars.excludes",
+ "spark.jars.packages",
+ "spark.jars.repositories",
+ "spark.jars.ivy",
+ "spark.jars.ivySettings"
+ ).map(sys.props.get(_).orNull)
+ }
+
+ /**
+ * Parse excluded list in ivy URL. When download ivy URL jar, Spark won't
download transitive jar
+ * in excluded list.
+ *
+ * @param queryString Ivy URI query part string.
+ * @return Exclude list which contains grape parameters of exclude.
+ * Example: Input:
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+ * Output: [org.mortbay.jetty:jetty, org.eclipse.jetty:jetty-http]
+ */
+ private def parseExcludeList(excludes: Array[String]): String = {
+ excludes.flatMap { excludeString =>
+ val excludes: Array[String] = excludeString.split(",")
+ if (excludes.exists(_.split(":").length != 2)) {
+ throw new URISyntaxException(excludeString,
+ "Invalid exclude string: expected 'org:module,org:module,..', found
" + excludeString)
+ }
+ excludes
+ }.mkString(":")
+ }
+
+ /**
+ * Parse transitive parameter in ivy URL, default value is false.
+ *
+ * @param queryString Ivy URI query part string.
+ * @return Exclude list which contains grape parameters of transitive.
+ * Example: Input: exclude=org.mortbay.jetty:jetty&transitive=true
+ * Output: true
+ */
+ private def parseTransitive(transitives: Array[String]): Boolean = {
+ if (transitives.isEmpty) {
+ false
+ } else {
+ if (transitives.length > 1) {
+ logWarning("It's best to specify `transitive` parameter in ivy URL
query only once." +
+ " If there are multiple `transitive` parameter, we will select the
last one")
+ }
+ transitives.last.toBoolean
+ }
+ }
+
+ /**
+ * Download Ivy URIs dependency jars.
+ *
+ * @param uri Ivy uri need to be downloaded. The URI format should be:
+ * `ivy://group:module:version[?query]`
+ * Ivy URI query part format should be:
+ * `parameter=value&parameter=value...`
+ * Note that currently ivy URI query part support two parameters:
+ * 1. transitive: whether to download dependent jars related to
your ivy URL.
+ * transitive=false or `transitive=true`, if not set, the
default value is false.
+ * 2. exclude: exclusion list when download ivy URL jar and
dependency jars.
+ * The `exclude` parameter content is a ',' separated
`group:module` pair string :
+ * `exclude=group:module,group:module...`
+ * @return Comma separated string list of URIs of downloaded jars
+ */
+ def resolveMavenDependencies(uri: URI): String = {
+ val Seq(_, _, repositories, ivyRepoPath, ivySettingsPath) =
+ DependencyUtils.getIvyProperties()
+ val authority = uri.getAuthority
+ if (authority == null) {
+ throw new URISyntaxException(
+ authority, "Invalid url: Expected 'org:module:version', found null")
+ }
+ if (authority.split(":").length != 3) {
+ throw new URISyntaxException(
+ authority, "Invalid url: Expected 'org:module:version', found " +
authority)
+ }
+
+ val uriQuery = uri.getQuery
+ val queryParams: Array[(String, String)] = if (uriQuery == null) {
+ Array.empty[(String, String)]
+ } else {
+ val mapTokens = uriQuery.split("&").map(_.split("="))
+ if (mapTokens.exists(_.length != 2)) {
+ throw new URISyntaxException(uriQuery, s"Invalid query string:
$uriQuery")
+ }
+ mapTokens.map(kv => (kv(0), kv(1)))
+ }
+
+ resolveMavenDependencies(
+ parseTransitive(queryParams.filter(_._1.equals("transitive")).map(_._2)),
+ parseExcludeList(queryParams.filter(_._1.equals("exclude")).map(_._2)),
Review comment:
Hmm, but I still think an O(1) lookup using a `Map` is better:
https://github.com/apache/spark/commit/c137f1578e18b371123f17f60abf075b97a8f21e#diff-3e9f71e7d80c1dc7d02b0edef611de280f219789f0d2b282887f07e999020024R65
##########
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##########
@@ -15,22 +15,122 @@
* limitations under the License.
*/
-package org.apache.spark.deploy
+package org.apache.spark.util
import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+ def getIvyProperties(): Seq[String] = {
+ Seq(
+ "spark.jars.excludes",
+ "spark.jars.packages",
+ "spark.jars.repositories",
+ "spark.jars.ivy",
+ "spark.jars.ivySettings"
+ ).map(sys.props.get(_).orNull)
+ }
+
+ /**
+ * Parse excluded list in ivy URL. When download ivy URL jar, Spark won't
download transitive jar
+ * in excluded list.
+ *
+ * @param queryString Ivy URI query part string.
+ * @return Exclude list which contains grape parameters of exclude.
+ * Example: Input:
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+ * Output: [org.mortbay.jetty:jetty, org.eclipse.jetty:jetty-http]
+ */
+ private def parseExcludeList(excludes: Array[String]): String = {
+ excludes.flatMap { excludeString =>
+ val excludes: Array[String] = excludeString.split(",")
+ if (excludes.exists(_.split(":").length != 2)) {
+ throw new URISyntaxException(excludeString,
+ "Invalid exclude string: expected 'org:module,org:module,..', found
" + excludeString)
+ }
+ excludes
+ }.mkString(":")
+ }
+
+ /**
+ * Parse transitive parameter in ivy URL, default value is false.
+ *
+ * @param queryString Ivy URI query part string.
+ * @return Exclude list which contains grape parameters of transitive.
+ * Example: Input: exclude=org.mortbay.jetty:jetty&transitive=true
+ * Output: true
+ */
+ private def parseTransitive(transitives: Array[String]): Boolean = {
+ if (transitives.isEmpty) {
+ false
+ } else {
+ if (transitives.length > 1) {
+ logWarning("It's best to specify `transitive` parameter in ivy URL
query only once." +
+ " If there are multiple `transitive` parameter, we will select the
last one")
+ }
+ transitives.last.toBoolean
+ }
+ }
+
+ /**
+ * Download Ivy URIs dependency jars.
+ *
+ * @param uri Ivy uri need to be downloaded. The URI format should be:
+ * `ivy://group:module:version[?query]`
+ * Ivy URI query part format should be:
+ * `parameter=value&parameter=value...`
+ * Note that currently ivy URI query part support two parameters:
+ * 1. transitive: whether to download dependent jars related to
your ivy URL.
+ * transitive=false or `transitive=true`, if not set, the
default value is false.
+ * 2. exclude: exclusion list when download ivy URL jar and
dependency jars.
+ * The `exclude` parameter content is a ',' separated
`group:module` pair string :
+ * `exclude=group:module,group:module...`
+ * @return Comma separated string list of URIs of downloaded jars
+ */
+ def resolveMavenDependencies(uri: URI): String = {
Review comment:
Why is the return value not `Seq[String]`?
##########
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##########
@@ -1919,18 +1919,24 @@ class SparkContext(config: SparkConf) extends Logging {
// A JAR file which exists only on the driver node
case "file" => addLocalJarFile(new File(uri.getPath))
// A JAR file which exists locally on every worker node
- case "local" => "file:" + uri.getPath
+ case "local" => Array("file:" + uri.getPath)
+ case "ivy" =>
+ // Since `new Path(path).toUri` will lose query information,
+ // so here we use `URI.create(path)`
+
DependencyUtils.resolveMavenDependencies(URI.create(path)).split(",")
case _ => checkRemoteJarFile(path)
}
}
- if (key != null) {
+ if (keys.nonEmpty) {
val timestamp = if (addedOnSubmit) startTime else
System.currentTimeMillis
- if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
- logInfo(s"Added JAR $path at $key with timestamp $timestamp")
- postEnvironmentUpdate()
- } else {
- logWarning(s"The jar $path has been added already. Overwriting of
added jars " +
- "is not supported in the current version.")
+ keys.foreach { key =>
+ if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
+ logInfo(s"Added JAR $path at $key with timestamp $timestamp")
+ postEnvironmentUpdate()
+ } else {
+ logWarning(s"The jar $path has been added already. Overwriting of
added jars " +
Review comment:
If `transitive=true` and there are many dependencies, won't users see a flood of
these warning messages (one per already-added jar)?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]