Github user sujith71955 commented on a diff in the pull request:
https://github.com/apache/spark/pull/20611#discussion_r202255230
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---
@@ -303,94 +303,49 @@ case class LoadDataCommand(
s"partitioned, but a partition spec was provided.")
}
}
-
- val loadPath =
+ val loadPath = {
if (isLocal) {
- val uri = Utils.resolveURI(path)
- val file = new File(uri.getPath)
- val exists = if (file.getAbsolutePath.contains("*")) {
- val fileSystem = FileSystems.getDefault
- val dir = file.getParentFile.getAbsolutePath
- if (dir.contains("*")) {
- throw new AnalysisException(
- s"LOAD DATA input path allows only filename wildcard: $path")
- }
-
- // Note that special characters such as "*" on Windows are not
allowed as a path.
- // Calling `WindowsFileSystem.getPath` throws an exception if
there are in the path.
- val dirPath = fileSystem.getPath(dir)
- val pathPattern = new File(dirPath.toAbsolutePath.toString,
file.getName).toURI.getPath
- val safePathPattern = if (Utils.isWindows) {
- // On Windows, the pattern should not start with slashes for
absolute file paths.
- pathPattern.stripPrefix("/")
- } else {
- pathPattern
- }
- val files = new File(dir).listFiles()
- if (files == null) {
- false
- } else {
- val matcher = fileSystem.getPathMatcher("glob:" +
safePathPattern)
- files.exists(f =>
matcher.matches(fileSystem.getPath(f.getAbsolutePath)))
- }
- } else {
- new File(file.getAbsolutePath).exists()
- }
- if (!exists) {
- throw new AnalysisException(s"LOAD DATA input path does not
exist: $path")
- }
- uri
+ val localFS = FileContext.getLocalFSFileContext()
+ localFS.makeQualified(new Path(path))
} else {
- val uri = new URI(path)
- val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() !=
null) {
- uri
+ val loadPath = new Path(path)
+ // Follow Hive's behavior:
+ // If no schema or authority is provided with non-local inpath,
+ // we will use hadoop configuration "fs.defaultFS".
+ val defaultFSConf =
sparkSession.sessionState.newHadoopConf().get("fs.defaultFS")
+ val defaultFS = if (defaultFSConf == null) {
+ new URI("")
} else {
- // Follow Hive's behavior:
- // If no schema or authority is provided with non-local inpath,
- // we will use hadoop configuration "fs.defaultFS".
- val defaultFSConf =
sparkSession.sessionState.newHadoopConf().get("fs.defaultFS")
- val defaultFS = if (defaultFSConf == null) {
- new URI("")
- } else {
- new URI(defaultFSConf)
- }
-
- val scheme = if (uri.getScheme() != null) {
- uri.getScheme()
- } else {
- defaultFS.getScheme()
- }
- val authority = if (uri.getAuthority() != null) {
- uri.getAuthority()
- } else {
- defaultFS.getAuthority()
- }
-
- if (scheme == null) {
- throw new AnalysisException(
- s"LOAD DATA: URI scheme is required for non-local input
paths: '$path'")
- }
-
- // Follow Hive's behavior:
- // If LOCAL is not specified, and the path is relative,
- // then the path is interpreted relative to "/user/<username>"
- val uriPath = uri.getPath()
- val absolutePath = if (uriPath != null &&
uriPath.startsWith("/")) {
- uriPath
- } else {
- s"/user/${System.getProperty("user.name")}/$uriPath"
- }
- new URI(scheme, authority, absolutePath, uri.getQuery(),
uri.getFragment())
- }
- val hadoopConf = sparkSession.sessionState.newHadoopConf()
- val srcPath = new Path(hdfsUri)
- val fs = srcPath.getFileSystem(hadoopConf)
- if (!fs.exists(srcPath)) {
- throw new AnalysisException(s"LOAD DATA input path does not
exist: $path")
+ new URI(defaultFSConf)
}
- hdfsUri
- }
+ // Follow Hive's behavior:
+ // If LOCAL is not specified, and the path is relative,
+ // then the path is interpreted relative to "/user/<username>"
+ val uriPath = new
Path(s"/user/${System.getProperty("user.name")}/")
+ // makeQualified() will ignore the query parameter part while
creating a Path, so the
+ // entire path string will be considered while making a Path
instance. This is mainly done
+ // with the wildcard scenario in mind. As per the old logic, the
query param was
+ // considered while creating the URI instance, and if the path
contained the wildcard char '?',
+ // the remaining characters after '?' would be removed while
forming the URI instance.
+ loadPath.makeQualified(defaultFS, uriPath)
+ }
+ }
+ val fs =
loadPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+ // This handling is needed because, while resolving invalid URLs
starting with file:///,
+ // the system throws IllegalArgumentException from the globStatus
API. So, in order to handle
+ // such scenarios, this code is wrapped in a try-catch block, and
after catching the
+ // runtime exception a generic error will be displayed to the user.
+ try {
+ if (null == fs.globStatus(loadPath) ||
fs.globStatus(loadPath).isEmpty) {
+ throw new AnalysisException(s"LOAD DATA input path does not exist:
$path")
+ }
+ }
+ catch {
--- End diff --
fine
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]