This is an automated email from the ASF dual-hosted git repository.

meng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f4829b  [SPARK-28030][SQL] convert filePath to URI in binary file data source
4f4829b is described below

commit 4f4829b4ae261a9fd656fbf1928e6440d31f8d8c
Author: Xiangrui Meng <m...@databricks.com>
AuthorDate: Wed Jun 12 13:24:02 2019 -0700

    [SPARK-28030][SQL] convert filePath to URI in binary file data source
    
    ## What changes were proposed in this pull request?
    
    Convert `PartitionedFile.filePath` to a URI first in the binary file data source. Otherwise Spark throws a `FileNotFoundException`, because we create the `Path` from the URL-encoded string directly instead of wrapping it in a `URI`.
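    A minimal sketch (not part of this patch) of why the `URI` wrapping matters, assuming a file named `test space.txt` whose URL-encoded form ends up in `PartitionedFile.filePath`:

    ```scala
    import java.net.URI

    import org.apache.hadoop.fs.Path

    // URL-encoded path as it would arrive in PartitionedFile.filePath
    // (hypothetical value for illustration).
    val encoded = "file:/tmp/test%20space.txt"

    // Before the fix: the Path(String) constructor keeps "%20" as literal
    // characters, so the filesystem looks for a file literally named
    // "test%20space.txt" and getFileStatus fails with FileNotFoundException.
    val broken = new Path(encoded)

    // After the fix: going through URI decodes "%20" back to a space,
    // so the Path points at the real file "test space.txt".
    val fixed = new Path(new URI(encoded))
    ```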
    
    ## How was this patch tested?
    
    Unit test.
    
    Closes #24855 from mengxr/SPARK-28030.
    
    Authored-by: Xiangrui Meng <m...@databricks.com>
    Signed-off-by: Xiangrui Meng <m...@databricks.com>
---
 .../spark/sql/execution/datasources/FileScanRDD.scala      |  2 +-
 .../datasources/binaryfile/BinaryFileFormat.scala          |  3 ++-
 .../datasources/binaryfile/BinaryFileFormatSuite.scala     | 14 ++++++++++++++
 3 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index d92ea2e..9e98b0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -38,7 +38,7 @@ import org.apache.spark.util.NextIterator
  * that need to be prepended to each row.
  *
 * @param partitionValues value of partition columns to be prepended to each row.
- * @param filePath path of the file to read
+ * @param filePath URI of the file to read
  * @param start the beginning offset (in bytes) of the block.
  * @param length number of bytes to read.
  * @param locations locality information (list of nodes that have the data).
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
index cdc7cd5..fda4e14 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.binaryfile
 
+import java.net.URI
 import java.sql.Timestamp
 
 import com.google.common.io.{ByteStreams, Closeables}
@@ -100,7 +101,7 @@ class BinaryFileFormat extends FileFormat with DataSourceRegister {
     val maxLength = sparkSession.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH)
 
     file: PartitionedFile => {
-      val path = new Path(file.filePath)
+      val path = new Path(new URI(file.filePath))
       val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
       val status = fs.getFileStatus(path)
       if (filterFuncs.forall(_.apply(status))) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
index 01dc96c..9e2969b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
@@ -368,4 +368,18 @@ class BinaryFileFormatSuite extends QueryTest with SharedSQLContext with SQLTest
       assert(caught.getMessage.contains("exceeds the max length allowed"))
     }
   }
+
+  test("SPARK-28030: support chars in file names that require URL encoding") {
+    withTempDir { dir =>
+      val file = new File(dir, "test space.txt")
+      val content = "123".getBytes
+      Files.write(file.toPath, content, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
+      val df = spark.read.format(BINARY_FILE).load(dir.getPath)
+      df.select(col(PATH), col(CONTENT)).first() match {
+        case Row(p: String, c: Array[Byte]) =>
+          assert(p.endsWith(file.getAbsolutePath), "should support space in file name")
+          assert(c === content, "should read file with space in file name")
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
