Repository: spark
Updated Branches:
  refs/heads/master 0ce09ec54 -> b88ddb8a8


[SPARK-23425][SQL] Support wildcard in HDFS path for load table command

## What changes were proposed in this pull request?
**Problem statement**
The load data command does not work when the HDFS file path contains a 
wildcard such as *.
eg:
"load data inpath 'hdfs://hacluster/user/ext*' into table t1"
throws an AnalysisException when the query is executed.

![wildcard_issue](https://user-images.githubusercontent.com/12999161/42673744-9f5c0c16-8621-11e8-8d28-cdc41bbe6efe.PNG)

**Analysis -**
The fs.exists() API currently used for path validation in the load command 
cannot resolve a path that contains a wildcard pattern. To fix this, the 
validation now uses the globStatus() API, which can resolve paths containing 
the wildcards HDFS supports, such as * and ? (in line with Hive's wildcard 
support).
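
As a rough illustration of the difference described above, the following Scala sketch compares the two calls on Hadoop's local filesystem. It assumes hadoop-common is on the classpath and a POSIX-style filesystem; the object name `GlobVersusExists`, the temporary directory and the file contents are invented for the example and are not part of the patch.

```scala
import java.nio.file.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object GlobVersusExists {
  // Returns (result of exists, number of glob matches) for the pattern
  // "type*" inside a fresh temporary directory that holds two files.
  def check(): (Boolean, Int) = {
    val dir = Files.createTempDirectory("load_data_glob")
    Files.write(dir.resolve("typeddata60.txt"), "a,1\n".getBytes("UTF-8"))
    Files.write(dir.resolve("typeddata61.txt"), "b,2\n".getBytes("UTF-8"))

    // Assumption: the local filesystem stands in for HDFS in this sketch.
    val fs = FileSystem.getLocal(new Configuration())
    val pattern = new Path(dir.toString, "type*")

    // exists() treats "type*" as a literal file name.
    val literalExists = fs.exists(pattern)
    // globStatus() expands the wildcard; guard against a null result.
    val matches = Option(fs.globStatus(pattern)).map(_.length).getOrElse(0)
    (literalExists, matches)
  }

  def main(args: Array[String]): Unit = {
    val (literalExists, matches) = check()
    println(s"exists=$literalExists globMatches=$matches")
  }
}
```

Under these assumptions, `exists` should find nothing for the literal name `type*`, while `globStatus` should return both files.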

**Improvement identified as part of this issue -**
Previously, a wildcard could not be used at the folder level of a path on the 
local file system. This PR handles that scenario: the same globStatus API now 
validates both local and non-local file system paths, which keeps the behavior 
of the load command consistent between HDFS and local file paths.

With this improvement, a wildcard can be used at the folder level of a local 
file system path in the load command, in line with Hive's behaviour. In older 
versions, wildcards were allowed only in the file name of a local path; using 
one at the folder level raised an error stating that it is not supported.
eg: load data local inpath '/localfilesystem/folder*' into table t1
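
A minimal sketch of the folder-level case, again assuming hadoop-common on the classpath and using Hadoop's local filesystem. `validate` and `makeFixture` are hypothetical helpers written for this example. They mirror the idea of the validation (a null or empty glob result is reported as a missing path) but are not the code in the patch.

```scala
import java.nio.file.{Files, Path => JPath}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object FolderLevelGlob {
  // Hypothetical helper: Right(number of matches) or Left(error message).
  def validate(fs: FileSystem, pattern: String): Either[String, Int] = {
    val notFound = s"LOAD DATA input path does not exist: $pattern"
    try {
      val statuses = fs.globStatus(new Path(pattern))
      if (statuses == null || statuses.isEmpty) Left(notFound)
      else Right(statuses.length)
    } catch {
      // Malformed paths are reported with the same generic message.
      case _: IllegalArgumentException => Left(notFound)
    }
  }

  // Builds <tmp>/folder1/data.txt and <tmp>/folder2/data.txt.
  def makeFixture(): JPath = {
    val base = Files.createTempDirectory("load_data_local")
    for (name <- Seq("folder1", "folder2")) {
      val sub = Files.createDirectory(base.resolve(name))
      Files.write(sub.resolve("data.txt"), "1\n".getBytes("UTF-8"))
    }
    base
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.getLocal(new Configuration())
    val base = makeFixture()
    println(validate(fs, s"$base/folder*/*"))   // wildcard at folder level
    println(validate(fs, s"$base/missing*/*"))  // no directory matches
  }
}
```

Returning an `Either` keeps the sketch free of Spark types; the patch itself throws an `AnalysisException` instead.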

## How was this patch tested?
a) Manually tested by executing the test cases on an HDFS YARN cluster. The 
results are attached in the section below.
b) Existing test cases verify the impact and functionality for local file 
path scenarios.
c) A test case has been added to verify the functionality when a wildcard is 
used at the folder level of a local file system path.
## Test Results
Note: all IP addresses were replaced with localhost for security reasons.
HDFS path details
```
vm1:/opt/ficlient # hadoop fs -ls /user/data/sujith1
Found 2 items
-rw-r--r--   3 shahid hadoop       4802 2018-03-26 15:45 /user/data/sujith1/typeddata60.txt
-rw-r--r--   3 shahid hadoop       4883 2018-03-26 15:45 /user/data/sujith1/typeddata61.txt
vm1:/opt/ficlient # hadoop fs -ls /user/data/sujith2
Found 2 items
-rw-r--r--   3 shahid hadoop       4802 2018-03-26 15:45 /user/data/sujith2/typeddata60.txt
-rw-r--r--   3 shahid hadoop       4883 2018-03-26 15:45 /user/data/sujith2/typeddata61.txt
```
Positive scenario: load each file by its complete path to establish the record count
```
0: jdbc:hive2://localhost:22550/default> create table wild_spark (time 
timestamp, name string, isright boolean, datetoday date, num binary, height 
double, score float, decimaler decimal(10,0), id tinyint, age int, license 
bigint, length smallint) row format delimited fields terminated by ',';
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (1.217 seconds)
0: jdbc:hive2://localhost:22550/default> load data  inpath 
'/user/data/sujith1/typeddata60.txt' into table wild_spark;
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (4.236 seconds)
0: jdbc:hive2://localhost:22550/default> load data  inpath 
'/user/data/sujith1/typeddata61.txt' into table wild_spark;
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (0.602 seconds)
0: jdbc:hive2://localhost:22550/default> select count(*) from wild_spark;
+-----------+--+
| count(1)  |
+-----------+--+
| 121       |
+-----------+--+
1 row selected (18.529 seconds)
0: jdbc:hive2://localhost:22550/default>
```
With a wildcard character in the file path
```
0: jdbc:hive2://localhost:22550/default> create table spark_withWildChar (time 
timestamp, name string, isright boolean, datetoday date, num binary, height 
double, score float, decimaler decimal(10,0), id tinyint, age int, license 
bigint, length smallint) row format delimited fields terminated by ',';
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (0.409 seconds)
0: jdbc:hive2://localhost:22550/default> load data  inpath 
'/user/data/sujith1/type*' into table spark_withWildChar;
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (1.502 seconds)
0: jdbc:hive2://localhost:22550/default> select count(*) from 
spark_withWildChar;
+-----------+--+
| count(1)  |
+-----------+--+
| 121       |
+-----------+--+
```
With the ? wildcard
```
0: jdbc:hive2://localhost:22550/default> create table 
spark_withWildChar_DiffChar (time timestamp, name string, isright boolean, 
datetoday date, num binary, height double, score float, decimaler 
decimal(10,0), id tinyint, age int, license bigint, length smallint) row format 
delimited fields terminated by ',';
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (0.489 seconds)
0: jdbc:hive2://localhost:22550/default> load data  inpath 
'/user/data/sujith1/?ypeddata60.txt' into table spark_withWildChar_DiffChar;
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (1.152 seconds)
0: jdbc:hive2://localhost:22550/default> load data  inpath 
'/user/data/sujith1/?ypeddata61.txt' into table spark_withWildChar_DiffChar;
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (0.644 seconds)
0: jdbc:hive2://localhost:22550/default> select count(*) from 
spark_withWildChar_DiffChar;
+-----------+--+
| count(1)  |
+-----------+--+
| 121       |
+-----------+--+
1 row selected (16.078 seconds)
```
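
The `?` wildcard needs care because `?` is also the query delimiter in a URI. The sketch below uses only `java.net.URI` to show how a path string like the one in the scenario above is split when parsed as a whole, and how the character survives when the path is passed as its own component. The scheme `hdfs` and authority `localhost` are placeholder values, and `QuestionMarkInUri` is a name invented for this example.

```scala
import java.net.URI

object QuestionMarkInUri {
  val raw = "/user/data/sujith1/?ypeddata60.txt"

  // Single-argument constructor: everything after '?' becomes the query.
  val parsed = new URI(raw)

  // Component-wise constructor with a null query: '?' stays in the path
  // (it is percent-encoded internally and decoded again by getPath).
  val built = new URI("hdfs", "localhost", raw, null, null)

  def main(args: Array[String]): Unit = {
    println(s"parsed: path=${parsed.getPath} query=${parsed.getQuery}")
    println(s"built:  path=${built.getPath} query=${built.getQuery}")
  }
}
```

This appears to be the reason the patch builds the load path from a Hadoop `Path` rather than from `new URI(path)`, as its code comments describe.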
With a folder-level wildcard
```
0: jdbc:hive2://localhost:22550/default> create table 
spark_withWildChar_folderlevel (time timestamp, name string, isright boolean, 
datetoday date, num binary, height double, score float, decimaler 
decimal(10,0), id tinyint, age int, license bigint, length smallint) row format 
delimited fields terminated by ',';
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (0.489 seconds)
0: jdbc:hive2://localhost:22550/default> load data  inpath '/user/data/suji*/*' 
into table spark_withWildChar_folderlevel;
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (1.152 seconds)

0: jdbc:hive2://localhost:22550/default> select count(*) from 
spark_withWildChar_folderlevel;
+-----------+--+
| count(1)  |
+-----------+--+
| 242       |
+-----------+--+
1 row selected (16.078 seconds)
```
Negative scenario: invalid path
```
0: jdbc:hive2://localhost:22550/default> load data  inpath '/user/data/sujiinvalid*/*' into  table spark_withWildChar_folder;
Error: org.apache.spark.sql.AnalysisException: LOAD DATA input path does not exist: /user/data/sujiinvalid*/*; (state=,code=0)
0: jdbc:hive2://localhost:22550/default>
```
Hive test results: file level
```
0: jdbc:hive2://localhost:21066/> create table hive_withWildChar_files (time 
timestamp, name string, isright boolean, datetoday date, num binary, height 
double, score float, decimaler decimal(10,0), id tinyint, age int, license 
bigint, length smallint) stored as TEXTFILE;
No rows affected (0.723 seconds)
0: jdbc:hive2://localhost:21066/> load data  inpath '/user/data/sujith1/type*'  
into  table hive_withWildChar_files;
INFO  : Loading data to table default.hive_withwildchar_files from 
hdfs://hacluster/user/sujith1/type*
No rows affected (0.682 seconds)
0: jdbc:hive2://localhost:21066/> select count(*) from hive_withWildChar_files;
+------+--+
| _c0  |
+------+--+
| 121  |
+------+--+
1 row selected (50.832 seconds)
```
Hive test results: folder level
```
0: jdbc:hive2://localhost:21066/> create table hive_withWildChar_folder (time 
timestamp, name string, isright boolean, datetoday date, num binary, height 
double, score float, decimaler decimal(10,0), id tinyint, age int, license 
bigint, length smallint) stored as TEXTFILE;
No rows affected (0.459 seconds)
0: jdbc:hive2://localhost:21066/> load data  inpath '/user/data/suji*/*' into 
table hive_withWildChar_folder;
INFO  : Loading data to table default.hive_withwildchar_folder from 
hdfs://hacluster/user/data/suji*/*
No rows affected (0.76 seconds)
0: jdbc:hive2://localhost:21066/> select count(*) from hive_withWildChar_folder;
+------+--+
| _c0  |
+------+--+
| 242  |
+------+--+
1 row selected (46.483 seconds)
```

Closes #20611 from sujith71955/master_wldcardsupport.

Lead-authored-by: s71955 <sujithchacko.2...@gmail.com>
Co-authored-by: sujith71955 <sujithchacko.2...@gmail.com>
Signed-off-by: hyukjinkwon <gurwls...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b88ddb8a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b88ddb8a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b88ddb8a

Branch: refs/heads/master
Commit: b88ddb8a83f88a68ac2ab45fc0b1bd8e8951d700
Parents: 0ce09ec
Author: s71955 <sujithchacko.2...@gmail.com>
Authored: Fri Aug 24 09:54:30 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Fri Aug 24 09:54:30 2018 +0800

----------------------------------------------------------------------
 .../spark/sql/execution/command/tables.scala    | 154 ++++++++-----------
 .../sql/hive/execution/SQLQuerySuite.scala      |  55 ++++++-
 2 files changed, 119 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b88ddb8a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index f4dede9..2eca1c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -18,14 +18,14 @@
 package org.apache.spark.sql.execution.command
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 import java.nio.file.FileSystems
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
 import scala.util.control.NonFatal
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileContext, FsConstants, Path}
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -303,94 +303,44 @@ case class LoadDataCommand(
           s"partitioned, but a partition spec was provided.")
       }
     }
-
-    val loadPath =
+    val loadPath = {
       if (isLocal) {
-        val uri = Utils.resolveURI(path)
-        val file = new File(uri.getPath)
-        val exists = if (file.getAbsolutePath.contains("*")) {
-          val fileSystem = FileSystems.getDefault
-          val dir = file.getParentFile.getAbsolutePath
-          if (dir.contains("*")) {
-            throw new AnalysisException(
-              s"LOAD DATA input path allows only filename wildcard: $path")
-          }
-
-          // Note that special characters such as "*" on Windows are not allowed as a path.
-          // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path.
-          val dirPath = fileSystem.getPath(dir)
-          val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath
-          val safePathPattern = if (Utils.isWindows) {
-            // On Windows, the pattern should not start with slashes for 
absolute file paths.
-            pathPattern.stripPrefix("/")
-          } else {
-            pathPattern
-          }
-          val files = new File(dir).listFiles()
-          if (files == null) {
-            false
-          } else {
-            val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern)
-            files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath)))
-          }
-        } else {
-          new File(file.getAbsolutePath).exists()
-        }
-        if (!exists) {
-          throw new AnalysisException(s"LOAD DATA input path does not exist: $path")
-        }
-        uri
+        val localFS = FileContext.getLocalFSFileContext()
+        makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path))
       } else {
-        val uri = new URI(path)
-        val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) {
-          uri
-        } else {
-          // Follow Hive's behavior:
-          // If no schema or authority is provided with non-local inpath,
-          // we will use hadoop configuration "fs.defaultFS".
-          val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS")
-          val defaultFS = if (defaultFSConf == null) {
-            new URI("")
-          } else {
-            new URI(defaultFSConf)
-          }
-
-          val scheme = if (uri.getScheme() != null) {
-            uri.getScheme()
-          } else {
-            defaultFS.getScheme()
-          }
-          val authority = if (uri.getAuthority() != null) {
-            uri.getAuthority()
-          } else {
-            defaultFS.getAuthority()
-          }
-
-          if (scheme == null) {
-            throw new AnalysisException(
-              s"LOAD DATA: URI scheme is required for non-local input paths: '$path'")
-          }
-
-          // Follow Hive's behavior:
-          // If LOCAL is not specified, and the path is relative,
-          // then the path is interpreted relative to "/user/<username>"
-          val uriPath = uri.getPath()
-          val absolutePath = if (uriPath != null && uriPath.startsWith("/")) {
-            uriPath
-          } else {
-            s"/user/${System.getProperty("user.name")}/$uriPath"
-          }
-          new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment())
-        }
-        val hadoopConf = sparkSession.sessionState.newHadoopConf()
-        val srcPath = new Path(hdfsUri)
-        val fs = srcPath.getFileSystem(hadoopConf)
-        if (!fs.exists(srcPath)) {
-          throw new AnalysisException(s"LOAD DATA input path does not exist: $path")
-        }
-        hdfsUri
+        val loadPath = new Path(path)
+        // Follow Hive's behavior:
+        // If no schema or authority is provided with non-local inpath,
+        // we will use hadoop configuration "fs.defaultFS".
+        val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS")
+        val defaultFS = if (defaultFSConf == null) new URI("") else new URI(defaultFSConf)
+        // Follow Hive's behavior:
+        // If LOCAL is not specified, and the path is relative,
+        // then the path is interpreted relative to "/user/<username>"
+        val uriPath = new Path(s"/user/${System.getProperty("user.name")}/")
+        // makeQualified() will ignore the query parameter part while creating a path, so the
+        // entire  string will be considered while making a Path instance,this is mainly done
+        // by considering the wild card scenario in mind.as per old logic query param  is
+        // been considered while creating URI instance and if path contains wild card char '?'
+        // the remaining charecters after '?' will be removed while forming URI instance
+        makeQualified(defaultFS, uriPath, loadPath)
       }
-
+    }
+    val fs = loadPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+    // This handling is because while resolving the invalid URLs starting with file:///
+    // system throws IllegalArgumentException from globStatus API,so in order to handle
+    // such scenarios this code is added in try catch block and after catching the
+    // runtime exception a generic error will be displayed to the user.
+    try {
+      val fileStatus = fs.globStatus(loadPath)
+      if (fileStatus == null || fileStatus.isEmpty) {
+        throw new AnalysisException(s"LOAD DATA input path does not exist: $path")
+      }
+    } catch {
+      case e: IllegalArgumentException =>
+        log.warn(s"Exception while validating the load path $path ", e)
+        throw new AnalysisException(s"LOAD DATA input path does not exist: $path")
+    }
     if (partition.nonEmpty) {
       catalog.loadPartition(
         targetTable.identifier,
@@ -413,6 +363,36 @@ case class LoadDataCommand(
     CommandUtils.updateTableStats(sparkSession, targetTable)
     Seq.empty[Row]
   }
+
+  /**
+   * Returns a qualified path object. Method ported from org.apache.hadoop.fs.Path class.
+   *
+   * @param defaultUri default uri corresponding to the filesystem provided.
+   * @param workingDir the working directory for the particular child path wd-relative names.
+   * @param path       Path instance based on the path string specified by the user.
+   * @return qualified path object
+   */
+  private def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = {
+    val pathUri = if (path.isAbsolute()) path.toUri() else new Path(workingDir, path).toUri()
+    if (pathUri.getScheme == null || pathUri.getAuthority == null &&
+        defaultUri.getAuthority != null) {
+      val scheme = if (pathUri.getScheme == null) defaultUri.getScheme else pathUri.getScheme
+      val authority = if (pathUri.getAuthority == null) {
+        if (defaultUri.getAuthority == null) "" else defaultUri.getAuthority
+      } else {
+        pathUri.getAuthority
+      }
+      try {
+        val newUri = new URI(scheme, authority, pathUri.getPath, pathUri.getFragment)
+        new Path(newUri)
+      } catch {
+        case e: URISyntaxException =>
+          throw new IllegalArgumentException(e)
+      }
+    } else {
+      path
+    }
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/b88ddb8a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 13aa2b8..20c4c36 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1912,11 +1912,60 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
           sql("LOAD DATA LOCAL INPATH '/non-exist-folder/*part*' INTO TABLE load_t")
         }.getMessage
         assert(m.contains("LOAD DATA input path does not exist"))
+      }
+    }
+  }
 
-        val m2 = intercept[AnalysisException] {
-          sql(s"LOAD DATA LOCAL INPATH '$path*/*part*' INTO TABLE load_t")
+  test("Support wildcard character in folderlevel for LOAD DATA LOCAL INPATH") {
+    withTempDir { dir =>
+      val path = dir.toURI.toString.stripSuffix("/")
+      val dirPath = dir.getAbsoluteFile
+      for (i <- 1 to 3) {
+        Files.write(s"$i", new File(dirPath, s"part-r-0000$i"), StandardCharsets.UTF_8)
+      }
+      withTable("load_t_folder_wildcard") {
+        sql("CREATE TABLE load_t (a STRING)")
+        sql(s"LOAD DATA LOCAL INPATH '${
+          path.substring(0, path.length - 1)
+            .concat("*")
+        }/' INTO TABLE load_t")
+        checkAnswer(sql("SELECT * FROM load_t"), Seq(Row("1"), Row("2"), Row("3")))
+        val m = intercept[AnalysisException] {
+          sql(s"LOAD DATA LOCAL INPATH '${
+            path.substring(0, path.length - 1).concat("_invalid_dir") concat ("*")
+          }/' INTO TABLE load_t")
         }.getMessage
-        assert(m2.contains("LOAD DATA input path allows only filename wildcard"))
+        assert(m.contains("LOAD DATA input path does not exist"))
+      }
+    }
+  }
+
+  test("SPARK-17796 Support wildcard '?'char in middle as part of local file path") {
+    withTempDir { dir =>
+      val path = dir.toURI.toString.stripSuffix("/")
+      val dirPath = dir.getAbsoluteFile
+      for (i <- 1 to 3) {
+        Files.write(s"$i", new File(dirPath, s"part-r-0000$i"), StandardCharsets.UTF_8)
+      }
+      withTable("load_t1") {
+        sql("CREATE TABLE load_t1 (a STRING)")
+        sql(s"LOAD DATA LOCAL INPATH '$path/part-r-0000?' INTO TABLE load_t1")
+        checkAnswer(sql("SELECT * FROM load_t1"), Seq(Row("1"), Row("2"), Row("3")))
+      }
+    }
+  }
+
+  test("SPARK-17796 Support wildcard '?'char in start as part of local file path") {
+    withTempDir { dir =>
+      val path = dir.toURI.toString.stripSuffix("/")
+      val dirPath = dir.getAbsoluteFile
+      for (i <- 1 to 3) {
+        Files.write(s"$i", new File(dirPath, s"part-r-0000$i"), StandardCharsets.UTF_8)
+      }
+      withTable("load_t2") {
+        sql("CREATE TABLE load_t2 (a STRING)")
+        sql(s"LOAD DATA LOCAL INPATH '$path/?art-r-00001' INTO TABLE load_t2")
+        checkAnswer(sql("SELECT * FROM load_t2"), Seq(Row("1")))
       }
     }
   }


