[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20853


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-21 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r176154914
  
--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -657,6 +662,30 @@ class SparkSubmitSuite
 conf3.get(PYSPARK_PYTHON.key) should be ("python3.5")
   }
 
+  test("ambiguous archive mapping results in error message") {
+val dir = Utils.createTempDir()
+val archive1 = Paths.get(dir.toPath.toString, "first.zip")
+val archive2 = Paths.get(dir.toPath.toString, "second.zip")
+Files.createFile(archive1)
+Files.createFile(archive2)
+val jars = "/jar1,/jar2" // --jars
+val files = "local:/file1,file2"  // --files
+// --archives
--- End diff --

Unnecessary comment. I know the other test has them, but I'd just remove 
these from this new code, since they don't add any useful information.


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-21 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r176152842
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -137,16 +138,32 @@ private[deploy] object DependencyUtils {
  def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
 require(paths != null, "paths cannot be null.")
 Utils.stringToSeq(paths).flatMap { path =>
-  val uri = Utils.resolveURI(path)
-  uri.getScheme match {
-case "local" | "http" | "https" | "ftp" => Array(path)
-case _ =>
-  val fs = FileSystem.get(uri, hadoopConf)
-  Option(fs.globStatus(new Path(uri))).map { status =>
-status.filter(_.isFile).map(_.getPath.toUri.toString)
-  }.getOrElse(Array(path))
+  val (base, fragment) = splitOnFragment(path)
+  (resolveGlobPath(base, hadoopConf), fragment) match {
+case (resolved, Some(_)) if resolved.length > 1 => throw new SparkException(
+s"${base.toString} resolves ambiguously to multiple files: ${resolved.mkString(",")}")
+case (resolved, Some(namedAs)) => resolved.map( _ + "#" + namedAs)
+case (resolved, _) => resolved
   }
 }.mkString(",")
   }
 
+  private def splitOnFragment(path: String): (URI, Option[String]) = {
+val uri = Utils.resolveURI(path)
+val withoutFragment = new URI(uri.getScheme, uri.getSchemeSpecificPart, null)
+val fragment = if (uri.getFragment != null) Some(uri.getFragment) else None
--- End diff --

`Option(uri.getFragment)`
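A minimal, self-contained sketch of why the suggestion works (plain `java.net.URI`; the archive path is illustrative): `Option.apply` maps `null` to `None`, so the if/else collapses to a single call.

```scala
import java.net.URI

val withFragment = new URI("file:/archive.zip#renamed")
val noFragment = new URI("file:/archive.zip")

// getFragment returns null when no fragment is present;
// Option(...) maps that null to None, so the if/else is unnecessary.
assert(Option(withFragment.getFragment) == Some("renamed"))
assert(Option(noFragment.getFragment).isEmpty)
```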


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-21 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r176155080
  
--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -657,6 +662,30 @@ class SparkSubmitSuite
 conf3.get(PYSPARK_PYTHON.key) should be ("python3.5")
   }
 
+  test("ambiguous archive mapping results in error message") {
+val dir = Utils.createTempDir()
+val archive1 = Paths.get(dir.toPath.toString, "first.zip")
+val archive2 = Paths.get(dir.toPath.toString, "second.zip")
+Files.createFile(archive1)
+Files.createFile(archive2)
+val jars = "/jar1,/jar2" // --jars
+val files = "local:/file1,file2"  // --files
+// --archives
+val archives = s"file:/archive1,${dir.toPath.toAbsolutePath.toString}/*.zip#archive3"
+val pyFiles = "py-file1,py-file2" // --py-files
+
+// Test files and archives (Yarn)
--- End diff --

Unnecessary comment.


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-20 Thread misutoth
Github user misutoth commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175664716
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -137,16 +138,32 @@ private[deploy] object DependencyUtils {
  def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
 require(paths != null, "paths cannot be null.")
 Utils.stringToSeq(paths).flatMap { path =>
-  val uri = Utils.resolveURI(path)
-  uri.getScheme match {
-case "local" | "http" | "https" | "ftp" => Array(path)
-case _ =>
-  val fs = FileSystem.get(uri, hadoopConf)
-  Option(fs.globStatus(new Path(uri))).map { status =>
-status.filter(_.isFile).map(_.getPath.toUri.toString)
-  }.getOrElse(Array(path))
+  val (base, fragment) = splitOnFragment(path)
+  (resolveGlobPath(base, hadoopConf), fragment) match {
+case (resolved, Some(_)) if resolved.length > 1 => throw new SparkException(
+s"${base.toString} resolves ambiguously to multiple files: ${resolved.mkString(",")}")
--- End diff --

There was no space used here before. Actually, there should not be any space in the resulting list; the tests also rely on this.
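A quick illustration of the behavior the tests pin down (the paths are made up): the resolved entries are joined with a bare comma.

```scala
// joined without spaces; a ", " separator would break string-equality assertions
val resolved = Array("/tmp/first.zip", "/tmp/second.zip")
assert(resolved.mkString(",") == "/tmp/first.zip,/tmp/second.zip")
```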


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-20 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175662764
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -245,6 +245,17 @@ object SparkSubmit extends CommandLineUtils with Logging {
   args: SparkSubmitArguments,
   conf: Option[HadoopConfiguration] = None)
   : (Seq[String], Seq[String], SparkConf, String) = {
+try {
+  doPrepareSubmitEnvironment(args, conf)
+} catch {
+  case e: SparkException => printErrorAndExit(e.getMessage); throw e
--- End diff --

nit:
```
case e: SparkException =>
  printErrorAndExit(e.getMessage)
  throw e
```


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-20 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175662672
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -137,16 +138,32 @@ private[deploy] object DependencyUtils {
  def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
 require(paths != null, "paths cannot be null.")
 Utils.stringToSeq(paths).flatMap { path =>
-  val uri = Utils.resolveURI(path)
-  uri.getScheme match {
-case "local" | "http" | "https" | "ftp" => Array(path)
-case _ =>
-  val fs = FileSystem.get(uri, hadoopConf)
-  Option(fs.globStatus(new Path(uri))).map { status =>
-status.filter(_.isFile).map(_.getPath.toUri.toString)
-  }.getOrElse(Array(path))
+  val (base, fragment) = splitOnFragment(path)
+  (resolveGlobPath(base, hadoopConf), fragment) match {
+case (resolved, Some(_)) if resolved.length > 1 => throw new SparkException(
+s"${base.toString} resolves ambiguously to multiple files: ${resolved.mkString(",")}")
+case (resolved, Some(namedAs)) => resolved.map( _ + "#" + namedAs)
+case (resolved, _) => resolved
   }
 }.mkString(",")
   }
 
+  private def splitOnFragment(path: String): (URI, Option[String]) = {
+val uri = Utils.resolveURI(path)
+val withoutFragment = new URI(uri.getScheme, uri.getSchemeSpecificPart, null)
+val fragment = if (uri.getFragment != null) Some(uri.getFragment) else None
+(withoutFragment, fragment)
+  }
+
+  private def resolveGlobPath(uri: URI, hadoopConf: Configuration): Array [String] = {
--- End diff --

nit: `Array[String]`


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-20 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175662574
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -137,16 +138,32 @@ private[deploy] object DependencyUtils {
  def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
 require(paths != null, "paths cannot be null.")
 Utils.stringToSeq(paths).flatMap { path =>
-  val uri = Utils.resolveURI(path)
-  uri.getScheme match {
-case "local" | "http" | "https" | "ftp" => Array(path)
-case _ =>
-  val fs = FileSystem.get(uri, hadoopConf)
-  Option(fs.globStatus(new Path(uri))).map { status =>
-status.filter(_.isFile).map(_.getPath.toUri.toString)
-  }.getOrElse(Array(path))
+  val (base, fragment) = splitOnFragment(path)
+  (resolveGlobPath(base, hadoopConf), fragment) match {
+case (resolved, Some(_)) if resolved.length > 1 => throw new SparkException(
+s"${base.toString} resolves ambiguously to multiple files: ${resolved.mkString(",")}")
--- End diff --

nit: `resolved.mkString(", ")`


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-19 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175585634
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -137,16 +138,36 @@ private[deploy] object DependencyUtils {
  def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
 require(paths != null, "paths cannot be null.")
 Utils.stringToSeq(paths).flatMap { path =>
-  val uri = Utils.resolveURI(path)
-  uri.getScheme match {
-case "local" | "http" | "https" | "ftp" => Array(path)
-case _ =>
-  val fs = FileSystem.get(uri, hadoopConf)
-  Option(fs.globStatus(new Path(uri))).map { status =>
-status.filter(_.isFile).map(_.getPath.toUri.toString)
-  }.getOrElse(Array(path))
+  val (base, fragment) = splitOnFragment(Utils.resolveURI(path))
+  (resolveGlobPath(base, hadoopConf), fragment) match {
+case (resolved: Array[String], Some(_)) if resolved.length > 1 => throw new SparkException(
+s"${base.toString} resolves ambiguously to multiple files: ${resolved.mkString(",")}")
+case (resolved: Array[String], Some(namedAs)) => resolved.map( _ + "#" + namedAs)
--- End diff --

Same here.


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-19 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175585581
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -137,16 +138,36 @@ private[deploy] object DependencyUtils {
  def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
 require(paths != null, "paths cannot be null.")
 Utils.stringToSeq(paths).flatMap { path =>
-  val uri = Utils.resolveURI(path)
-  uri.getScheme match {
-case "local" | "http" | "https" | "ftp" => Array(path)
-case _ =>
-  val fs = FileSystem.get(uri, hadoopConf)
-  Option(fs.globStatus(new Path(uri))).map { status =>
-status.filter(_.isFile).map(_.getPath.toUri.toString)
-  }.getOrElse(Array(path))
+  val (base, fragment) = splitOnFragment(Utils.resolveURI(path))
+  (resolveGlobPath(base, hadoopConf), fragment) match {
+case (resolved: Array[String], Some(_)) if resolved.length > 1 => throw new SparkException(
--- End diff --

Type inference is not working here?


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-19 Thread misutoth
Github user misutoth commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175576249
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -245,6 +245,19 @@ object SparkSubmit extends CommandLineUtils with Logging {
   args: SparkSubmitArguments,
   conf: Option[HadoopConfiguration] = None)
   : (Seq[String], Seq[String], SparkConf, String) = {
+try {
+  doPrepareSubmitEnvironment(args, conf)
+} catch {
+  case e: SparkException =>
+printErrorAndExit(e.getMessage)
+throw new RuntimeException("Unreachable production code")
--- End diff --

Actually, the directory deletion is hooked into JVM shutdown, so I will let that do the housekeeping for us and avoid adding a new field.
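For reference, a generic sketch of the mechanism being relied on here (this is the plain JVM facility, not Spark's exact internals; Spark's `Utils.createTempDir` registers a comparable shutdown hook):

```scala
import java.io.File
import java.nio.file.Files

val dir: File = Files.createTempDirectory("spark-test-").toFile

// Registered once; runs at JVM exit, so the directory is removed
// even if an individual test fails before its own cleanup code.
sys.addShutdownHook {
  def deleteRecursively(f: File): Unit = {
    Option(f.listFiles).foreach(_.foreach(deleteRecursively))
    f.delete()
  }
  deleteRecursively(dir)
}
```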


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-19 Thread misutoth
Github user misutoth commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175575718
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -137,16 +137,29 @@ private[deploy] object DependencyUtils {
  def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
 require(paths != null, "paths cannot be null.")
 Utils.stringToSeq(paths).flatMap { path =>
-  val uri = Utils.resolveURI(path)
-  uri.getScheme match {
-case "local" | "http" | "https" | "ftp" => Array(path)
-case _ =>
-  val fs = FileSystem.get(uri, hadoopConf)
-  Option(fs.globStatus(new Path(uri))).map { status =>
-status.filter(_.isFile).map(_.getPath.toUri.toString)
-  }.getOrElse(Array(path))
+  val spath = path.split('#')
--- End diff --

You are right. It took some time to figure out how to clone a URI without the fragment part, but the next version will include that.


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-19 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175567819
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -245,6 +245,19 @@ object SparkSubmit extends CommandLineUtils with Logging {
   args: SparkSubmitArguments,
   conf: Option[HadoopConfiguration] = None)
   : (Seq[String], Seq[String], SparkConf, String) = {
+try {
+  doPrepareSubmitEnvironment(args, conf)
+} catch {
+  case e: SparkException =>
+printErrorAndExit(e.getMessage)
+throw new RuntimeException("Unreachable production code")
--- End diff --

+1


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175554046
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -245,6 +245,19 @@ object SparkSubmit extends CommandLineUtils with Logging {
   args: SparkSubmitArguments,
   conf: Option[HadoopConfiguration] = None)
   : (Seq[String], Seq[String], SparkConf, String) = {
+try {
+  doPrepareSubmitEnvironment(args, conf)
+} catch {
+  case e: SparkException =>
+printErrorAndExit(e.getMessage)
+throw new RuntimeException("Unreachable production code")
--- End diff --

Maybe just let the exception propagate? That's what a lot of this code 
does... then you don't need to change this file at all.


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175552660
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -137,16 +137,29 @@ private[deploy] object DependencyUtils {
  def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
 require(paths != null, "paths cannot be null.")
 require(paths != null, "paths cannot be null.")
 Utils.stringToSeq(paths).flatMap { path =>
-  val uri = Utils.resolveURI(path)
-  uri.getScheme match {
-case "local" | "http" | "https" | "ftp" => Array(path)
-case _ =>
-  val fs = FileSystem.get(uri, hadoopConf)
-  Option(fs.globStatus(new Path(uri))).map { status =>
-status.filter(_.isFile).map(_.getPath.toUri.toString)
-  }.getOrElse(Array(path))
+  val spath = path.split('#')
--- End diff --

Why not use `Utils.resolveURI` as before? Parsing URIs by hand is very 
sketchy.
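For comparison, a minimal sketch (illustrative paths) of what URI-based handling buys over `split('#')`: the fragment is a first-class component of `java.net.URI`, which is roughly the approach the later revision takes in `splitOnFragment`.

```scala
import java.net.URI

val uri = new URI("hdfs:/data/*.zip#renamed")

// Rebuild the URI minus its fragment instead of splitting the raw string,
// so escaping and edge cases stay the URI class's problem.
val base = new URI(uri.getScheme, uri.getSchemeSpecificPart, null)

assert(base.toString == "hdfs:/data/*.zip")
assert(Option(uri.getFragment) == Some("renamed"))
```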


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175553679
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -137,16 +137,29 @@ private[deploy] object DependencyUtils {
  def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
 require(paths != null, "paths cannot be null.")
 Utils.stringToSeq(paths).flatMap { path =>
-  val uri = Utils.resolveURI(path)
-  uri.getScheme match {
-case "local" | "http" | "https" | "ftp" => Array(path)
-case _ =>
-  val fs = FileSystem.get(uri, hadoopConf)
-  Option(fs.globStatus(new Path(uri))).map { status =>
-status.filter(_.isFile).map(_.getPath.toUri.toString)
-  }.getOrElse(Array(path))
+  val spath = path.split('#')
+  val renameAs = if (spath.length > 1) Some(spath(1)) else None
+  val resolved: Array[String] = resoloveGlobPath(spath(0), hadoopConf)
+  resolved match {
--- End diff --

This whole match block is a little ugly, but I'll wait to see how you 
implement Gabor's suggestion...


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-19 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175553185
  
--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -105,11 +105,17 @@ class SparkSubmitSuite
 
   // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
   implicit val defaultSignaler: Signaler = ThreadSignaler
+  var dir: File = null
--- End diff --

Yeah, there is a good example here: `test("launch simple application with spark-submit with redaction")`


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175552387
  
--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -606,9 +612,12 @@ class SparkSubmitSuite
   }
 
   test("resolves command line argument paths correctly") {
+val archive = Paths.get(dir.toPath.toString, "single.zip")
+Files.createFile(archive)
 val jars = "/jar1,/jar2" // --jars
 val files = "local:/file1,file2"  // --files
-val archives = "file:/archive1,archive2" // --archives
+val archives = s"file:/archive1,${dir.toPath.toAbsolutePath.toString}/*.zip#archive3"
+ // --archives
 val pyFiles = "py-file1,py-file2" // --py-files
--- End diff --

YARN's `Client.scala` supports renaming for everything that uses the 
distributed cache, even if that's not explicitly called out in the docs.


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-19 Thread misutoth
Github user misutoth commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175546041
  
--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -105,11 +105,17 @@ class SparkSubmitSuite
 
   // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
   implicit val defaultSignaler: Signaler = ThreadSignaler
+  var dir: File = null
--- End diff --

I wanted to make sure the directory is deleted even if the test fails.


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-19 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175544492
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -137,16 +137,29 @@ private[deploy] object DependencyUtils {
  def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
 require(paths != null, "paths cannot be null.")
 Utils.stringToSeq(paths).flatMap { path =>
-  val uri = Utils.resolveURI(path)
-  uri.getScheme match {
-case "local" | "http" | "https" | "ftp" => Array(path)
-case _ =>
-  val fs = FileSystem.get(uri, hadoopConf)
-  Option(fs.globStatus(new Path(uri))).map { status =>
-status.filter(_.isFile).map(_.getPath.toUri.toString)
-  }.getOrElse(Array(path))
+  val spath = path.split('#')
+  val renameAs = if (spath.length > 1) Some(spath(1)) else None
+  val resolved: Array[String] = resoloveGlobPath(spath(0), hadoopConf)
+  resolved match {
+case array: Array[String] if !renameAs.isEmpty && array.length>1 =>
+  throw new SparkException(
+s"${spath(1)} resolves ambiguously to multiple files: ${array.mkString(",")}")
+case array: Array[String] if !renameAs.isEmpty => array.map( _ + "#" + renameAs.get)
--- End diff --

Maybe we can find a more meaningful name for `array`; the current one makes the code hard to read.


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-19 Thread misutoth
Github user misutoth commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175543984
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -245,6 +245,19 @@ object SparkSubmit extends CommandLineUtils with Logging {
   args: SparkSubmitArguments,
   conf: Option[HadoopConfiguration] = None)
   : (Seq[String], Seq[String], SparkConf, String) = {
+try {
+  doPrepareSubmitEnvironment(args, conf)
+} catch {
+  case e: SparkException =>
+printErrorAndExit(e.getMessage)
+throw new RuntimeException("Unreachable production code")
--- End diff --

Otherwise execution just continues in the test itself where `exitFn` does 
not stop it.
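A generic sketch of the situation being described (the member names mirror the diff, but this is a simplified stand-in, not Spark's exact code): under a test harness, `exitFn` records the exit instead of terminating the JVM, so without the extra throw, control falls through.

```scala
// Simplified stand-in for SparkSubmit's error-exit path.
object ExitDemo {
  var exitFn: Int => Unit = code => System.exit(code)

  def printErrorAndExit(msg: String): Unit = {
    Console.err.println(s"Error: $msg")
    exitFn(1) // a test may swap in an exitFn that only records the code
  }

  def prepare(): Unit =
    try {
      throw new RuntimeException("simulated failure")
    } catch {
      case e: Exception =>
        printErrorAndExit(e.getMessage)
        // Without this throw, execution would continue past this point
        // whenever the test's exitFn does not actually stop the JVM.
        throw new RuntimeException("Unreachable production code")
    }
}
```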


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-19 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175543124
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -137,16 +137,29 @@ private[deploy] object DependencyUtils {
  def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
 require(paths != null, "paths cannot be null.")
 Utils.stringToSeq(paths).flatMap { path =>
-  val uri = Utils.resolveURI(path)
-  uri.getScheme match {
-case "local" | "http" | "https" | "ftp" => Array(path)
-case _ =>
-  val fs = FileSystem.get(uri, hadoopConf)
-  Option(fs.globStatus(new Path(uri))).map { status =>
-status.filter(_.isFile).map(_.getPath.toUri.toString)
-  }.getOrElse(Array(path))
+  val spath = path.split('#')
+  val renameAs = if (spath.length > 1) Some(spath(1)) else None
+  val resolved: Array[String] = resoloveGlobPath(spath(0), hadoopConf)
--- End diff --

Nit: `resolveGlobPath`


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-19 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175541913
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -245,6 +245,19 @@ object SparkSubmit extends CommandLineUtils with Logging {
   args: SparkSubmitArguments,
   conf: Option[HadoopConfiguration] = None)
   : (Seq[String], Seq[String], SparkConf, String) = {
+try {
+  doPrepareSubmitEnvironment(args, conf)
+} catch {
+  case e: SparkException =>
+printErrorAndExit(e.getMessage)
+throw new RuntimeException("Unreachable production code")
--- End diff --

`throw new RuntimeException...`


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-19 Thread misutoth
Github user misutoth commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175541331
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -245,6 +245,19 @@ object SparkSubmit extends CommandLineUtils with Logging {
   args: SparkSubmitArguments,
   conf: Option[HadoopConfiguration] = None)
   : (Seq[String], Seq[String], SparkConf, String) = {
+try {
+  doPrepareSubmitEnvironment(args, conf)
+} catch {
+  case e: SparkException =>
+printErrorAndExit(e.getMessage)
+throw new RuntimeException("Unreachable production code")
--- End diff --

Which part do you mean is overkill?


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-19 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175540847
  
--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -606,9 +612,12 @@ class SparkSubmitSuite
   }
 
   test("resolves command line argument paths correctly") {
+val archive = Paths.get(dir.toPath.toString, "single.zip")
+Files.createFile(archive)
 val jars = "/jar1,/jar2" // --jars
 val files = "local:/file1,file2"  // --files
-val archives = "file:/archive1,archive2" // --archives
+val archives = s"file:/archive1,${dir.toPath.toAbsolutePath.toString}/*.zip#archive3"
+ // --archives
 val pyFiles = "py-file1,py-file2" // --py-files
--- End diff --

OK, thanks.


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-19 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175540696
  
--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -105,11 +105,17 @@ class SparkSubmitSuite
 
   // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
   implicit val defaultSignaler: Signaler = ThreadSignaler
+  var dir: File = null
--- End diff --

I mean more like: put something here only if it is used by more than 2 tests. There are ~40 tests that are just creating and deleting this directory without any benefit.


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-19 Thread misutoth
Github user misutoth commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175540130
  
--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -606,9 +612,12 @@ class SparkSubmitSuite
   }
 
   test("resolves command line argument paths correctly") {
+val archive = Paths.get(dir.toPath.toString, "single.zip")
+Files.createFile(archive)
 val jars = "/jar1,/jar2" // --jars
 val files = "local:/file1,file2"  // --files
-val archives = "file:/archive1,archive2" // --archives
+val archives = s"file:/archive1,${dir.toPath.toAbsolutePath.toString}/*.zip#archive3"
+ // --archives
 val pyFiles = "py-file1,py-file2" // --py-files
--- End diff --

According to the 
[doc](https://spark.apache.org/docs/latest/running-on-yarn.html) only `--files` 
and `--archives` support it. 


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-19 Thread misutoth
Github user misutoth commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175539256
  
--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -105,11 +105,17 @@ class SparkSubmitSuite
 
   // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
   implicit val defaultSignaler: Signaler = ThreadSignaler
+  var dir: File = null
--- End diff --

I was thinking about this too. I wanted to avoid making it an Option or doing a not-very-nice null check. I can do that later though...


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-19 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175533621
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -137,16 +137,29 @@ private[deploy] object DependencyUtils {
  def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
 require(paths != null, "paths cannot be null.")
 Utils.stringToSeq(paths).flatMap { path =>
-  val uri = Utils.resolveURI(path)
-  uri.getScheme match {
-case "local" | "http" | "https" | "ftp" => Array(path)
-case _ =>
-  val fs = FileSystem.get(uri, hadoopConf)
-  Option(fs.globStatus(new Path(uri))).map { status =>
-status.filter(_.isFile).map(_.getPath.toUri.toString)
-  }.getOrElse(Array(path))
+  val spath = path.split('#')
+  val renameAs = if (spath.length > 1) Some(spath(1)) else None
+  val resolved: Array[String] = resoloveGlobPath(spath(0), hadoopConf)
+  resolved match {
--- End diff --

This can be simplified to something like `(renameAs, resolved) match ...`
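A self-contained sketch of the suggested shape (`applyRename` is a hypothetical helper, not from the PR; the final revision ends up matching on a `(resolved, fragment)` tuple much like this):

```scala
import org.apache.spark.SparkException

// Match on the rename option and the resolved paths together,
// instead of matching on the array alone and re-checking renameAs in guards.
def applyRename(resolved: Array[String], renameAs: Option[String]): Array[String] =
  (renameAs, resolved) match {
    case (Some(_), r) if r.length > 1 =>
      throw new SparkException(
        s"resolves ambiguously to multiple files: ${r.mkString(",")}")
    case (Some(name), r) => r.map(_ + "#" + name)
    case (None, r) => r
  }
```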



---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-19 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175521372
  
--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -606,9 +612,12 @@ class SparkSubmitSuite
   }
 
   test("resolves command line argument paths correctly") {
+val archive = Paths.get(dir.toPath.toString, "single.zip")
+Files.createFile(archive)
 val jars = "/jar1,/jar2" // --jars
 val files = "local:/file1,file2"  // --files
-val archives = "file:/archive1,archive2" // --archives
+val archives = s"file:/archive1,${dir.toPath.toAbsolutePath.toString}/*.zip#archive3"
+ // --archives
 val pyFiles = "py-file1,py-file2" // --py-files
--- End diff --

Does `--py-files` support renaming?


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-19 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175515840
  
--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -105,11 +105,17 @@ class SparkSubmitSuite
 
   // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
   implicit val defaultSignaler: Signaler = ThreadSignaler
+  var dir: File = null
--- End diff --

Most of the tests don't use this dir at all. Why create it for all the tests?


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-19 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175529369
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -137,16 +137,29 @@ private[deploy] object DependencyUtils {
  def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
 require(paths != null, "paths cannot be null.")
 Utils.stringToSeq(paths).flatMap { path =>
-  val uri = Utils.resolveURI(path)
-  uri.getScheme match {
-case "local" | "http" | "https" | "ftp" => Array(path)
-case _ =>
-  val fs = FileSystem.get(uri, hadoopConf)
-  Option(fs.globStatus(new Path(uri))).map { status =>
-status.filter(_.isFile).map(_.getPath.toUri.toString)
-  }.getOrElse(Array(path))
+  val spath = path.split('#')
+  val renameAs = if (spath.length > 1) Some(spath(1)) else None
+  val resolved: Array[String] = resoloveGlobPath(spath(0), hadoopConf)
+  resolved match {
+case array: Array[String] if !renameAs.isEmpty && array.length>1 =>
--- End diff --

Nit: `array.length > 1`


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-19 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175521925
  
--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -657,6 +667,31 @@ class SparkSubmitSuite
 conf3.get(PYSPARK_PYTHON.key) should be ("python3.5")
   }
 
+  var cleanExit = false
--- End diff --

What is it used for?


---




[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...

2018-03-19 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20853#discussion_r175523620
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -245,6 +245,19 @@ object SparkSubmit extends CommandLineUtils with Logging {
   args: SparkSubmitArguments,
   conf: Option[HadoopConfiguration] = None)
   : (Seq[String], Seq[String], SparkConf, String) = {
+try {
+  doPrepareSubmitEnvironment(args, conf)
+} catch {
+  case e: SparkException =>
+printErrorAndExit(e.getMessage)
+throw new RuntimeException("Unreachable production code")
--- End diff --

Nit: I have a feeling it's a bit overkill compared to the other occurrences.


---
