[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-12-21 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r547037172



##
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##
@@ -1959,47 +1959,58 @@ class SparkContext(config: SparkConf) extends Logging {
 throw new IllegalArgumentException(
   s"Directory ${path} is not allowed for addJar")
   }
-  path
+  Seq(path)
 } catch {
   case NonFatal(e) =>
 logError(s"Failed to add $path to Spark environment", e)
-null
+Nil
 }
   } else {
-path
+Seq(path)
   }
 }
 
 if (path == null || path.isEmpty) {
   logWarning("null or empty path specified as parameter to addJar")
 } else {
-  val key = if (path.contains("\\") && Utils.isWindows) {
+  val (keys, schema) = if (path.contains("\\") && Utils.isWindows) {
 // For local paths with backslashes on Windows, URI throws an exception
-addLocalJarFile(new File(path))
+(addLocalJarFile(new File(path)), "local")
   } else {
 val uri = new Path(path).toUri
 // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
 Utils.validateURL(uri)
-uri.getScheme match {
+val uriSchema = uri.getScheme
+val jarPaths = uriSchema match {
   // A JAR file which exists only on the driver node
   case null =>
 // SPARK-22585 path without schema is not url encoded
 addLocalJarFile(new File(uri.getPath))
   // A JAR file which exists only on the driver node
   case "file" => addLocalJarFile(new File(uri.getPath))
   // A JAR file which exists locally on every worker node
-  case "local" => "file:" + uri.getPath
+  case "local" => Seq("file:" + uri.getPath)
+  case "ivy" =>
+// Since `new Path(path).toUri` will lose query information,
+// so here we use `URI.create(path)`
+DependencyUtils.resolveMavenDependencies(URI.create(path))
+  .flatMap(jar => addLocalJarFile(new File(jar)))
   case _ => checkRemoteJarFile(path)
 }
+(jarPaths, uriSchema)
   }
-  if (key != null) {
+  if (keys.nonEmpty) {
 val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
-if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
-  logInfo(s"Added JAR $path at $key with timestamp $timestamp")
+val (added, existed) = keys.partition(addedJars.putIfAbsent(_, timestamp).isEmpty)
+if (added.nonEmpty) {
+  val jarMessage = if (schema != "ivy") "JAR" else "dependency jars of ivy uri"
+  logInfo(s"Added $jarMessage $path at ${added.mkString(",")} with timestamp $timestamp")
   postEnvironmentUpdate()
-} else {
-  logWarning(s"The jar $path has been added already. Overwriting of added jars " +
-"is not supported in the current version.")
+}
+if (existed.nonEmpty) {
+  val jarMessage = if (schema != "ivy") "JAR" else "dependency jars of ivy uri"
+  logInfo(s"The $jarMessage $path at ${existed.mkString(",")} has been added already." +
+s" Overwriting of added jar is not supported in the current version.")

Review comment:
   nit: remove `s`
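   
   For readers following along, a sketch of the suggested fix: the continuation literal has no interpolated values, so its `s` prefix is redundant (the first literal keeps `s` for `$jarMessage`, `$path`, and `${existed.mkString(",")}`):
   
   ```
 logInfo(s"The $jarMessage $path at ${existed.mkString(",")} has been added already." +
   " Overwriting of added jar is not supported in the current version.")
   ```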

##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -25,12 +25,140 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+case class IvyProperties(
+packagesExclusions: String,
+packages: String,
+repositories: String,
+ivyRepoPath: String,
+ivySettingsPath: String)
+
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): IvyProperties = {
+val Seq(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath) = Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+IvyProperties(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath)
+  }
+
+  private def isInvalidQueryString(tokens: Array[String]): Boolean = {
+tokens.length != 2 || StringUtils.isBlank(tokens(0)) || StringUtils.isBlank(tokens(1))
+  }
+
+  /**
+   * Parse URI query string's parameter value of `transitive` and `exclude`.
+   * Other invalid parameters will be ignored.
+   *
+   * @param uri Ivy uri need to be downloaded.
+   * @return Tuple value 

[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-12-06 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r537259934



##
File path: core/src/test/scala/org/apache/spark/SparkContextSuite.scala
##
@@ -366,6 +366,29 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually
 }
   }
 
+  test("SPARK-33084: add jar when path contains comma") {
+withTempDir { tmpDir =>
+  val tmpJar = File.createTempFile("Test,UDTF", ".jar", tmpDir)
+  sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+  sc.addJar(tmpJar.getAbsolutePath)
+  assert(sc.listJars().exists(_.contains("Test,UDTF")))

Review comment:
   https://github.com/apache/spark/pull/29966#issuecomment-739631644 hm, I think it would be better to check jars are added correctly in executors, too.
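   
   A hypothetical sketch of such an executor-side check, assuming a `local-cluster` master so executors run in separate JVMs; the resource name `Test.class` is illustrative:
   
   ```
 // Run a job so executors fetch the added jar, then look it up through the
 // task thread's classloader (executors register fetched jars with that loader).
 val seenOnExecutors = sc.parallelize(1 to 3, 3).map { _ =>
   Thread.currentThread().getContextClassLoader.getResource("Test.class") != null
 }.collect()
 assert(seenOnExecutors.forall(identity))
   ```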





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-12-01 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533948414



##
File path: core/src/test/scala/org/apache/spark/SparkContextSuite.scala
##
@@ -955,6 +978,121 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually
 .set(EXECUTOR_ALLOW_SPARK_CONTEXT, true)).stop()
 }
   }
+
+  test("SPARK-33084: Add jar support ivy url -- default transitive = false") {
+sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0")
+assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
+assert(!sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
+
+sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=true")
+assert(sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
+  }
+
+  test("SPARK-33084: Add jar support ivy url -- invalid transitive use default false") {
+sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=foo")
+assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
+assert(!sc.listJars().exists(_.contains("org.slf4j_slf4j-api-1.7.10.jar")))
+assert(!sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
+  }
+
+  test("SPARK-33084: Add jar support ivy url -- transitive=true will download dependency jars") {
+sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=true")
+assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
+assert(sc.listJars().exists(_.contains("org.slf4j_slf4j-api-1.7.10.jar")))
+assert(sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
+  }
+
+  test("SPARK-33084: Add jar support ivy url -- test exclude param when transitive=true") {
+sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0" +
+  "?exclude=commons-lang:commons-lang&transitive=true")
+assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
+assert(sc.listJars().exists(_.contains("org.slf4j_slf4j-api-1.7.10.jar")))
+assert(!sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
+  }
+
+  test("SPARK-33084: Add jar support ivy url -- test different version") {
+sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0")
+assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
+sc.addJar("ivy://org.apache.hive:hive-storage-api:2.6.0")
+assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.6.0.jar")))
+  }
+
+  test("SPARK-33084: Add jar support ivy url -- test invalid param") {
+sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?invalidParam=foo")
+assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
+  }
+
+  test("SPARK-33084: Add jar support ivy url -- test multiple transitive params") {
+sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?" +
+  "transitive=true&transitive=false&transitive=invalidValue")

Review comment:
   > Could you add tests for `?transitive=true&transitive=invalidValue`, too?
   #29966 (comment)
   
   Where?
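   
   For reference, a self-contained sketch of the last-one-wins semantics under discussion (the query string and parsing here are illustrative, not the PR's exact code):
   
   ```
 // Only the final `transitive` value counts, and anything other than the
 // literal "true" disables transitive resolution.
 val query = "transitive=true&transitive=false&transitive=invalidValue"
 val transitive = query.split("&").map(_.split("="))
   .collect { case Array("transitive", v) => v }
   .lastOption.contains("true")
 assert(!transitive) // the last value is "invalidValue", so transitive = false
   ```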








[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-12-01 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533942826



##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -15,22 +15,158 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+case class IvyProperties(
+packagesExclusions: String,
+packages: String,
+repositories: String,
+ivyRepoPath: String,
+ivySettingsPath: String)
+
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): IvyProperties = {
+val Seq(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath) = Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+IvyProperties(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath)
+  }
+
+  /**
+   * Parse URI query string's parameter value of `transitive` and `exclude`.
+   * Other invalid parameters will be ignored.
+   *
+   * @param uri Ivy uri need to be downloaded.
+   * @return Tuple value of parameter `transitive` and `exclude` value.
+   *
+   * 1. transitive: whether to download dependency jar of ivy URI, default value is false
+   *and this parameter value is case-sensitive. Invalid value will be treat as false.
+   *Example: Input:  exclude=org.mortbay.jetty:jetty&transitive=true
+   *Output:  true
+   *
+   * 2. exclude: comma separated exclusions to apply when resolving transitive dependencies,
+   *consists of `group:module` pairs separated by commas.
+   *Example: Input:  exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   *Output:  [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http]
+   */
+  private def parseQueryParams(uri: URI): (Boolean, String) = {
+val uriQuery = uri.getQuery
+if (uriQuery == null) {
+  (false, "")
+} else {
+  val mapTokens = uriQuery.split("&").map(_.split("="))
+  if (mapTokens.exists(token =>
+token.length != 2 || StringUtils.isBlank(token(0)) || StringUtils.isBlank(token(1)))) {
+throw new URISyntaxException(uri.toString, s"Invalid query string: $uriQuery")
+  }
+  val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)
+  // Parse transitive parameters (e.g., transitive=true) in an ivy URL, default value is false
+  var transitive: Boolean = false
+  groupedParams.get("transitive").foreach { params =>
+if (params.length > 1) {
+  logWarning("It's best to specify `transitive` parameter in ivy URL query only once." +
+" If there are multiple `transitive` parameter, we will select the last one")
+}
+params.map(_._2).foreach {
+  case "true" => transitive = true
+  case _ => transitive = false
+}
+  }
+  // Parse an excluded list (e.g., exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http)
+  // in an ivy URL. When download ivy URL jar, Spark won't download transitive jar
+  // in a excluded list.
+  val exclusionList = groupedParams.get("exclude").map { params =>
+params.map(_._2).flatMap { excludeString =>
+  val excludes = excludeString.split(",")
+  if (excludes.map(_.split(":")).exists(token =>
+token.length != 2 || StringUtils.isBlank(token(0)) || StringUtils.isBlank(token(1)))) {
+throw new URISyntaxException(uri.toString, "Invalid exclude string: " +
+  "expected 'org:module,org:module,..', found " + excludeString)
+  }
+  excludes
+}.mkString(",")
+  }.getOrElse("")
+
+  val invalidParams = groupedParams
+.filter(entry => !Seq("transitive", "exclude").contains(entry._1))
+.keys.toArray.sorted
+  if (invalidParams.nonEmpty) {
+logWarning(
+  s"Invalid parameters `${invalidParams.mkString(",")}` found in URI query `$uriQuery`.")
+  }
+
+  groupedParams.foreach { case (key: String, values: Array[(String, String)]) =>
+if (key != "transitive" || key != "exclude") {
+  logWarning("Invalid parameter")
+}
+  }
+
+  (transitive, exclusionList)
+}
+  }
+
+  /**
+   * Download Ivy URIs dependency jars.
+   *
+   * 

[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-12-01 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533931068



##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -15,22 +15,158 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+case class IvyProperties(
+packagesExclusions: String,
+packages: String,
+repositories: String,
+ivyRepoPath: String,
+ivySettingsPath: String)
+
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): IvyProperties = {
+val Seq(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath) = Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+IvyProperties(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath)
+  }
+
+  /**
+   * Parse URI query string's parameter value of `transitive` and `exclude`.
+   * Other invalid parameters will be ignored.
+   *
+   * @param uri Ivy uri need to be downloaded.
+   * @return Tuple value of parameter `transitive` and `exclude` value.
+   *
+   * 1. transitive: whether to download dependency jar of ivy URI, default value is false
+   *and this parameter value is case-sensitive. Invalid value will be treat as false.
+   *Example: Input:  exclude=org.mortbay.jetty:jetty&transitive=true
+   *Output:  true
+   *
+   * 2. exclude: comma separated exclusions to apply when resolving transitive dependencies,
+   *consists of `group:module` pairs separated by commas.
+   *Example: Input:  exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   *Output:  [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http]
+   */
+  private def parseQueryParams(uri: URI): (Boolean, String) = {
+val uriQuery = uri.getQuery
+if (uriQuery == null) {
+  (false, "")
+} else {
+  val mapTokens = uriQuery.split("&").map(_.split("="))
+  if (mapTokens.exists(token =>
+token.length != 2 || StringUtils.isBlank(token(0)) || StringUtils.isBlank(token(1)))) {
+throw new URISyntaxException(uri.toString, s"Invalid query string: $uriQuery")
+  }
+  val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)
+  // Parse transitive parameters (e.g., transitive=true) in an ivy URL, default value is false
+  var transitive: Boolean = false
+  groupedParams.get("transitive").foreach { params =>
+if (params.length > 1) {
+  logWarning("It's best to specify `transitive` parameter in ivy URL query only once." +
+" If there are multiple `transitive` parameter, we will select the last one")
+}
+params.map(_._2).foreach {
+  case "true" => transitive = true
+  case _ => transitive = false
+}
+  }
+  // Parse an excluded list (e.g., exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http)
+  // in an ivy URL. When download ivy URL jar, Spark won't download transitive jar
+  // in a excluded list.
+  val exclusionList = groupedParams.get("exclude").map { params =>
+params.map(_._2).flatMap { excludeString =>
+  val excludes = excludeString.split(",")
+  if (excludes.map(_.split(":")).exists(token =>
+token.length != 2 || StringUtils.isBlank(token(0)) || StringUtils.isBlank(token(1)))) {
+throw new URISyntaxException(uri.toString, "Invalid exclude string: " +
+  "expected 'org:module,org:module,..', found " + excludeString)
+  }
+  excludes
+}.mkString(",")
+  }.getOrElse("")
+
+  val invalidParams = groupedParams
+.filter(entry => !Seq("transitive", "exclude").contains(entry._1))
+.keys.toArray.sorted
+  if (invalidParams.nonEmpty) {
+logWarning(
+  s"Invalid parameters `${invalidParams.mkString(",")}` found in URI query `$uriQuery`.")
+  }
+
+  groupedParams.foreach { case (key: String, values: Array[(String, String)]) =>
+if (key != "transitive" || key != "exclude") {
+  logWarning("Invalid parameter")
+}
+  }
+
+  (transitive, exclusionList)
+}
+  }
+
+  /**
+   * Download Ivy URIs dependency jars.
+   *
+   * 

[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-12-01 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533928892



##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -15,22 +15,158 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+case class IvyProperties(
+packagesExclusions: String,
+packages: String,
+repositories: String,
+ivyRepoPath: String,
+ivySettingsPath: String)
+
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): IvyProperties = {
+val Seq(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath) = Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+IvyProperties(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath)
+  }
+
+  /**
+   * Parse URI query string's parameter value of `transitive` and `exclude`.
+   * Other invalid parameters will be ignored.
+   *
+   * @param uri Ivy uri need to be downloaded.
+   * @return Tuple value of parameter `transitive` and `exclude` value.
+   *
+   * 1. transitive: whether to download dependency jar of ivy URI, default value is false
+   *and this parameter value is case-sensitive. Invalid value will be treat as false.
+   *Example: Input:  exclude=org.mortbay.jetty:jetty&transitive=true
+   *Output:  true
+   *
+   * 2. exclude: comma separated exclusions to apply when resolving transitive dependencies,
+   *consists of `group:module` pairs separated by commas.
+   *Example: Input:  exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   *Output:  [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http]
+   */
+  private def parseQueryParams(uri: URI): (Boolean, String) = {
+val uriQuery = uri.getQuery
+if (uriQuery == null) {
+  (false, "")
+} else {
+  val mapTokens = uriQuery.split("&").map(_.split("="))
+  if (mapTokens.exists(token =>
+token.length != 2 || StringUtils.isBlank(token(0)) || StringUtils.isBlank(token(1)))) {
+throw new URISyntaxException(uri.toString, s"Invalid query string: $uriQuery")
+  }
+  val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)
+  // Parse transitive parameters (e.g., transitive=true) in an ivy URL, default value is false
+  var transitive: Boolean = false
+  groupedParams.get("transitive").foreach { params =>
+if (params.length > 1) {
+  logWarning("It's best to specify `transitive` parameter in ivy URL query only once." +
+" If there are multiple `transitive` parameter, we will select the last one")
+}
+params.map(_._2).foreach {
+  case "true" => transitive = true
+  case _ => transitive = false
+}
+  }
+  // Parse an excluded list (e.g., exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http)
+  // in an ivy URL. When download ivy URL jar, Spark won't download transitive jar
+  // in a excluded list.
+  val exclusionList = groupedParams.get("exclude").map { params =>
+params.map(_._2).flatMap { excludeString =>
+  val excludes = excludeString.split(",")
+  if (excludes.map(_.split(":")).exists(token =>
+token.length != 2 || StringUtils.isBlank(token(0)) || StringUtils.isBlank(token(1)))) {
+throw new URISyntaxException(uri.toString, "Invalid exclude string: " +
+  "expected 'org:module,org:module,..', found " + excludeString)
+  }
+  excludes
+}.mkString(",")
+  }.getOrElse("")
+
+  val invalidParams = groupedParams
+.filter(entry => !Seq("transitive", "exclude").contains(entry._1))
+.keys.toArray.sorted
+  if (invalidParams.nonEmpty) {
+logWarning(
+  s"Invalid parameters `${invalidParams.mkString(",")}` found in URI query `$uriQuery`.")
+  }

Review comment:
   nit format:
   ```
 val validParams = Set("transitive", "exclude")
 val invalidParams = groupedParams.keys.filterNot(validParams.contains).toSeq.sorted
 if (invalidParams.nonEmpty) {
   logWarning(s"Invalid parameters `${invalidParams.mkString(",")}` found " +
 s"in URI query `$uriQuery`.")
 }
   ```
 

[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-12-01 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533928805



##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -15,22 +15,158 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+case class IvyProperties(
+packagesExclusions: String,
+packages: String,
+repositories: String,
+ivyRepoPath: String,
+ivySettingsPath: String)
+
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): IvyProperties = {
+val Seq(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath) = Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+IvyProperties(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath)
+  }
+
+  /**
+   * Parse URI query string's parameter value of `transitive` and `exclude`.
+   * Other invalid parameters will be ignored.
+   *
+   * @param uri Ivy uri need to be downloaded.
+   * @return Tuple value of parameter `transitive` and `exclude` value.
+   *
+   * 1. transitive: whether to download dependency jar of ivy URI, default value is false
+   *and this parameter value is case-sensitive. Invalid value will be treat as false.
+   *Example: Input:  exclude=org.mortbay.jetty:jetty&transitive=true
+   *Output:  true
+   *
+   * 2. exclude: comma separated exclusions to apply when resolving transitive dependencies,
+   *consists of `group:module` pairs separated by commas.
+   *Example: Input:  exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   *Output:  [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http]
+   */
+  private def parseQueryParams(uri: URI): (Boolean, String) = {
+val uriQuery = uri.getQuery
+if (uriQuery == null) {
+  (false, "")
+} else {
+  val mapTokens = uriQuery.split("&").map(_.split("="))
+  if (mapTokens.exists(token =>
+token.length != 2 || StringUtils.isBlank(token(0)) || StringUtils.isBlank(token(1)))) {
+throw new URISyntaxException(uri.toString, s"Invalid query string: $uriQuery")
+  }
+  val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)
+  // Parse transitive parameters (e.g., transitive=true) in an ivy URL, default value is false
+  var transitive: Boolean = false
+  groupedParams.get("transitive").foreach { params =>
+if (params.length > 1) {
+  logWarning("It's best to specify `transitive` parameter in ivy URL query only once." +
+" If there are multiple `transitive` parameter, we will select the last one")
+}
+params.map(_._2).foreach {
+  case "true" => transitive = true
+  case _ => transitive = false
+}
+  }
+  // Parse an excluded list (e.g., exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http)
+  // in an ivy URL. When download ivy URL jar, Spark won't download transitive jar
+  // in a excluded list.
+  val exclusionList = groupedParams.get("exclude").map { params =>
+params.map(_._2).flatMap { excludeString =>
+  val excludes = excludeString.split(",")
+  if (excludes.map(_.split(":")).exists(token =>
+token.length != 2 || StringUtils.isBlank(token(0)) || StringUtils.isBlank(token(1)))) {
+throw new URISyntaxException(uri.toString, "Invalid exclude string: " +
+  "expected 'org:module,org:module,..', found " + excludeString)
+  }
+  excludes
+}.mkString(",")
+  }.getOrElse("")
+
+  val invalidParams = groupedParams
+.filter(entry => !Seq("transitive", "exclude").contains(entry._1))
+.keys.toArray.sorted
+  if (invalidParams.nonEmpty) {
+logWarning(
+  s"Invalid parameters `${invalidParams.mkString(",")}` found in URI query `$uriQuery`.")
+  }
+
+  groupedParams.foreach { case (key: String, values: Array[(String, String)]) =>
+if (key != "transitive" || key != "exclude") {
+  logWarning("Invalid parameter")
+}
+  }

Review comment:
   What's this?
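   
   The question presumably targets the condition: `key != "transitive" || key != "exclude"` is true for every key (no string can equal both at once), so the warning fires unconditionally, and the block duplicates the `invalidParams` warning just above it. A sketch of what was presumably intended, if the block were kept at all:
   
   ```
 // `&&` instead of `||`: warn only for keys that are neither "transitive" nor "exclude".
 groupedParams.foreach { case (key, _) =>
   if (key != "transitive" && key != "exclude") {
 logWarning(s"Invalid parameter `$key`")
   }
 }
   ```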






[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-12-01 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533927237



##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -15,22 +15,158 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+case class IvyProperties(
+packagesExclusions: String,
+packages: String,
+repositories: String,
+ivyRepoPath: String,
+ivySettingsPath: String)
+
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): IvyProperties = {
+val Seq(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath) = Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+IvyProperties(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath)
+  }
+
+  /**
+   * Parse URI query string's parameter value of `transitive` and `exclude`.
+   * Other invalid parameters will be ignored.
+   *
+   * @param uri Ivy uri need to be downloaded.
+   * @return Tuple value of parameter `transitive` and `exclude` value.
+   *
+   * 1. transitive: whether to download dependency jar of ivy URI, default value is false
+   *and this parameter value is case-sensitive. Invalid value will be treat as false.
+   *Example: Input:  exclude=org.mortbay.jetty:jetty&transitive=true
+   *Output:  true
+   *
+   * 2. exclude: comma separated exclusions to apply when resolving transitive dependencies,
+   *consists of `group:module` pairs separated by commas.
+   *Example: Input:  exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   *Output:  [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http]
+   */
+  private def parseQueryParams(uri: URI): (Boolean, String) = {
+val uriQuery = uri.getQuery
+if (uriQuery == null) {
+  (false, "")
+} else {
+  val mapTokens = uriQuery.split("&").map(_.split("="))
+  if (mapTokens.exists(token =>
+token.length != 2 || StringUtils.isBlank(token(0)) || StringUtils.isBlank(token(1)))) {
+throw new URISyntaxException(uri.toString, s"Invalid query string: $uriQuery")
+  }
+  val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)
+  // Parse transitive parameters (e.g., transitive=true) in an ivy URL, default value is false
+  var transitive: Boolean = false
+  groupedParams.get("transitive").foreach { params =>
+if (params.length > 1) {
+  logWarning("It's best to specify `transitive` parameter in ivy URL query only once." +
+" If there are multiple `transitive` parameter, we will select the last one")
+}
+params.map(_._2).foreach {
+  case "true" => transitive = true
+  case _ => transitive = false
+}
+  }
+  // Parse an excluded list (e.g., exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http)
+  // in an ivy URL. When download ivy URL jar, Spark won't download transitive jar
+  // in a excluded list.
+  val exclusionList = groupedParams.get("exclude").map { params =>
+params.map(_._2).flatMap { excludeString =>
+  val excludes = excludeString.split(",")
+  if (excludes.map(_.split(":")).exists(token =>
+token.length != 2 || StringUtils.isBlank(token(0)) || StringUtils.isBlank(token(1)))) {

Review comment:
   How about making a helper function for this check? 
https://github.com/apache/spark/pull/29966/files#diff-3e9f71e7d80c1dc7d02b0edef611de280f219789f0d2b282887f07e999020024R74-R75
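   
   The helper the review asks for appears in a later revision of the diff (quoted in the 2020-12-21 message above); for convenience:
   
   ```
 // Factored-out validation shared by the query-string and exclude-list checks.
 private def isInvalidQueryString(tokens: Array[String]): Boolean = {
   tokens.length != 2 || StringUtils.isBlank(tokens(0)) || StringUtils.isBlank(tokens(1))
 }
 // Both call sites then reduce to, e.g.:
 //   if (mapTokens.exists(isInvalidQueryString)) {
 //     throw new URISyntaxException(uri.toString, s"Invalid query string: $uriQuery")
 //   }
   ```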








[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-12-01 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533924963



##
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##
@@ -1890,47 +1890,66 @@ class SparkContext(config: SparkConf) extends Logging {
 throw new IllegalArgumentException(
   s"Directory ${path} is not allowed for addJar")
   }
-  path
+  Seq(path)
 } catch {
   case NonFatal(e) =>
 logError(s"Failed to add $path to Spark environment", e)
-null
+Nil
 }
   } else {
-path
+Seq(path)
   }
 }
 
 if (path == null || path.isEmpty) {
   logWarning("null or empty path specified as parameter to addJar")
 } else {
-  val key = if (path.contains("\\") && Utils.isWindows) {
+  var schema = ""
+  val keys = if (path.contains("\\") && Utils.isWindows) {
 // For local paths with backslashes on Windows, URI throws an exception
 addLocalJarFile(new File(path))
   } else {
 val uri = new Path(path).toUri
 // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
 Utils.validateURL(uri)
-uri.getScheme match {
+schema = uri.getScheme
+schema match {
   // A JAR file which exists only on the driver node
   case null =>
 // SPARK-22585 path without schema is not url encoded
 addLocalJarFile(new File(uri.getPath))
   // A JAR file which exists only on the driver node
   case "file" => addLocalJarFile(new File(uri.getPath))
   // A JAR file which exists locally on every worker node
-  case "local" => "file:" + uri.getPath
+  case "local" => Seq("file:" + uri.getPath)
+  case "ivy" =>
+// Since `new Path(path).toUri` will lose query information,
+// so here we use `URI.create(path)`
+DependencyUtils.resolveMavenDependencies(URI.create(path))
   case _ => checkRemoteJarFile(path)
 }
   }
-  if (key != null) {
+  if (keys.nonEmpty) {
 val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
-if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
-  logInfo(s"Added JAR $path at $key with timestamp $timestamp")
+val (added, existed) = keys.partition(addedJars.putIfAbsent(_, timestamp).isEmpty)
+if (added.nonEmpty) {
+  if (schema != "ivy") {
+logInfo(s"Added JAR $path at ${added.mkString(",")} with timestamp $timestamp")
+  } else {
+logInfo(s"Added dependency jars of ivy uri $path at ${added.mkString(",")}" +
+  s" with timestamp $timestamp")
+  }
   postEnvironmentUpdate()
-} else {
-  logWarning(s"The jar $path has been added already. Overwriting of added jars " +
-"is not supported in the current version.")
+}
+if (existed.nonEmpty) {
+  if (schema != "ivy") {
+logWarning(s"The jar $path has been added already. Overwriting of added jars " +
+  "is not supported in the current version.")
+  } else {
+logWarning(s"The dependency jars of ivy URI with $path at" +
+  s" ${existed.mkString(",")} has been added already." +
+  s" Overwriting of added jars is not supported in the current version.")

Review comment:
   nit: remove `s`.








[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-12-01 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533924173



##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -15,22 +15,158 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+case class IvyProperties(
+packagesExclusions: String,
+packages: String,
+repositories: String,
+ivyRepoPath: String,
+ivySettingsPath: String)
+
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): IvyProperties = {
+val Seq(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath) = Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+IvyProperties(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath)
+  }
+
+  /**
+   * Parse URI query string's parameter value of `transitive` and `exclude`.
+   * Other invalid parameters will be ignored.
+   *
+   * @param uri Ivy uri need to be downloaded.
+   * @return Tuple value of parameter `transitive` and `exclude` value.
+   *
+   * 1. transitive: whether to download dependency jar of ivy URI, default value is false
+   *and this parameter value is case-sensitive. Invalid value will be treat as false.
+   *Example: Input:  exclude=org.mortbay.jetty:jetty&transitive=true
+   *Output:  true
+   *
+   * 2. exclude: comma separated exclusions to apply when resolving transitive dependencies,
+   *consists of `group:module` pairs separated by commas.
+   *Example: Input:  exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   *Output:  [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http]
+   */
+  private def parseQueryParams(uri: URI): (Boolean, String) = {
+val uriQuery = uri.getQuery
+if (uriQuery == null) {
+  (false, "")
+} else {
+  val mapTokens = uriQuery.split("&").map(_.split("="))
+  if (mapTokens.exists(token =>
+token.length != 2 || StringUtils.isBlank(token(0)) || StringUtils.isBlank(token(1)))) {
+throw new URISyntaxException(uri.toString, s"Invalid query string: $uriQuery")
+  }
+  val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)
+  // Parse transitive parameters (e.g., transitive=true) in an ivy URL, default value is false
+  var transitive: Boolean = false

Review comment:
   nit: we don't need the type: `var transitive = false`








[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-12-01 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533923652



##
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##
@@ -1890,47 +1890,66 @@ class SparkContext(config: SparkConf) extends Logging {
 throw new IllegalArgumentException(
   s"Directory ${path} is not allowed for addJar")
   }
-  path
+  Seq(path)
 } catch {
   case NonFatal(e) =>
 logError(s"Failed to add $path to Spark environment", e)
-null
+Nil
 }
   } else {
-path
+Seq(path)
   }
 }
 
 if (path == null || path.isEmpty) {
   logWarning("null or empty path specified as parameter to addJar")
 } else {
-  val key = if (path.contains("\\") && Utils.isWindows) {
+  var schema = ""
+  val keys = if (path.contains("\\") && Utils.isWindows) {
 // For local paths with backslashes on Windows, URI throws an exception
 addLocalJarFile(new File(path))
   } else {
 val uri = new Path(path).toUri
 // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
 Utils.validateURL(uri)
-uri.getScheme match {
+schema = uri.getScheme
+schema match {
   // A JAR file which exists only on the driver node
   case null =>
 // SPARK-22585 path without schema is not url encoded
 addLocalJarFile(new File(uri.getPath))
   // A JAR file which exists only on the driver node
   case "file" => addLocalJarFile(new File(uri.getPath))
   // A JAR file which exists locally on every worker node
-  case "local" => "file:" + uri.getPath
+  case "local" => Seq("file:" + uri.getPath)
+  case "ivy" =>
+// Since `new Path(path).toUri` will lose query information,
+// so here we use `URI.create(path)`
+DependencyUtils.resolveMavenDependencies(URI.create(path))
   case _ => checkRemoteJarFile(path)
 }
   }
-  if (key != null) {
+  if (keys.nonEmpty) {
 val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
-if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
-  logInfo(s"Added JAR $path at $key with timestamp $timestamp")
+val (added, existed) = keys.partition(addedJars.putIfAbsent(_, timestamp).isEmpty)
+if (added.nonEmpty) {
+  if (schema != "ivy") {
+logInfo(s"Added JAR $path at ${added.mkString(",")} with timestamp $timestamp")
+  } else {
+logInfo(s"Added dependency jars of ivy uri $path at ${added.mkString(",")}" +
+  s" with timestamp $timestamp")

Review comment:
   nit: 
   ```
 val jarMessage = if (schema != "ivy") "JAR" else "dependency jars of ivy uri"
 logInfo(s"Added $jarMessage $path at ${added.mkString(",")} with timestamp $timestamp")
   ```








[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-12-01 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533920784



##
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##
@@ -1890,47 +1890,66 @@ class SparkContext(config: SparkConf) extends Logging {
 throw new IllegalArgumentException(
   s"Directory ${path} is not allowed for addJar")
   }
-  path
+  Seq(path)
 } catch {
   case NonFatal(e) =>
 logError(s"Failed to add $path to Spark environment", e)
-null
+Nil
 }
   } else {
-path
+Seq(path)
   }
 }
 
 if (path == null || path.isEmpty) {
   logWarning("null or empty path specified as parameter to addJar")
 } else {
-  val key = if (path.contains("\\") && Utils.isWindows) {
+  var schema = ""

Review comment:
   `val schema = uri.getScheme`? Please avoid using `var` where possible.
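   
   A minimal standalone illustration of the var-free style being asked for (the PR ultimately returns the scheme from the branch as part of a tuple, as the 2020-12-21 diff above shows):
   
   ```
 import java.net.URI
 
 // Derive the value once as a val instead of pre-declaring a mutable placeholder.
 val uri = URI.create("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=true")
 val scheme = Option(uri.getScheme).getOrElse("") // val, not `var schema = ""`
   ```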








[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-12-01 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533837188



##
File path: core/src/test/scala/org/apache/spark/SparkContextSuite.scala
##
@@ -955,6 +978,121 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually
 .set(EXECUTOR_ALLOW_SPARK_CONTEXT, true)).stop()
 }
   }
+
+  test("SPARK-33084: Add jar support ivy url -- default transitive = false") {
+sc = new SparkContext(new 
SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0")
+
assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
+
assert(!sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
+
+sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=true")
+
assert(sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
+  }
+
+  test("SPARK-33084: Add jar support ivy url -- invalid transitive use default 
false") {
+sc = new SparkContext(new 
SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=foo")
+
assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
+assert(!sc.listJars().exists(_.contains("org.slf4j_slf4j-api-1.7.10.jar")))
+
assert(!sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
+  }
+
+  test("SPARK-33084: Add jar support ivy url -- transitive=true will download 
dependency jars") {
+sc = new SparkContext(new 
SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=true")
+
assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
+assert(sc.listJars().exists(_.contains("org.slf4j_slf4j-api-1.7.10.jar")))
+
assert(sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
+  }
+
+  test("SPARK-33084: Add jar support ivy url -- test exclude param when 
transitive=true") {
+sc = new SparkContext(new 
SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0" +
+  "?exclude=commons-lang:commons-lang=true")
+
assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
+assert(sc.listJars().exists(_.contains("org.slf4j_slf4j-api-1.7.10.jar")))
+
assert(!sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
+  }
+
+  test("SPARK-33084: Add jar support ivy url -- test different version") {
+sc = new SparkContext(new 
SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0")
+
assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
+sc.addJar("ivy://org.apache.hive:hive-storage-api:2.6.0")
+
assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.6.0.jar")))
+  }
+
+  test("SPARK-33084: Add jar support ivy url -- test invalid param") {
+sc = new SparkContext(new 
SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?invalidParam=foo")
+
assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
+  }
+
+  test("SPARK-33084: Add jar support ivy url -- test multiple transitive 
params") {
+sc = new SparkContext(new 
SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?" +
+  "transitive=true=false=invalidValue")

Review comment:
   Could you add tests for `?transitive=true&transitive=invalidValue`, too?
   https://github.com/apache/spark/pull/29966#discussion_r533434365
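   
   A hypothetical sketch of the requested extra case, modeled on the surrounding tests (test name and expectations are illustrative):
   
   ```
 test("SPARK-33084: Add jar support ivy url -- transitive=true then invalidValue") {
   sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
   sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=true&transitive=invalidValue")
   // The last `transitive` value wins and is not "true", so no transitive jars are expected.
   assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
   assert(!sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
 }
   ```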








[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-12-01 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533820823



##
File path: core/src/test/scala/org/apache/spark/SparkContextSuite.scala
##
@@ -366,6 +366,29 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually
 }
   }
 
+  test("SPARK-33084: add jar when path contains comma") {
+withTempDir { tmpDir =>
+  val tmpJar = File.createTempFile("Test,UDTF", ".jar", tmpDir)
+  sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+  sc.addJar(tmpJar.getAbsolutePath)
+  assert(sc.listJars().exists(_.contains("Test,UDTF")))
+
+  val jarPath = "hdfs:///no/path/to/Test,UDTF1.jar"
+  sc.addJar(jarPath)
+  // Add jar failed since file not exists
+  assert(!sc.listJars().exists(_.contains("/no/path/to/Test,UDTF.jar")))
+
+  Seq("http", "https", "ftp").foreach { scheme =>
+val badURL = s"$scheme://user:pwd/path/Test,UDTF_${scheme}.jar"
+val e1 = intercept[MalformedURLException] {

Review comment:
   nit: `e1` -> `e`

##
File path: core/src/test/scala/org/apache/spark/SparkContextSuite.scala
##
@@ -955,6 +978,121 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually
 .set(EXECUTOR_ALLOW_SPARK_CONTEXT, true)).stop()
 }
   }
+
+  test("SPARK-33084: Add jar support ivy url -- default transitive = false") {
+sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0")
+assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
+assert(!sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
+
+sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=true")
+assert(sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
+  }
+
+  test("SPARK-33084: Add jar support ivy url -- invalid transitive use default false") {
+sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=foo")
+assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
+assert(!sc.listJars().exists(_.contains("org.slf4j_slf4j-api-1.7.10.jar")))
+assert(!sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
+  }
+
+  test("SPARK-33084: Add jar support ivy url -- transitive=true will download dependency jars") {
+sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=true")
+assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
+assert(sc.listJars().exists(_.contains("org.slf4j_slf4j-api-1.7.10.jar")))
+assert(sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
+  }
+
+  test("SPARK-33084: Add jar support ivy url -- test exclude param when transitive=true") {
+sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0" +
+  "?exclude=commons-lang:commons-lang&transitive=true")
+assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
+assert(sc.listJars().exists(_.contains("org.slf4j_slf4j-api-1.7.10.jar")))
+assert(!sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
+  }
+
+  test("SPARK-33084: Add jar support ivy url -- test different version") {
+sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0")
+assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
+sc.addJar("ivy://org.apache.hive:hive-storage-api:2.6.0")
+assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.6.0.jar")))

Review comment:
   Btw, is this behaviour the same as Hive's?

##
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##
@@ -1919,18 +1919,26 @@ class SparkContext(config: SparkConf) extends Logging {
   // A JAR file which exists only on the driver node
   case "file" => addLocalJarFile(new File(uri.getPath))
   // A JAR file which exists locally on every worker node
-  case "local" => "file:" + uri.getPath
+  case "local" => Seq("file:" + uri.getPath)
+  case "ivy" =>
+// Since `new Path(path).toUri` will lose query information,
+// so here we use `URI.create(path)`
+DependencyUtils.resolveMavenDependencies(URI.create(path))
   case _ => checkRemoteJarFile(path)
 }
   }
-  if (key != null) {
+  if (keys.nonEmpty) {

[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-12-01 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533434365



##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -15,22 +15,115 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+  }
+
+  def parseQueryParams(uriQuery: String): (Boolean, String) = {
+if (uriQuery == null) {
+  (false, "")
+} else {
+  val mapTokens = uriQuery.split("&").map(_.split("="))
+  if (mapTokens.exists(_.length != 2)) {
+throw new URISyntaxException(uriQuery, s"Invalid query string: 
$uriQuery")
+  }
+  val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)
+  // Parse transitive parameters (e.g., transitive=true) in an ivy URL, 
default value is false
+  var transitive = false
+  groupedParams.get("transitive").foreach { params =>
+if (params.length > 1) {
+  logWarning("It's best to specify `transitive` parameter in ivy URL 
query only once." +
+" If there are multiple `transitive` parameter, we will select the 
last one")
+}
+params.map(_._2).foreach(value => {
+  if (value == "true") {
+transitive = true
+  } else if (value == "false") {
+transitive = false
+  }

Review comment:
   If so, `transitive=true&transitive=invalidValue` means `transitive=false`?
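
For reference, here is a minimal standalone sketch of the quoted loop (an illustration of its semantics, not the PR's code): invalid values are skipped, so the last valid value wins.

```
// Sketch of the quoted foreach loop; `parseTransitiveValues` is a
// hypothetical helper used only for illustration.
def parseTransitiveValues(values: Seq[String]): Boolean = {
  var transitive = false
  values.foreach {
    case "true"  => transitive = true
    case "false" => transitive = false
    case _       => // invalid values such as "invalidValue" are silently ignored
  }
  transitive
}

// "transitive=true&transitive=invalidValue": the invalid value is ignored,
// so the result stays true rather than becoming false.
assert(parseTransitiveValues(Seq("true", "invalidValue")))
```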





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-12-01 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533396252



##
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##
@@ -1919,18 +1919,24 @@ class SparkContext(config: SparkConf) extends Logging {
   // A JAR file which exists only on the driver node
   case "file" => addLocalJarFile(new File(uri.getPath))
   // A JAR file which exists locally on every worker node
-  case "local" => "file:" + uri.getPath
+  case "local" => Seq("file:" + uri.getPath)
+  case "ivy" =>
+// Since `new Path(path).toUri` will lose query information,
+// so here we use `URI.create(path)`
+DependencyUtils.resolveMavenDependencies(URI.create(path))

Review comment:
   What if two added jars have the same dependency with different versions? 
e.g.,
   
   ```
   sc.addJar("ivy://lib1:1.0?transitive=true") // --> it depends on `libX v1.0`
   sc.addJar("ivy://lib2:1.0?transitive=true") // --> it depends on `libX v2.0`
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-12-01 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533384137



##
File path: core/src/test/scala/org/apache/spark/SparkContextSuite.scala
##
@@ -366,6 +366,29 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually
 }
   }
 
+  test("add jar when path contains comma") {

Review comment:
   nit: add `SPARK-33084:`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-12-01 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533382150



##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -15,22 +15,115 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+  }
+
+  def parseQueryParams(uriQuery: String): (Boolean, String) = {
+if (uriQuery == null) {
+  (false, "")
+} else {
+  val mapTokens = uriQuery.split("&").map(_.split("="))
+  if (mapTokens.exists(_.length != 2)) {
+throw new URISyntaxException(uriQuery, s"Invalid query string: 
$uriQuery")
+  }
+  val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)
+  // Parse transitive parameters (e.g., transitive=true) in an ivy URL, 
default value is false
+  var transitive = false
+  groupedParams.get("transitive").foreach { params =>
+if (params.length > 1) {
+  logWarning("It's best to specify `transitive` parameter in ivy URL 
query only once." +
+" If there are multiple `transitive` parameter, we will select the 
last one")
+}
+params.map(_._2).foreach(value => {
+  if (value == "true") {
+transitive = true
+  } else if (value == "false") {
+transitive = false
+  }

Review comment:
   nit format:
   ```
   params.map(_._2).foreach {
     case "true" => transitive = true
     case "false" => transitive = false
     case _ =>
   }
   ```
   Also, could you add more tests for a duplicated `transitive` case like `transitive=true&transitive=false&transitive=invalidValue`, `transitive=false&transitive=invalidValue&transitive=true`, ...?
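
A test along those lines could look like this sketch (a hypothetical test modeled on the suite's existing ivy tests; the coordinates and expected jars are illustrative assumptions):

```
test("SPARK-33084: duplicated transitive parameter, last valid value wins") {
  sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
  sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0" +
    "?transitive=true&transitive=false&transitive=invalidValue")
  // "invalidValue" is skipped by the loop above, so the effective setting
  // comes from the last valid token ("false"): no transitive jars expected.
  assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
  assert(!sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
}
```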





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-12-01 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533375024



##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -15,22 +15,115 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+  }
+
+  def parseQueryParams(uriQuery: String): (Boolean, String) = {
+if (uriQuery == null) {
+  (false, "")
+} else {
+  val mapTokens = uriQuery.split("&").map(_.split("="))
+  if (mapTokens.exists(_.length != 2)) {
+throw new URISyntaxException(uriQuery, s"Invalid query string: 
$uriQuery")
+  }
+  val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)
+  // Parse transitive parameters (e.g., transitive=true) in an ivy URL, 
default value is false
+  var transitive = false
+  groupedParams.get("transitive").foreach { params =>
+if (params.length > 1) {
+  logWarning("It's best to specify `transitive` parameter in ivy URL 
query only once." +
+" If there are multiple `transitive` parameter, we will select the 
last one")
+}
+params.map(_._2).foreach(value => {
+  if (value == "true") {
+transitive = true
+  } else if (value == "false") {
+transitive = false
+  }
+})
+  }
+  // Parse an excluded list (e.g., 
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http)
+  // in an ivy URL. When download ivy URL jar, Spark won't download 
transitive jar
+  // in a excluded list.
+  val exclusionList = groupedParams.get("exclude").map { params =>
+params.flatMap { case (_, excludeString) =>
+  val excludes = excludeString.split(",")
+  if (excludes.exists(_.split(":").length != 2)) {
+throw new URISyntaxException(excludeString, "Invalid exclude 
string: " +
+  "expected 'org:module,org:module,..', found " + excludeString)
+  }
+  excludes
+}.mkString(",")
+  }.getOrElse("")
+
+  (transitive, exclusionList)

Review comment:
   What if an invalid param is given in hive, e.g., `invalidParam=`? Is it just ignored? Could you add tests?
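
A test for that case might look like the following sketch (hypothetical; with the quoted parsing code, an unknown `key=value` pair passes the format check and is simply never read):

```
test("SPARK-33084: unknown ivy query parameter is ignored") {
  sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
  // "invalidParam" is neither "transitive" nor "exclude", so it should have
  // no effect and the main artifact should still be added.
  sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?invalidParam=foo")
  assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
}
```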





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-12-01 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533372737



##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -15,22 +15,122 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+  }
+
+  /**
+   * Parse excluded list in ivy URL. When download ivy URL jar, Spark won't 
download transitive jar
+   * in excluded list.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of exclude.
+   * Example: Input:  
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   * Output:  [org.mortbay.jetty:jetty, org.eclipse.jetty:jetty-http]
+   */
+  private def parseExcludeList(excludes: Array[String]): String = {
+excludes.flatMap { excludeString =>
+  val excludes: Array[String] = excludeString.split(",")
+  if (excludes.exists(_.split(":").length != 2)) {
+throw new URISyntaxException(excludeString,
+  "Invalid exclude string: expected 'org:module,org:module,..', found 
" + excludeString)
+  }
+  excludes
+}.mkString(":")
+  }
+
+  /**
+   * Parse transitive parameter in ivy URL, default value is false.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of transitive.
+   * Example: Input:  exclude=org.mortbay.jetty:jetty&transitive=true
+   * Output:  true
+   */
+  private def parseTransitive(transitives: Array[String]): Boolean = {
+if (transitives.isEmpty) {
+  false
+} else {
+  if (transitives.length > 1) {
+logWarning("It's best to specify `transitive` parameter in ivy URL 
query only once." +
+  " If there are multiple `transitive` parameter, we will select the 
last one")
+  }
+  transitives.last.toBoolean

Review comment:
   Is the current behaviour the same as Hive's?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-12-01 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533372297



##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -15,22 +15,122 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+  }
+
+  /**
+   * Parse excluded list in ivy URL. When download ivy URL jar, Spark won't 
download transitive jar
+   * in excluded list.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of exclude.
+   * Example: Input:  
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   * Output:  [org.mortbay.jetty:jetty, org.eclipse.jetty:jetty-http]
+   */
+  private def parseExcludeList(excludes: Array[String]): String = {
+excludes.flatMap { excludeString =>
+  val excludes: Array[String] = excludeString.split(",")
+  if (excludes.exists(_.split(":").length != 2)) {
+throw new URISyntaxException(excludeString,
+  "Invalid exclude string: expected 'org:module,org:module,..', found 
" + excludeString)
+  }
+  excludes
+}.mkString(":")
+  }
+
+  /**
+   * Parse transitive parameter in ivy URL, default value is false.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of transitive.
+   * Example: Input:  exclude=org.mortbay.jetty:jetty&transitive=true
+   * Output:  true
+   */
+  private def parseTransitive(transitives: Array[String]): Boolean = {
+if (transitives.isEmpty) {
+  false
+} else {
+  if (transitives.length > 1) {
+logWarning("It's best to specify `transitive` parameter in ivy URL 
query only once." +
+  " If there are multiple `transitive` parameter, we will select the 
last one")
+  }
+  transitives.last.toBoolean
+}
+  }
+
+  /**
+   * Download Ivy URIs dependency jars.
+   *
+   * @param uri Ivy uri need to be downloaded. The URI format should be:
+   *  `ivy://group:module:version[?query]`
+   *Ivy URI query part format should be:
+   *  `parameter=value&parameter=value...`
+   *Note that currently ivy URI query part support two parameters:
+   * 1. transitive: whether to download dependent jars related to 
your ivy URL.
+   *transitive=false or `transitive=true`, if not set, the 
default value is false.
+   * 2. exclude: exclusion list when download ivy URL jar and 
dependency jars.
+   *The `exclude` parameter content is a ',' separated 
`group:module` pair string :
+   *`exclude=group:module,group:module...`
+   * @return Comma separated string list of URIs of downloaded jars
+   */
+  def resolveMavenDependencies(uri: URI): String = {
+val Seq(_, _, repositories, ivyRepoPath, ivySettingsPath) =
+  DependencyUtils.getIvyProperties()
+val authority = uri.getAuthority
+if (authority == null) {
+  throw new URISyntaxException(
+authority, "Invalid url: Expected 'org:module:version', found null")
+}
+if (authority.split(":").length != 3) {
+  throw new URISyntaxException(
+authority, "Invalid url: Expected 'org:module:version', found " + 
authority)
+}
+
+val uriQuery = uri.getQuery
+val queryParams: Array[(String, String)] = if (uriQuery == null) {
+  Array.empty[(String, String)]
+} else {
+  val mapTokens = uriQuery.split("&").map(_.split("="))
+  if (mapTokens.exists(_.length != 2)) {
+throw new URISyntaxException(uriQuery, s"Invalid query string: 
$uriQuery")
+  }
+  mapTokens.map(kv => (kv(0), kv(1)))
+}
+
+resolveMavenDependencies(
+  parseTransitive(queryParams.filter(_._1.equals("transitive")).map(_._2)),

Review comment:
   Could you add tests for case-sensitivity?
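
Such a test could be sketched as follows (hypothetical; with the quoted `_._1.equals("transitive")` filter, an upper-case key would not match, so the default of `transitive = false` would apply):

```
test("SPARK-33084: ivy query parameter keys are case-sensitive") {
  sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
  // "TRANSITIVE" does not match the lower-case "transitive" filter, so no
  // transitive dependencies are expected to be downloaded.
  sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?TRANSITIVE=true")
  assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
  assert(!sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
}
```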





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-12-01 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533370716



##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -15,22 +15,115 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+  }
+
+  def parseQueryParams(uriQuery: String): (Boolean, String) = {

Review comment:
   private?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-12-01 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533368869



##
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##
@@ -1919,18 +1919,24 @@ class SparkContext(config: SparkConf) extends Logging {
   // A JAR file which exists only on the driver node
   case "file" => addLocalJarFile(new File(uri.getPath))
   // A JAR file which exists locally on every worker node
-  case "local" => "file:" + uri.getPath
+  case "local" => Array("file:" + uri.getPath)
+  case "ivy" =>
+// Since `new Path(path).toUri` will lose query information,
+// so here we use `URI.create(path)`
+DependencyUtils.resolveMavenDependencies(URI.create(path)).split(",")
   case _ => checkRemoteJarFile(path)
 }
   }
-  if (key != null) {
+  if (keys.nonEmpty) {
 val timestamp = if (addedOnSubmit) startTime else 
System.currentTimeMillis
-if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
-  logInfo(s"Added JAR $path at $key with timestamp $timestamp")
-  postEnvironmentUpdate()
-} else {
-  logWarning(s"The jar $path has been added already. Overwriting of 
added jars " +
-"is not supported in the current version.")
+keys.foreach { key =>
+  if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
+logInfo(s"Added JAR $path at $key with timestamp $timestamp")
+postEnvironmentUpdate()
+  } else {
+logWarning(s"The jar $path has been added already. Overwriting of 
added jars " +

Review comment:
   Looks fine now, but I think we need to make the error message more 
general because `dependencies` in the message is only meaningful in the `ivy` 
schema case.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org






[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533087617



##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -15,22 +15,122 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+  }
+
+  /**
+   * Parse excluded list in ivy URL. When download ivy URL jar, Spark won't 
download transitive jar
+   * in excluded list.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of exclude.
+   * Example: Input:  
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   * Output:  [org.mortbay.jetty:jetty, org.eclipse.jetty:jetty-http]
+   */
+  private def parseExcludeList(excludes: Array[String]): String = {
+excludes.flatMap { excludeString =>
+  val excludes: Array[String] = excludeString.split(",")
+  if (excludes.exists(_.split(":").length != 2)) {
+throw new URISyntaxException(excludeString,
+  "Invalid exclude string: expected 'org:module,org:module,..', found 
" + excludeString)
+  }
+  excludes
+}.mkString(":")
+  }
+
+  /**
+   * Parse transitive parameter in ivy URL, default value is false.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of transitive.
+   * Example: Input:  exclude=org.mortbay.jetty:jetty&transitive=true
+   * Output:  true
+   */
+  private def parseTransitive(transitives: Array[String]): Boolean = {
+if (transitives.isEmpty) {
+  false
+} else {
+  if (transitives.length > 1) {
+logWarning("It's best to specify `transitive` parameter in ivy URL 
query only once." +
+  " If there are multiple `transitive` parameter, we will select the 
last one")
+  }
+  transitives.last.toBoolean
+}
+  }
+
+  /**
+   * Download Ivy URIs dependency jars.
+   *
+   * @param uri Ivy uri need to be downloaded. The URI format should be:
+   *  `ivy://group:module:version[?query]`
+   *Ivy URI query part format should be:
+   *  `parameter=value&parameter=value...`
+   *Note that currently ivy URI query part support two parameters:
+   * 1. transitive: whether to download dependent jars related to 
your ivy URL.
+   *transitive=false or `transitive=true`, if not set, the 
default value is false.
+   * 2. exclude: exclusion list when download ivy URL jar and 
dependency jars.
+   *The `exclude` parameter content is a ',' separated 
`group:module` pair string :
+   *`exclude=group:module,group:module...`
+   * @return Comma separated string list of URIs of downloaded jars
+   */
+  def resolveMavenDependencies(uri: URI): String = {
+val Seq(_, _, repositories, ivyRepoPath, ivySettingsPath) =
+  DependencyUtils.getIvyProperties()
+val authority = uri.getAuthority
+if (authority == null) {
+  throw new URISyntaxException(
+authority, "Invalid url: Expected 'org:module:version', found null")
+}
+if (authority.split(":").length != 3) {
+  throw new URISyntaxException(
+authority, "Invalid url: Expected 'org:module:version', found " + 
authority)
+}
+
+val uriQuery = uri.getQuery
+val queryParams: Array[(String, String)] = if (uriQuery == null) {
+  Array.empty[(String, String)]
+} else {
+  val mapTokens = uriQuery.split("&").map(_.split("="))
+  if (mapTokens.exists(_.length != 2)) {
+throw new URISyntaxException(uriQuery, s"Invalid query string: 
$uriQuery")
+  }
+  mapTokens.map(kv => (kv(0), kv(1)))
+}
+
+resolveMavenDependencies(
+  parseTransitive(queryParams.filter(_._1.equals("transitive")).map(_._2)),

Review comment:
   Do we need to respect case-sensitivity for params?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533086750



##
File path: docs/sql-ref-syntax-aux-resource-mgmt-add-jar.md
##
@@ -33,15 +33,30 @@ ADD JAR file_name
 
 * **file_name**
 
-The name of the JAR file to be added. It could be either on a local file system or a distributed file system.
+The name of the JAR file to be added. It could be either on a local file system or a distributed file system or an ivy URL.
+Apache Ivy is a popular dependency manager focusing on flexibility and simplicity. Now we support two parameter in URL query string:
 
+ * transitive: whether to download dependent jars related to your ivy URL.
+ * exclude: exclusion list when download ivy URL jar and dependent jars.
+
+User can write ivy URL such as:
+
+  ivy://group:module:version
+  ivy://group:module:version?transitive=true

Review comment:
   `transitive=true` -> `transitive=[true|false]`?
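
For context, the documented forms map onto `SparkContext.addJar` like this (a usage sketch; the coordinates are illustrative):

```
// transitive defaults to false: only the named artifact is added
sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0")
// transitive=true also downloads the artifact's dependent jars
sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=true")
// exclude skips the listed group:module pairs when resolving transitively
sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0" +
  "?transitive=true&exclude=commons-lang:commons-lang")
```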





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533081191



##
File path: core/src/test/scala/org/apache/spark/SparkContextSuite.scala
##
@@ -366,6 +366,12 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually
 }
   }
 
+  test("add jar local path with comma") {
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+    sc.addJar("file://Test,UDTF.jar")
+    assert(!sc.listJars().exists(_.contains("UDTF.jar")))

Review comment:
   What does this test mean? Why didn't you write it like `assert(sc.listJars().exists(_.contains("Test,UDTF.jar")))`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org






[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533076491



##
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##
@@ -1860,7 +1860,7 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   private def addJar(path: String, addedOnSubmit: Boolean): Unit = {
-def addLocalJarFile(file: File): String = {
+def addLocalJarFile(file: File): Array[String] = {

Review comment:
   `Array` -> `Seq`

##
File path: core/src/test/scala/org/apache/spark/SparkContextSuite.scala
##
@@ -366,6 +366,12 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually
 }
   }
 
+  test("add jar local path with comma") {
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))

Review comment:
   Could you add tests for other schemas?

##
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##
@@ -1869,15 +1869,15 @@ class SparkContext(config: SparkConf) extends Logging {
   throw new IllegalArgumentException(
 s"Directory ${file.getAbsoluteFile} is not allowed for addJar")
 }
-env.rpcEnv.fileServer.addJar(file)
+Array(env.rpcEnv.fileServer.addJar(file))
   } catch {
 case NonFatal(e) =>
   logError(s"Failed to add $path to Spark environment", e)
-  null
+  Array.empty
   }
 }
 
-def checkRemoteJarFile(path: String): String = {
+def checkRemoteJarFile(path: String): Array[String] = {

Review comment:
   ditto

##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -15,22 +15,122 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+  }
+
+  /**
+   * Parse excluded list in ivy URL. When download ivy URL jar, Spark won't 
download transitive jar
+   * in excluded list.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of exclude.
+   * Example: Input:  
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   * Output:  [org.mortbay.jetty:jetty, org.eclipse.jetty:jetty-http]
+   */
+  private def parseExcludeList(excludes: Array[String]): String = {
+excludes.flatMap { excludeString =>
+  val excludes: Array[String] = excludeString.split(",")
+  if (excludes.exists(_.split(":").length != 2)) {
+throw new URISyntaxException(excludeString,
+  "Invalid exclude string: expected 'org:module,org:module,..', found 
" + excludeString)
+  }
+  excludes
+}.mkString(":")
+  }
+
+  /**
+   * Parse transitive parameter in ivy URL, default value is false.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of transitive.
+   * Example: Input:  exclude=org.mortbay.jetty:jetty&transitive=true
+   * Output:  true
+   */
+  private def parseTransitive(transitives: Array[String]): Boolean = {
+if (transitives.isEmpty) {
+  false
+} else {
+  if (transitives.length > 1) {
+logWarning("It's best to specify `transitive` parameter in ivy URL 
query only once." +
+  " If there are multiple `transitive` parameter, we will select the 
last one")
+  }
+  transitives.last.toBoolean

Review comment:
   What if `transitive=invalidStr` in hive? Could you add tests?
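
For reference, Scala's `StringOps.toBoolean` is strict, so with the quoted `transitives.last.toBoolean` an invalid value fails loudly instead of falling back to the default (a minimal demonstration):

```
// "true"/"false" parse case-insensitively; anything else throws
// java.lang.IllegalArgumentException, so `transitive=invalidStr` would
// raise an error here rather than default to false.
assert("TRUE".toBoolean)
assert(scala.util.Try("invalidStr".toBoolean).isFailure)
```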

##
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##
@@ -1869,15 +1869,15 @@ class SparkContext(config: SparkConf) extends Logging {
   throw new IllegalArgumentException(
 s"Directory ${file.getAbsoluteFile} is not allowed for addJar")
 }
-env.rpcEnv.fileServer.addJar(file)
+Array(env.rpcEnv.fileServer.addJar(file))
   } catch {
 case NonFatal(e) =>
   logError(s"Failed to add $path to Spark environment", e)
-  null
+  Array.empty

Review comment:
   Then, `Nil`


[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r532533545



##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -25,12 +25,104 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+  }
+
+
+  private def parseURLQueryParameter(queryString: String, queryTag: String): 
Array[String] = {
+if (queryString == null || queryString.isEmpty) {
+  Array.empty[String]
+} else {
+  val mapTokens = queryString.split("&")
+  assert(mapTokens.forall(_.split("=").length == 2)
+, "Invalid URI query string: [ " + queryString + " ]")
+  mapTokens.map(_.split("=")).map(kv => (kv(0), kv(1))).filter(_._1 == 
queryTag).map(_._2)
+}
+  }
+
+  /**
+   * Parse excluded list in ivy URL. When download ivy URL jar, Spark won't 
download transitive jar
+   * in excluded list.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of exclude.
+   * Example: Input:  
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   * Output:  [org.mortbay.jetty:jetty, org.eclipse.jetty:jetty-http]
+   */
+  private def parseExcludeList(queryString: String): String = {
+parseURLQueryParameter(queryString, "exclude")
+  .flatMap { excludeString =>
+val excludes: Array[String] = excludeString.split(",")
+assert(excludes.forall(_.split(":").length == 2),
+  "Invalid exclude string: expected 'org:module,org:module,..'," +
+" found [ " + excludeString + " ]")
+excludes
+  }.mkString(":")
+  }
+
+  /**
+   * Parse transitive parameter in ivy URL, default value is false.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of transitive.
+   * Example: Input:  exclude=org.mortbay.jetty:jetty&transitive=true
+   * Output:  true
+   */
+  private def parseTransitive(queryString: String): Boolean = {
+val transitive = parseURLQueryParameter(queryString, "transitive")

Review comment:
   It seems you didn't push the latest commit.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r532532174



##
File path: core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
##
@@ -80,15 +80,10 @@ object DriverWrapper extends Logging {
 val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)
 
 val Seq(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath) =
-  Seq(
-"spark.jars.excludes",
-"spark.jars.packages",
-"spark.jars.repositories",
-"spark.jars.ivy",
-"spark.jars.ivySettings"
-  ).map(sys.props.get(_).orNull)
+  DependencyUtils.getIvyProperties()
 
-val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(packagesExclusions,
+val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
+  true, packagesExclusions,
   packages, repositories, ivyRepoPath, Option(ivySettingsPath))

Review comment:
   Not updated yet...





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r532530241



##
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##
@@ -1920,17 +1920,23 @@ class SparkContext(config: SparkConf) extends Logging {
   case "file" => addLocalJarFile(new File(uri.getPath))
   // A JAR file which exists locally on every worker node
   case "local" => "file:" + uri.getPath
+  case "ivy" =>
+// Since `new Path(path).toUri` will lose query information,
+// so here we use `URI.create(path)`
+DependencyUtils.resolveMavenDependencies(URI.create(path))
   case _ => checkRemoteJarFile(path)
 }
   }
-  if (key != null) {
+  if (keys != null) {

Review comment:
   Really? What if an existing file path contains a comma, e.g., 
`file://Test,UDTF.jar`? Could you add tests?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r532475651



##
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##
@@ -1920,17 +1920,23 @@ class SparkContext(config: SparkConf) extends Logging {
   case "file" => addLocalJarFile(new File(uri.getPath))
   // A JAR file which exists locally on every worker node
   case "local" => "file:" + uri.getPath
+  case "ivy" =>
+// Since `new Path(path).toUri` will lose query information,
+// so here we use `URI.create(path)`
+DependencyUtils.resolveMavenDependencies(URI.create(path))
   case _ => checkRemoteJarFile(path)
 }
   }
-  if (key != null) {
+  if (keys != null) {

Review comment:
   I meant, does this PR change the existing behaviour of the local/file cases?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r532471580



##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -25,12 +25,104 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+  }
+
+
+  private def parseURLQueryParameter(queryString: String, queryTag: String): 
Array[String] = {
+if (queryString == null || queryString.isEmpty) {
+  Array.empty[String]
+} else {
+  val mapTokens = queryString.split("&")
+  assert(mapTokens.forall(_.split("=").length == 2)
+, "Invalid URI query string: [ " + queryString + " ]")
+  mapTokens.map(_.split("=")).map(kv => (kv(0), kv(1))).filter(_._1 == 
queryTag).map(_._2)
+}
+  }
+
+  /**
+   * Parse excluded list in ivy URL. When download ivy URL jar, Spark won't 
download transitive jar
+   * in excluded list.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of exclude.
+   * Example: Input:  
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   * Output:  [org.mortbay.jetty:jetty, org.eclipse.jetty:jetty-http]
+   */
+  private def parseExcludeList(queryString: String): String = {
+parseURLQueryParameter(queryString, "exclude")
+  .flatMap { excludeString =>
+val excludes: Array[String] = excludeString.split(",")
+assert(excludes.forall(_.split(":").length == 2),
+  "Invalid exclude string: expected 'org:module,org:module,..'," +
+" found [ " + excludeString + " ]")
+excludes
+  }.mkString(":")
+  }
+
+  /**
+   * Parse transitive parameter in ivy URL, default value is false.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of transitive.
+   * Example: Input:  exclude=org.mortbay.jetty:jetty&transitive=true
+   * Output:  true
+   */
+  private def parseTransitive(queryString: String): Boolean = {
+val transitive = parseURLQueryParameter(queryString, "transitive")
+if (transitive.isEmpty) {
+  false
+} else {
+  if (transitive.length > 1) {
+logWarning("It's best to specify `transitive` parameter in ivy URL 
query only once." +
+  " If there are multiple `transitive` parameter, we will select the 
last one")

Review comment:
   Ah, ok.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r532421652



##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -25,12 +25,104 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+  }
+
+
+  private def parseURLQueryParameter(queryString: String, queryTag: String): 
Array[String] = {
+if (queryString == null || queryString.isEmpty) {
+  Array.empty[String]
+} else {
+  val mapTokens = queryString.split("&")
+  assert(mapTokens.forall(_.split("=").length == 2)
+, "Invalid URI query string: [ " + queryString + " ]")
+  mapTokens.map(_.split("=")).map(kv => (kv(0), kv(1))).filter(_._1 == 
queryTag).map(_._2)
+}
+  }
+
+  /**
+   * Parse excluded list in ivy URL. When download ivy URL jar, Spark won't 
download transitive jar
+   * in excluded list.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of exclude.
+   * Example: Input:  
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   * Output:  [org.mortbay.jetty:jetty, org.eclipse.jetty:jetty-http]
+   */
+  private def parseExcludeList(queryString: String): String = {
+parseURLQueryParameter(queryString, "exclude")
+  .flatMap { excludeString =>
+val excludes: Array[String] = excludeString.split(",")
+assert(excludes.forall(_.split(":").length == 2),
+  "Invalid exclude string: expected 'org:module,org:module,..'," +
+" found [ " + excludeString + " ]")
+excludes
+  }.mkString(":")
+  }
+
+  /**
+   * Parse transitive parameter in ivy URL, default value is false.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of transitive.
+   * Example: Input:  exclude=org.mortbay.jetty:jetty&transitive=true
+   * Output:  true
+   */
+  private def parseTransitive(queryString: String): Boolean = {
+val transitive = parseURLQueryParameter(queryString, "transitive")
+if (transitive.isEmpty) {
+  false
+} else {
+  if (transitive.length > 1) {
+logWarning("It's best to specify `transitive` parameter in ivy URL 
query only once." +
+  " If there are multiple `transitive` parameter, we will select the 
last one")

Review comment:
   If so, what's the hive behaviour? Will it throw an exception instead of selecting the last one?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-29 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r532375495



##
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##
@@ -1920,17 +1920,23 @@ class SparkContext(config: SparkConf) extends Logging {
   case "file" => addLocalJarFile(new File(uri.getPath))
   // A JAR file which exists locally on every worker node
   case "local" => "file:" + uri.getPath
+  case "ivy" =>
+// Since `new Path(path).toUri` will lose query information,
+// so here we use `URI.create(path)`
+DependencyUtils.resolveMavenDependencies(URI.create(path))
   case _ => checkRemoteJarFile(path)
 }
   }
-  if (key != null) {
+  if (keys != null) {

Review comment:
   No chance for the "local"/"file" case to have commas in `keys`? https://github.com/apache/spark/pull/29966/files#r529246237
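
For context, with the diff above the "local" branch produces exactly one key per call, so a comma could only come from the path itself. A REPL-style sketch with a hypothetical path:

```scala
import java.net.URI

// Mirrors the "local" branch of addJar: key = "file:" + uri.getPath
val uri = new URI("local:/opt/libs/dep.jar")
val key = "file:" + uri.getPath
println(key)  // file:/opt/libs/dep.jar -- a single key, not a comma-separated list
```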

##
File path: core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
##
@@ -80,15 +80,10 @@ object DriverWrapper extends Logging {
 val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)
 
 val Seq(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath) =
-  Seq(
-"spark.jars.excludes",
-"spark.jars.packages",
-"spark.jars.repositories",
-"spark.jars.ivy",
-"spark.jars.ivySettings"
-  ).map(sys.props.get(_).orNull)
+  DependencyUtils.getIvyProperties()
 
val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(packagesExclusions,
+val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
+  true, packagesExclusions,
   packages, repositories, ivyRepoPath, Option(ivySettingsPath))

Review comment:
   nit:
   ```
   val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
 true, packagesExclusions, packages, repositories, ivyRepoPath, Option(ivySettingsPath))
   ```

##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -25,12 +25,104 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+  }
+

Review comment:
   nit: redundant blank line.

##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -25,12 +25,104 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+  }
+
+
+  private def parseURLQueryParameter(queryString: String, queryTag: String): Array[String] = {

Review comment:
   `parseURLQueryParameter` -> `parseURLQueryParameters` or `parseURLQueryParams`?

##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -25,12 +25,104 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+  }
+
+
+  private def parseURLQueryParameter(queryString: String, queryTag: String): Array[String] = {
+if (queryString == null || queryString.isEmpty) {
+  Array.empty[String]
+} else {
+  val mapTokens = queryString.split("&")
+  assert(mapTokens.forall(_.split("=").length == 

[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-24 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r530094153



##
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##
@@ -2980,6 +2980,75 @@ private[spark] object Utils extends Logging {
 metadata.toString
   }
 
+  /**
+   * Download the dependency jars of an Ivy URI.
+   *
+   * @param uri Ivy URI to be downloaded.
+   * @return Comma-separated string list of URIs of the downloaded jars
+   */
+  def resolveMavenDependencies(uri: URI): String = {
+val Seq(repositories, ivyRepoPath, ivySettingsPath) =
+  Seq(
+"spark.jars.repositories",
+"spark.jars.ivy",
+"spark.jars.ivySettings"
+  ).map(sys.props.get(_).orNull)
+// Create the IvySettings, either load from file or build defaults
+val ivySettings = Option(ivySettingsPath) match {
+  case Some(path) =>
+SparkSubmitUtils.loadIvySettings(path, Option(repositories), Option(ivyRepoPath))

Review comment:
   > Maybe we can pull this out into a common utility that can be leveraged here and DriverWrapper? Then there is no need for testing twice.
   
   Yea, if we can, it looks better.
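
A rough sketch of the kind of shared helper being suggested; the object and method names are hypothetical, and it assumes placement inside Spark's own packages since `SparkSubmitUtils` is package-private:

```scala
import org.apache.ivy.core.settings.IvySettings
import org.apache.spark.deploy.SparkSubmitUtils

object IvySettingsHelper {
  // Load IvySettings from a settings file if one is configured,
  // otherwise build default settings, exactly as in the diff above.
  def createIvySettings(
      ivySettingsPath: Option[String],
      repositories: Option[String],
      ivyRepoPath: Option[String]): IvySettings = {
    ivySettingsPath match {
      case Some(path) =>
        SparkSubmitUtils.loadIvySettings(path, repositories, ivyRepoPath)
      case None =>
        SparkSubmitUtils.buildIvySettings(repositories, ivyRepoPath)
    }
  }
}
```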








[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-23 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r529239846



##
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##
@@ -2980,6 +2980,75 @@ private[spark] object Utils extends Logging {
 metadata.toString
   }
 
+  /**
+   * Download the dependency jars of an Ivy URI.
+   *
+   * @param uri Ivy URI to be downloaded.
+   * @return Comma-separated string list of URIs of the downloaded jars
+   */
+  def resolveMavenDependencies(uri: URI): String = {
+val Seq(repositories, ivyRepoPath, ivySettingsPath) =
+  Seq(
+"spark.jars.repositories",
+"spark.jars.ivy",
+"spark.jars.ivySettings"
+  ).map(sys.props.get(_).orNull)
+// Create the IvySettings, either load from file or build defaults
+val ivySettings = Option(ivySettingsPath) match {
+  case Some(path) =>
+SparkSubmitUtils.loadIvySettings(path, Option(repositories), Option(ivyRepoPath))

Review comment:
   Could you add tests for this path?
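
One possible shape for such a test, following the SparkContextSuite style quoted later in this thread; the settings file path is hypothetical and would have to exist on disk:

```scala
// Sketch only: exercises the loadIvySettings branch by setting the
// spark.jars.ivySettings system property before creating the context.
test("SPARK-33084: Add jar ivy url with a custom ivySettings file") {
  sys.props.put("spark.jars.ivySettings", "/tmp/ivysettings.xml")
  try {
    sc = new SparkContext(
      new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
    sc.addJar("ivy://org.scala-js:scalajs-test-interface_2.12:1.2.0")
    assert(sc.listJars().exists(_.contains("scalajs-test-interface_2.12")))
  } finally {
    sys.props.remove("spark.jars.ivySettings")
  }
}
```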

##
File path: docs/sql-ref-syntax-aux-resource-mgmt-add-jar.md
##
@@ -33,7 +33,7 @@ ADD JAR file_name
 
 * **file_name**
 
-The name of the JAR file to be added. It could be either on a local file system or a distributed file system.
+The name of the JAR file to be added. It could be either on a local file system or a distributed file system or a ivy URL.

Review comment:
   nit: `a` -> `an`








[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-23 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r529226242



##
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##
@@ -1920,17 +1920,23 @@ class SparkContext(config: SparkConf) extends Logging {
   case "file" => addLocalJarFile(new File(uri.getPath))
   // A JAR file which exists locally on every worker node
   case "local" => "file:" + uri.getPath
+  case "ivy" =>
+// Since `new Path(path).toUri` will lose query information,
+// so here we use `URI>create(path)`

Review comment:
   nit: `>` -> `.`?

##
File path: core/src/test/scala/org/apache/spark/SparkContextSuite.scala
##
@@ -955,6 +955,20 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
 .set(EXECUTOR_ALLOW_SPARK_CONTEXT, true)).stop()
 }
   }
+
+  test("SPARK-33084: Add jar support ivy url") {
+sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+sc.addJar("ivy://org.scala-js:scalajs-test-interface_2.12:1.2.0")
+assert(sc.listJars().find(_.contains("scalajs-test-interface_2.12")).nonEmpty)

Review comment:
   nit: `find` -> `exists`
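
The nit in plain terms: `exists(p)` is the direct form of `find(p).nonEmpty`. A standalone sketch with hypothetical jar URIs:

```scala
// Stand-in for sc.listJars().
val jars = Seq(
  "spark://host:1234/jars/scalajs-test-interface_2.12-1.2.0.jar",
  "spark://host:1234/jars/scalajs-library_2.12-1.2.0.jar")

// Equivalent checks; `exists` states the intent directly and skips the Option.
assert(jars.find(_.contains("scalajs-test-interface_2.12")).nonEmpty)
assert(jars.exists(_.contains("scalajs-test-interface_2.12")))
```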

##
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##
@@ -1920,17 +1920,23 @@ class SparkContext(config: SparkConf) extends Logging {
   case "file" => addLocalJarFile(new File(uri.getPath))
   // A JAR file which exists locally on every worker node
   case "local" => "file:" + uri.getPath
+  case "ivy" =>
+// Since `new Path(path).toUri` will lose query information,
+// so here we use `URI>create(path)`
+Utils.resolveMavenDependencies(URI.create(path))
   case _ => checkRemoteJarFile(path)
 }
   }
-  if (key != null) {
+  if (keys != null) {
 val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
-if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
-  logInfo(s"Added JAR $path at $key with timestamp $timestamp")
-  postEnvironmentUpdate()
-} else {
-  logWarning(s"The jar $path has been added already. Overwriting of 
added jars " +
-"is not supported in the current version.")
+keys.split(",").foreach { key =>

Review comment:
   Why do we need to split it by `,` here? It seems this PR adds no test for this case though.
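
For context, `resolveMavenDependencies(uri)` is documented in the hunk above as returning a comma-separated string of downloaded jar paths, which is what the split unpacks. A sketch with a hypothetical result:

```scala
// Hypothetical return value of Utils.resolveMavenDependencies(uri).
val resolved = "/home/user/.ivy2/jars/dep-a_2.12-1.0.jar," +
  "/home/user/.ivy2/jars/dep-b_2.12-1.0.jar"

// One addedJars key per downloaded jar.
resolved.split(",").foreach(key => println(s"registering $key"))
```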

##
File path: docs/sql-ref-syntax-aux-resource-mgmt-add-jar.md
##
@@ -42,6 +42,10 @@ ADD JAR /tmp/test.jar;
 ADD JAR "/path/to/some.jar";
 ADD JAR '/some/other.jar';
 ADD JAR "/path with space/abc.jar";
+ADD JAR "ivy://group:module:version";
+ADD JAR "ivy://group:module:version?transitive=true";
+ADD JAR "ivy://group:module:version?exclusin=group:module,group:module";

Review comment:
   `exclusin` -> `exclusion`

##
File path: docs/sql-ref-syntax-aux-resource-mgmt-add-jar.md
##
@@ -42,6 +42,10 @@ ADD JAR /tmp/test.jar;
 ADD JAR "/path/to/some.jar";
 ADD JAR '/some/other.jar';
 ADD JAR "/path with space/abc.jar";
+ADD JAR "ivy://group:module:version";
+ADD JAR "ivy://group:module:version?transitive=true";
+ADD JAR "ivy://group:module:version?exclusin=group:module,group:module";
+ADD JAR "ivy://group:module:version?exclusin=group:module,group:module&transitive=false";

Review comment:
   Could you move the description of the syntaxes into the section `**file_name**` above? And, please put some **concrete** examples in this example section.
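
One concrete example of the requested form, reusing the coordinate from the PR's test case; the transitive flag is shown only for illustration:

```scala
// Assumes an active SparkSession named `spark`.
spark.sql("ADD JAR 'ivy://org.scala-js:scalajs-test-interface_2.12:1.2.0?transitive=true'")
```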

##
File path: sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
##
@@ -159,6 +161,17 @@ class SessionResourceLoader(session: SparkSession) extends FunctionResourceLoader
 }
   }
 
+  def resolveJars(path: String): List[String] = {
+val uri = new Path(path).toUri
+uri.getScheme match {
+  case "ivy" =>
+Utils.resolveMavenDependencies(URI.create(path))
+  .split(",").toList
+  case _ =>
+path :: Nil

Review comment:
   nit format:
   ```
 def resolveJars(path: String): List[String] = {
   new Path(path).toUri.getScheme match {
 case "ivy" => 
Utils.resolveMavenDependencies(URI.create(path)).split(",").toList
 case _ => path :: Nil
   }
 }
   ```

##
File path: docs/sql-ref-syntax-aux-resource-mgmt-add-jar.md
##
@@ -42,6 +42,10 @@ ADD JAR /tmp/test.jar;
 ADD JAR "/path/to/some.jar";
 ADD JAR '/some/other.jar';
 ADD JAR "/path with space/abc.jar";
+ADD JAR "ivy://group:module:version";
+ADD JAR "ivy://group:module:version?transitive=true";
+ADD JAR "ivy://group:module:version?exclusin=group:module,group:module";
+ADD JAR "ivy://group:module:version?exclusin=group:module,group:module&transitive=false";

Review comment:
   Also, I think we need an explanation about each param, e.g., `exclusion` and `transitive`.