AngersZhuuuu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r534226884
##########
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##########
@@ -15,22 +15,158 @@
* limitations under the License.
*/
-package org.apache.spark.deploy
+package org.apache.spark.util
import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
-private[deploy] object DependencyUtils extends Logging {
+case class IvyProperties(
+ packagesExclusions: String,
+ packages: String,
+ repositories: String,
+ ivyRepoPath: String,
+ ivySettingsPath: String)
+
+private[spark] object DependencyUtils extends Logging {
+
+ def getIvyProperties(): IvyProperties = {
+ val Seq(packagesExclusions, packages, repositories, ivyRepoPath,
ivySettingsPath) = Seq(
+ "spark.jars.excludes",
+ "spark.jars.packages",
+ "spark.jars.repositories",
+ "spark.jars.ivy",
+ "spark.jars.ivySettings"
+ ).map(sys.props.get(_).orNull)
+ IvyProperties(packagesExclusions, packages, repositories, ivyRepoPath,
ivySettingsPath)
+ }
+
+ /**
+ * Parse URI query string's parameter value of `transitive` and `exclude`.
+ * Other invalid parameters will be ignored.
+ *
+ * @param uri Ivy URI that needs to be downloaded.
+ * @return Tuple value of parameter `transitive` and `exclude` value.
+ *
+ * 1. transitive: whether to download dependency jar of ivy URI,
default value is false
+ * and this parameter value is case-sensitive. Invalid value will
be treated as false.
+ * Example: Input:
exclude=org.mortbay.jetty:jetty&transitive=true
+ * Output: true
+ *
+ * 2. exclude: comma separated exclusions to apply when resolving
transitive dependencies,
+ * consists of `group:module` pairs separated by commas.
+ * Example: Input:
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+ * Output: [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http]
+ */
+ private def parseQueryParams(uri: URI): (Boolean, String) = {
+ val uriQuery = uri.getQuery
+ if (uriQuery == null) {
+ (false, "")
+ } else {
+ val mapTokens = uriQuery.split("&").map(_.split("="))
+ if (mapTokens.exists(token =>
+ token.length != 2 || StringUtils.isBlank(token(0)) ||
StringUtils.isBlank(token(1)))) {
+ throw new URISyntaxException(uri.toString, s"Invalid query string:
$uriQuery")
+ }
+ val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)
+ // Parse transitive parameters (e.g., transitive=true) in an ivy URL,
default value is false
+ var transitive: Boolean = false
+ groupedParams.get("transitive").foreach { params =>
+ if (params.length > 1) {
+ logWarning("It's best to specify `transitive` parameter in ivy URL
query only once." +
+ " If there are multiple `transitive` parameter, we will select the
last one")
+ }
+ params.map(_._2).foreach {
+ case "true" => transitive = true
+ case _ => transitive = false
+ }
+ }
+ // Parse an excluded list (e.g.,
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http)
+ // in an ivy URL. When downloading an ivy URL jar, Spark won't download
transitive jars
+ // in an excluded list.
+ val exclusionList = groupedParams.get("exclude").map { params =>
+ params.map(_._2).flatMap { excludeString =>
+ val excludes = excludeString.split(",")
+ if (excludes.map(_.split(":")).exists(token =>
+ token.length != 2 || StringUtils.isBlank(token(0)) ||
StringUtils.isBlank(token(1)))) {
+ throw new URISyntaxException(uri.toString, "Invalid exclude
string: " +
+ "expected 'org:module,org:module,..', found " + excludeString)
+ }
+ excludes
+ }.mkString(",")
+ }.getOrElse("")
+
+ val invalidParams = groupedParams
+ .filter(entry => !Seq("transitive", "exclude").contains(entry._1))
+ .keys.toArray.sorted
+ if (invalidParams.nonEmpty) {
+ logWarning(
+ s"Invalid parameters `${invalidParams.mkString(",")}` found in URI
query `$uriQuery`.")
+ }
+
+ groupedParams.foreach { case (key: String, values: Array[(String,
String)]) =>
+ if (key != "transitive" || key != "exclude") {
+ logWarning("Invalid parameter")
+ }
+ }
Review comment:
`==` check — forgot to remove it
##########
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##########
@@ -15,22 +15,158 @@
* limitations under the License.
*/
-package org.apache.spark.deploy
+package org.apache.spark.util
import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
-private[deploy] object DependencyUtils extends Logging {
+case class IvyProperties(
+ packagesExclusions: String,
+ packages: String,
+ repositories: String,
+ ivyRepoPath: String,
+ ivySettingsPath: String)
+
+private[spark] object DependencyUtils extends Logging {
+
+ def getIvyProperties(): IvyProperties = {
+ val Seq(packagesExclusions, packages, repositories, ivyRepoPath,
ivySettingsPath) = Seq(
+ "spark.jars.excludes",
+ "spark.jars.packages",
+ "spark.jars.repositories",
+ "spark.jars.ivy",
+ "spark.jars.ivySettings"
+ ).map(sys.props.get(_).orNull)
+ IvyProperties(packagesExclusions, packages, repositories, ivyRepoPath,
ivySettingsPath)
+ }
+
+ /**
+ * Parse URI query string's parameter value of `transitive` and `exclude`.
+ * Other invalid parameters will be ignored.
+ *
+ * @param uri Ivy URI that needs to be downloaded.
+ * @return Tuple value of parameter `transitive` and `exclude` value.
+ *
+ * 1. transitive: whether to download dependency jar of ivy URI,
default value is false
+ * and this parameter value is case-sensitive. Invalid value will
be treated as false.
+ * Example: Input:
exclude=org.mortbay.jetty:jetty&transitive=true
+ * Output: true
+ *
+ * 2. exclude: comma separated exclusions to apply when resolving
transitive dependencies,
+ * consists of `group:module` pairs separated by commas.
+ * Example: Input:
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+ * Output: [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http]
+ */
+ private def parseQueryParams(uri: URI): (Boolean, String) = {
+ val uriQuery = uri.getQuery
+ if (uriQuery == null) {
+ (false, "")
+ } else {
+ val mapTokens = uriQuery.split("&").map(_.split("="))
+ if (mapTokens.exists(token =>
+ token.length != 2 || StringUtils.isBlank(token(0)) ||
StringUtils.isBlank(token(1)))) {
+ throw new URISyntaxException(uri.toString, s"Invalid query string:
$uriQuery")
+ }
+ val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)
+ // Parse transitive parameters (e.g., transitive=true) in an ivy URL,
default value is false
+ var transitive: Boolean = false
+ groupedParams.get("transitive").foreach { params =>
+ if (params.length > 1) {
+ logWarning("It's best to specify `transitive` parameter in ivy URL
query only once." +
+ " If there are multiple `transitive` parameter, we will select the
last one")
+ }
+ params.map(_._2).foreach {
+ case "true" => transitive = true
+ case _ => transitive = false
+ }
+ }
+ // Parse an excluded list (e.g.,
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http)
+ // in an ivy URL. When downloading an ivy URL jar, Spark won't download
transitive jars
+ // in an excluded list.
+ val exclusionList = groupedParams.get("exclude").map { params =>
+ params.map(_._2).flatMap { excludeString =>
+ val excludes = excludeString.split(",")
+ if (excludes.map(_.split(":")).exists(token =>
+ token.length != 2 || StringUtils.isBlank(token(0)) ||
StringUtils.isBlank(token(1)))) {
+ throw new URISyntaxException(uri.toString, "Invalid exclude
string: " +
+ "expected 'org:module,org:module,..', found " + excludeString)
+ }
+ excludes
+ }.mkString(",")
+ }.getOrElse("")
+
+ val invalidParams = groupedParams
+ .filter(entry => !Seq("transitive", "exclude").contains(entry._1))
+ .keys.toArray.sorted
+ if (invalidParams.nonEmpty) {
+ logWarning(
+ s"Invalid parameters `${invalidParams.mkString(",")}` found in URI
query `$uriQuery`.")
+ }
Review comment:
yea
##########
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##########
@@ -1890,47 +1890,66 @@ class SparkContext(config: SparkConf) extends Logging {
throw new IllegalArgumentException(
s"Directory ${path} is not allowed for addJar")
}
- path
+ Seq(path)
} catch {
case NonFatal(e) =>
logError(s"Failed to add $path to Spark environment", e)
- null
+ Nil
}
} else {
- path
+ Seq(path)
}
}
if (path == null || path.isEmpty) {
logWarning("null or empty path specified as parameter to addJar")
} else {
- val key = if (path.contains("\\") && Utils.isWindows) {
+ var schema = ""
+ val keys = if (path.contains("\\") && Utils.isWindows) {
// For local paths with backslashes on Windows, URI throws an exception
addLocalJarFile(new File(path))
} else {
val uri = new Path(path).toUri
// SPARK-17650: Make sure this is a valid URL before adding it to the
list of dependencies
Utils.validateURL(uri)
- uri.getScheme match {
+ schema = uri.getScheme
+ schema match {
// A JAR file which exists only on the driver node
case null =>
// SPARK-22585 path without schema is not url encoded
addLocalJarFile(new File(uri.getPath))
// A JAR file which exists only on the driver node
case "file" => addLocalJarFile(new File(uri.getPath))
// A JAR file which exists locally on every worker node
- case "local" => "file:" + uri.getPath
+ case "local" => Seq("file:" + uri.getPath)
+ case "ivy" =>
+ // Since `new Path(path).toUri` will lose query information,
+ // so here we use `URI.create(path)`
+ DependencyUtils.resolveMavenDependencies(URI.create(path))
case _ => checkRemoteJarFile(path)
}
}
- if (key != null) {
+ if (keys.nonEmpty) {
val timestamp = if (addedOnSubmit) startTime else
System.currentTimeMillis
- if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
- logInfo(s"Added JAR $path at $key with timestamp $timestamp")
+ val (added, existed) = keys.partition(addedJars.putIfAbsent(_,
timestamp).isEmpty)
+ if (added.nonEmpty) {
+ if (schema != "ivy") {
+ logInfo(s"Added JAR $path at ${added.mkString(",")} with timestamp
$timestamp")
+ } else {
+ logInfo(s"Added dependency jars of ivy uri $path at
${added.mkString(",")}" +
+ s" with timestamp $timestamp")
+ }
postEnvironmentUpdate()
- } else {
- logWarning(s"The jar $path has been added already. Overwriting of
added jars " +
- "is not supported in the current version.")
+ }
+ if (existed.nonEmpty) {
+ if (schema != "ivy") {
+ logWarning(s"The jar $path has been added already. Overwriting of
added jars " +
+ "is not supported in the current version.")
+ } else {
+ logWarning(s"The dependency jars of ivy URI with $path at" +
+ s" ${existed.mkString(",")} has been added already." +
+ s" Overwriting of added jars is not supported in the current
version.")
Review comment:
Yea
##########
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##########
@@ -15,22 +15,158 @@
* limitations under the License.
*/
-package org.apache.spark.deploy
+package org.apache.spark.util
import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
-private[deploy] object DependencyUtils extends Logging {
+case class IvyProperties(
+ packagesExclusions: String,
+ packages: String,
+ repositories: String,
+ ivyRepoPath: String,
+ ivySettingsPath: String)
+
+private[spark] object DependencyUtils extends Logging {
+
+ def getIvyProperties(): IvyProperties = {
+ val Seq(packagesExclusions, packages, repositories, ivyRepoPath,
ivySettingsPath) = Seq(
+ "spark.jars.excludes",
+ "spark.jars.packages",
+ "spark.jars.repositories",
+ "spark.jars.ivy",
+ "spark.jars.ivySettings"
+ ).map(sys.props.get(_).orNull)
+ IvyProperties(packagesExclusions, packages, repositories, ivyRepoPath,
ivySettingsPath)
+ }
+
+ /**
+ * Parse URI query string's parameter value of `transitive` and `exclude`.
+ * Other invalid parameters will be ignored.
+ *
+ * @param uri Ivy URI that needs to be downloaded.
+ * @return Tuple value of parameter `transitive` and `exclude` value.
+ *
+ * 1. transitive: whether to download dependency jar of ivy URI,
default value is false
+ * and this parameter value is case-sensitive. Invalid value will
be treated as false.
+ * Example: Input:
exclude=org.mortbay.jetty:jetty&transitive=true
+ * Output: true
+ *
+ * 2. exclude: comma separated exclusions to apply when resolving
transitive dependencies,
+ * consists of `group:module` pairs separated by commas.
+ * Example: Input:
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+ * Output: [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http]
+ */
+ private def parseQueryParams(uri: URI): (Boolean, String) = {
+ val uriQuery = uri.getQuery
+ if (uriQuery == null) {
+ (false, "")
+ } else {
+ val mapTokens = uriQuery.split("&").map(_.split("="))
+ if (mapTokens.exists(token =>
+ token.length != 2 || StringUtils.isBlank(token(0)) ||
StringUtils.isBlank(token(1)))) {
+ throw new URISyntaxException(uri.toString, s"Invalid query string:
$uriQuery")
+ }
+ val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)
+ // Parse transitive parameters (e.g., transitive=true) in an ivy URL,
default value is false
+ var transitive: Boolean = false
Review comment:
Yea
##########
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##########
@@ -1890,47 +1890,66 @@ class SparkContext(config: SparkConf) extends Logging {
throw new IllegalArgumentException(
s"Directory ${path} is not allowed for addJar")
}
- path
+ Seq(path)
} catch {
case NonFatal(e) =>
logError(s"Failed to add $path to Spark environment", e)
- null
+ Nil
}
} else {
- path
+ Seq(path)
}
}
if (path == null || path.isEmpty) {
logWarning("null or empty path specified as parameter to addJar")
} else {
- val key = if (path.contains("\\") && Utils.isWindows) {
+ var schema = ""
+ val keys = if (path.contains("\\") && Utils.isWindows) {
// For local paths with backslashes on Windows, URI throws an exception
addLocalJarFile(new File(path))
} else {
val uri = new Path(path).toUri
// SPARK-17650: Make sure this is a valid URL before adding it to the
list of dependencies
Utils.validateURL(uri)
- uri.getScheme match {
+ schema = uri.getScheme
+ schema match {
// A JAR file which exists only on the driver node
case null =>
// SPARK-22585 path without schema is not url encoded
addLocalJarFile(new File(uri.getPath))
// A JAR file which exists only on the driver node
case "file" => addLocalJarFile(new File(uri.getPath))
// A JAR file which exists locally on every worker node
- case "local" => "file:" + uri.getPath
+ case "local" => Seq("file:" + uri.getPath)
+ case "ivy" =>
+ // Since `new Path(path).toUri` will lose query information,
+ // so here we use `URI.create(path)`
+ DependencyUtils.resolveMavenDependencies(URI.create(path))
case _ => checkRemoteJarFile(path)
}
}
- if (key != null) {
+ if (keys.nonEmpty) {
val timestamp = if (addedOnSubmit) startTime else
System.currentTimeMillis
- if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
- logInfo(s"Added JAR $path at $key with timestamp $timestamp")
+ val (added, existed) = keys.partition(addedJars.putIfAbsent(_,
timestamp).isEmpty)
+ if (added.nonEmpty) {
+ if (schema != "ivy") {
+ logInfo(s"Added JAR $path at ${added.mkString(",")} with timestamp
$timestamp")
+ } else {
+ logInfo(s"Added dependency jars of ivy uri $path at
${added.mkString(",")}" +
+ s" with timestamp $timestamp")
Review comment:
Yea
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]