Repository: spark
Updated Branches:
  refs/heads/master 0ce09ec54 -> b88ddb8a8
[SPARK-23425][SQL] Support wildcard in HDFS path for load table command

## What changes were proposed in this pull request?

**Problem statement**
The load data command does not work when the HDFS file path contains wildcard characters such as *.
eg: "load data inpath 'hdfs://hacluster/user/ext*' into table t1" throws an AnalysisException when the query is executed.

![wildcard_issue](https://user-images.githubusercontent.com/12999161/42673744-9f5c0c16-8621-11e8-8d28-cdc41bbe6efe.PNG)

**Analysis -**
Currently the fs.exists() API, which the load command uses for path validation, cannot resolve a path that contains a wildcard pattern. To fix this, the patch uses the globStatus() API, which can resolve paths containing the wildcards HDFS supports, such as * and ? (in line with Hive's wildcard support).

**Improvement identified as part of this issue -**
Currently a wildcard character cannot be used in a folder-level path on the local file system. This PR handles that scenario: the same globStatus() API unifies the validation logic for local and non-local file systems, which keeps the behavior of the load command consistent between HDFS and local file paths.
With this improvement, users can use a wildcard character in a folder-level path of the local file system in the load command, in line with Hive's behavior. In older versions, wildcards were allowed only in the file-name part of a local path; using one in a folder path produced an error stating that it is not supported.
eg: load data local inpath '/localfilesystem/folder*' into table t1

## How was this patch tested?

a) Manually tested by executing test cases on an HDFS YARN cluster. The reports are attached in the section below.
b) Existing test cases verify the impact and functionality for local file path scenarios.
c) A test case has been added to verify the functionality when a wildcard is used in a folder-level path of the local file system.

## Test Results

Note: all IPs were updated to localhost for security reasons.

HDFS path details
```
vm1:/opt/ficlient # hadoop fs -ls /user/data/sujith1
Found 2 items
-rw-r--r-- 3 shahid hadoop 4802 2018-03-26 15:45 /user/data/sujith1/typeddata60.txt
-rw-r--r-- 3 shahid hadoop 4883 2018-03-26 15:45 /user/data/sujith1/typeddata61.txt
vm1:/opt/ficlient # hadoop fs -ls /user/data/sujith2
Found 2 items
-rw-r--r-- 3 shahid hadoop 4802 2018-03-26 15:45 /user/data/sujith2/typeddata60.txt
-rw-r--r-- 3 shahid hadoop 4883 2018-03-26 15:45 /user/data/sujith2/typeddata61.txt
```

Positive scenario: specifying the complete file path to establish the record count
```
0: jdbc:hive2://localhost:22550/default> create table wild_spark (time timestamp, name string, isright boolean, datetoday date, num binary, height double, score float, decimaler decimal(10,0), id tinyint, age int, license bigint, length smallint) row format delimited fields terminated by ',';
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (1.217 seconds)
0: jdbc:hive2://localhost:22550/default> load data inpath '/user/data/sujith1/typeddata60.txt' into table wild_spark;
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (4.236 seconds)
0: jdbc:hive2://localhost:22550/default> load data inpath '/user/data/sujith1/typeddata61.txt' into table wild_spark;
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (0.602 seconds)
0: jdbc:hive2://localhost:22550/default> select count(*) from wild_spark;
+-----------+--+
| count(1)  |
+-----------+--+
| 121       |
+-----------+--+
1 row selected (18.529 seconds)
0: jdbc:hive2://localhost:22550/default>
```

With a wildcard character in the file path
```
0: jdbc:hive2://localhost:22550/default> create table spark_withWildChar (time timestamp, name string, isright boolean, datetoday date, num binary, height double, score float, decimaler decimal(10,0), id tinyint, age int, license bigint, length smallint) row format delimited fields terminated by ',';
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (0.409 seconds)
0: jdbc:hive2://localhost:22550/default> load data inpath '/user/data/sujith1/type*' into table spark_withWildChar;
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (1.502 seconds)
0: jdbc:hive2://localhost:22550/default> select count(*) from spark_withWildChar;
+-----------+--+
| count(1)  |
+-----------+--+
| 121       |
+-----------+--+
```

With the ? wildcard
```
0: jdbc:hive2://localhost:22550/default> create table spark_withWildChar_DiffChar (time timestamp, name string, isright boolean, datetoday date, num binary, height double, score float, decimaler decimal(10,0), id tinyint, age int, license bigint, length smallint) row format delimited fields terminated by ',';
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (0.489 seconds)
0: jdbc:hive2://localhost:22550/default> load data inpath '/user/data/sujith1/?ypeddata60.txt' into table spark_withWildChar_DiffChar;
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (1.152 seconds)
0: jdbc:hive2://localhost:22550/default> load data inpath '/user/data/sujith1/?ypeddata61.txt' into table spark_withWildChar_DiffChar;
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (0.644 seconds)
0: jdbc:hive2://localhost:22550/default> select count(*) from spark_withWildChar_DiffChar;
+-----------+--+
| count(1)  |
+-----------+--+
| 121       |
+-----------+--+
1 row selected (16.078 seconds)
```

With a folder-level wildcard
```
0: jdbc:hive2://localhost:22550/default> create table spark_withWildChar_folderlevel (time timestamp, name string, isright boolean, datetoday date, num binary, height double, score float, decimaler decimal(10,0), id tinyint, age int, license bigint, length smallint) row format delimited fields terminated by ',';
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (0.489 seconds)
0: jdbc:hive2://localhost:22550/default> load data inpath '/user/data/suji*/*' into table spark_withWildChar_folderlevel;
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (1.152 seconds)
0: jdbc:hive2://localhost:22550/default> select count(*) from spark_withWildChar_folderlevel;
+-----------+--+
| count(1)  |
+-----------+--+
| 242       |
+-----------+--+
1 row selected (16.078 seconds)
```

Negative scenario: invalid path
```
0: jdbc:hive2://localhost:22550/default> load data inpath '/user/data/sujiinvalid*/*' into table spark_withWildChar_folder;
Error: org.apache.spark.sql.AnalysisException: LOAD DATA input path does not exist: /user/data/sujiinvalid*/*; (state=,code=0)
0: jdbc:hive2://localhost:22550/default>
```

Hive test results - file level
```
0: jdbc:hive2://localhost:21066/> create table hive_withWildChar_files (time timestamp, name string, isright boolean, datetoday date, num binary, height double, score float, decimaler decimal(10,0), id tinyint, age int, license bigint, length smallint) stored as TEXTFILE;
No rows affected (0.723 seconds)
0: jdbc:hive2://localhost:21066/> load data inpath '/user/data/sujith1/type*' into table hive_withWildChar_files;
INFO : Loading data to table default.hive_withwildchar_files from hdfs://hacluster/user/sujith1/type*
No rows affected (0.682 seconds)
0: jdbc:hive2://localhost:21066/> select count(*) from hive_withWildChar_files;
+------+--+
| _c0  |
+------+--+
| 121  |
+------+--+
1 row selected (50.832 seconds)
```

Hive test results - folder level
```
0: jdbc:hive2://localhost:21066/> create table hive_withWildChar_folder (time timestamp, name string, isright boolean, datetoday date, num binary, height double, score float, decimaler decimal(10,0), id tinyint, age int, license bigint, length smallint) stored as TEXTFILE;
No rows affected (0.459 seconds)
0: jdbc:hive2://localhost:21066/> load data inpath '/user/data/suji*/*' into table hive_withWildChar_folder;
INFO : Loading data to table default.hive_withwildchar_folder from hdfs://hacluster/user/data/suji*/*
No rows affected (0.76 seconds)
0: jdbc:hive2://localhost:21066/> select count(*) from hive_withWildChar_folder;
+------+--+
| _c0  |
+------+--+
| 242  |
+------+--+
1 row selected (46.483 seconds)
```

Closes #20611 from sujith71955/master_wldcardsupport.

Lead-authored-by: s71955 <sujithchacko.2...@gmail.com>
Co-authored-by: sujith71955 <sujithchacko.2...@gmail.com>
Signed-off-by: hyukjinkwon <gurwls...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b88ddb8a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b88ddb8a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b88ddb8a

Branch: refs/heads/master
Commit: b88ddb8a83f88a68ac2ab45fc0b1bd8e8951d700
Parents: 0ce09ec
Author: s71955 <sujithchacko.2...@gmail.com>
Authored: Fri Aug 24 09:54:30 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Fri Aug 24 09:54:30 2018 +0800

----------------------------------------------------------------------
 .../spark/sql/execution/command/tables.scala    | 154 ++++++++-----------
 .../sql/hive/execution/SQLQuerySuite.scala      |  55 ++++++-
 2 files changed, 119 insertions(+), 90 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/b88ddb8a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index f4dede9..2eca1c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -18,14 +18,14 @@
 package org.apache.spark.sql.execution.command
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 import java.nio.file.FileSystems
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
 import scala.util.control.NonFatal
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileContext, FsConstants, Path}
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -303,94 +303,44 @@ case class LoadDataCommand(
           s"partitioned, but a partition spec was provided.")
       }
     }
-
-    val loadPath =
+    val loadPath = {
       if (isLocal) {
-        val uri = Utils.resolveURI(path)
-        val file = new File(uri.getPath)
-        val exists = if (file.getAbsolutePath.contains("*")) {
-          val fileSystem = FileSystems.getDefault
-          val dir = file.getParentFile.getAbsolutePath
-          if (dir.contains("*")) {
-            throw new AnalysisException(
-              s"LOAD DATA input path allows only filename wildcard: $path")
-          }
-
-          // Note that special characters such as "*" on Windows are not allowed as a path.
-          // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path.
-          val dirPath = fileSystem.getPath(dir)
-          val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath
-          val safePathPattern = if (Utils.isWindows) {
-            // On Windows, the pattern should not start with slashes for absolute file paths.
-            pathPattern.stripPrefix("/")
-          } else {
-            pathPattern
-          }
-          val files = new File(dir).listFiles()
-          if (files == null) {
-            false
-          } else {
-            val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern)
-            files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath)))
-          }
-        } else {
-          new File(file.getAbsolutePath).exists()
-        }
-        if (!exists) {
-          throw new AnalysisException(s"LOAD DATA input path does not exist: $path")
-        }
-        uri
+        val localFS = FileContext.getLocalFSFileContext()
+        makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path))
       } else {
-        val uri = new URI(path)
-        val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) {
-          uri
-        } else {
-          // Follow Hive's behavior:
-          // If no schema or authority is provided with non-local inpath,
-          // we will use hadoop configuration "fs.defaultFS".
-          val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS")
-          val defaultFS = if (defaultFSConf == null) {
-            new URI("")
-          } else {
-            new URI(defaultFSConf)
-          }
-
-          val scheme = if (uri.getScheme() != null) {
-            uri.getScheme()
-          } else {
-            defaultFS.getScheme()
-          }
-          val authority = if (uri.getAuthority() != null) {
-            uri.getAuthority()
-          } else {
-            defaultFS.getAuthority()
-          }
-
-          if (scheme == null) {
-            throw new AnalysisException(
-              s"LOAD DATA: URI scheme is required for non-local input paths: '$path'")
-          }
-
-          // Follow Hive's behavior:
-          // If LOCAL is not specified, and the path is relative,
-          // then the path is interpreted relative to "/user/<username>"
-          val uriPath = uri.getPath()
-          val absolutePath = if (uriPath != null && uriPath.startsWith("/")) {
-            uriPath
-          } else {
-            s"/user/${System.getProperty("user.name")}/$uriPath"
-          }
-          new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment())
-        }
-        val hadoopConf = sparkSession.sessionState.newHadoopConf()
-        val srcPath = new Path(hdfsUri)
-        val fs = srcPath.getFileSystem(hadoopConf)
-        if (!fs.exists(srcPath)) {
-          throw new AnalysisException(s"LOAD DATA input path does not exist: $path")
-        }
-        hdfsUri
+        val loadPath = new Path(path)
+        // Follow Hive's behavior:
+        // If no schema or authority is provided with non-local inpath,
+        // we will use hadoop configuration "fs.defaultFS".
+        val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS")
+        val defaultFS = if (defaultFSConf == null) new URI("") else new URI(defaultFSConf)
+        // Follow Hive's behavior:
+        // If LOCAL is not specified, and the path is relative,
+        // then the path is interpreted relative to "/user/<username>"
+        val uriPath = new Path(s"/user/${System.getProperty("user.name")}/")
+        // makeQualified() will ignore the query parameter part while creating a path, so the
+        // entire string will be considered while making a Path instance,this is mainly done
+        // by considering the wild card scenario in mind.as per old logic query param is
+        // been considered while creating URI instance and if path contains wild card char '?'
+        // the remaining charecters after '?' will be removed while forming URI instance
+        makeQualified(defaultFS, uriPath, loadPath)
       }
-
+    }
+    val fs = loadPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+    // This handling is because while resolving the invalid URLs starting with file:///
+    // system throws IllegalArgumentException from globStatus API,so in order to handle
+    // such scenarios this code is added in try catch block and after catching the
+    // runtime exception a generic error will be displayed to the user.
+    try {
+      val fileStatus = fs.globStatus(loadPath)
+      if (fileStatus == null || fileStatus.isEmpty) {
+        throw new AnalysisException(s"LOAD DATA input path does not exist: $path")
+      }
+    } catch {
+      case e: IllegalArgumentException =>
+        log.warn(s"Exception while validating the load path $path ", e)
+        throw new AnalysisException(s"LOAD DATA input path does not exist: $path")
+    }
     if (partition.nonEmpty) {
       catalog.loadPartition(
         targetTable.identifier,
@@ -413,6 +363,36 @@ case class LoadDataCommand(
     CommandUtils.updateTableStats(sparkSession, targetTable)
     Seq.empty[Row]
   }
+
+  /**
+   * Returns a qualified path object. Method ported from org.apache.hadoop.fs.Path class.
+   *
+   * @param defaultUri default uri corresponding to the filesystem provided.
+   * @param workingDir the working directory for the particular child path wd-relative names.
+   * @param path       Path instance based on the path string specified by the user.
+   * @return qualified path object
+   */
+  private def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = {
+    val pathUri = if (path.isAbsolute()) path.toUri() else new Path(workingDir, path).toUri()
+    if (pathUri.getScheme == null || pathUri.getAuthority == null &&
+      defaultUri.getAuthority != null) {
+      val scheme = if (pathUri.getScheme == null) defaultUri.getScheme else pathUri.getScheme
+      val authority = if (pathUri.getAuthority == null) {
+        if (defaultUri.getAuthority == null) "" else defaultUri.getAuthority
+      } else {
+        pathUri.getAuthority
+      }
+      try {
+        val newUri = new URI(scheme, authority, pathUri.getPath, pathUri.getFragment)
+        new Path(newUri)
+      } catch {
+        case e: URISyntaxException =>
+          throw new IllegalArgumentException(e)
+      }
+    } else {
+      path
+    }
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/b88ddb8a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 13aa2b8..20c4c36 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1912,11 +1912,60 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
           sql("LOAD DATA LOCAL INPATH '/non-exist-folder/*part*' INTO TABLE load_t")
         }.getMessage
         assert(m.contains("LOAD DATA input path does not exist"))
+      }
+    }
+  }
 
-        val m2 = intercept[AnalysisException] {
-          sql(s"LOAD DATA LOCAL INPATH '$path*/*part*' INTO TABLE load_t")
+  test("Support wildcard character in folderlevel for LOAD DATA LOCAL INPATH") {
+    withTempDir { dir =>
+      val path = dir.toURI.toString.stripSuffix("/")
+      val dirPath = dir.getAbsoluteFile
+      for (i <- 1 to 3) {
+        Files.write(s"$i", new File(dirPath, s"part-r-0000$i"), StandardCharsets.UTF_8)
+      }
+      withTable("load_t_folder_wildcard") {
+        sql("CREATE TABLE load_t (a STRING)")
+        sql(s"LOAD DATA LOCAL INPATH '${
+          path.substring(0, path.length - 1)
+            .concat("*")
+        }/' INTO TABLE load_t")
+        checkAnswer(sql("SELECT * FROM load_t"), Seq(Row("1"), Row("2"), Row("3")))
+        val m = intercept[AnalysisException] {
+          sql(s"LOAD DATA LOCAL INPATH '${
+            path.substring(0, path.length - 1).concat("_invalid_dir") concat ("*")
+          }/' INTO TABLE load_t")
         }.getMessage
-        assert(m2.contains("LOAD DATA input path allows only filename wildcard"))
+        assert(m.contains("LOAD DATA input path does not exist"))
+      }
+    }
+  }
+
+  test("SPARK-17796 Support wildcard '?'char in middle as part of local file path") {
+    withTempDir { dir =>
+      val path = dir.toURI.toString.stripSuffix("/")
+      val dirPath = dir.getAbsoluteFile
+      for (i <- 1 to 3) {
+        Files.write(s"$i", new File(dirPath, s"part-r-0000$i"), StandardCharsets.UTF_8)
+      }
+      withTable("load_t1") {
+        sql("CREATE TABLE load_t1 (a STRING)")
+        sql(s"LOAD DATA LOCAL INPATH '$path/part-r-0000?' INTO TABLE load_t1")
+        checkAnswer(sql("SELECT * FROM load_t1"), Seq(Row("1"), Row("2"), Row("3")))
+      }
+    }
+  }
+
+  test("SPARK-17796 Support wildcard '?'char in start as part of local file path") {
+    withTempDir { dir =>
+      val path = dir.toURI.toString.stripSuffix("/")
+      val dirPath = dir.getAbsoluteFile
+      for (i <- 1 to 3) {
+        Files.write(s"$i", new File(dirPath, s"part-r-0000$i"), StandardCharsets.UTF_8)
+      }
+      withTable("load_t2") {
+        sql("CREATE TABLE load_t2 (a STRING)")
+        sql(s"LOAD DATA LOCAL INPATH '$path/?art-r-00001' INTO TABLE load_t2")
+        checkAnswer(sql("SELECT * FROM load_t2"), Seq(Row("1")))
       }
     }
   }
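
Illustration (not part of the commit): the code comment in `LoadDataCommand` notes that the old logic lost everything after a `?` wildcard because `java.net.URI` treats `?` as the start of the query component. The sketch below reproduces that effect using only the JDK, and shows that passing the path as a separate component, as the four-argument `URI` constructor used in `makeQualified` does, keeps the wildcard. The object and method names (`GlobQuestionMarkDemo`, `pathViaStringParse`, `pathViaComponents`) are hypothetical and exist only for this example; the host name is borrowed from the examples above.

```scala
import java.net.URI

// Hypothetical helper object for illustration only; not part of Spark.
object GlobQuestionMarkDemo {

  // Parse the whole string as a URI: everything after '?' becomes the
  // query component, so getPath no longer contains the wildcard.
  def pathViaStringParse(raw: String): String =
    new URI(raw).getPath

  // Build the URI from components with the (scheme, host, path, fragment)
  // constructor: '?' inside the path is percent-encoded internally and
  // getPath returns it decoded, so the wildcard is preserved.
  def pathViaComponents(scheme: String, host: String, path: String): String =
    new URI(scheme, host, path, null).getPath

  def main(args: Array[String]): Unit = {
    // prints "/user/data/sujith1/"
    println(pathViaStringParse("hdfs://hacluster/user/data/sujith1/?ypeddata60.txt"))
    // prints "/user/data/sujith1/?ypeddata60.txt"
    println(pathViaComponents("hdfs", "hacluster", "/user/data/sujith1/?ypeddata60.txt"))
  }
}
```

This only demonstrates the URI parsing behavior; the actual glob resolution in the patch is done by Hadoop's `globStatus()`, which is not exercised here.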