[SPARK-19257][SQL] location for table/partition/database should be java.net.URI

## What changes were proposed in this pull request?

Currently we treat the location of a table/partition/database as a URI string.

It would be safer to make the type of the location `java.net.URI`.

This PR changes the following classes:
**1. CatalogDatabase**
```
case class CatalogDatabase(
    name: String,
    description: String,
    locationUri: String,
    properties: Map[String, String])
--->
case class CatalogDatabase(
    name: String,
    description: String,
    locationUri: URI,
    properties: Map[String, String])
```
**2. CatalogStorageFormat**
```
case class CatalogStorageFormat(
    locationUri: Option[String],
    inputFormat: Option[String],
    outputFormat: Option[String],
    serde: Option[String],
    compressed: Boolean,
    properties: Map[String, String])
---->
case class CatalogStorageFormat(
    locationUri: Option[URI],
    inputFormat: Option[String],
    outputFormat: Option[String],
    serde: Option[String],
    compressed: Boolean,
    properties: Map[String, String])
```

This change is transparent to users: there is no behavior change they need to 
be concerned about. The `String`-to-`URI` conversion happens only inside 
Spark SQL.

Here are some location-related operations:
**1. whitespace in the location**
   e.g.  `/a/b c/d`
   For both table location and partition location,
   after `CREATE TABLE t ... (PARTITIONED BY ...) LOCATION '/a/b c/d'`,
   `DESC EXTENDED t` shows the location as `/a/b c/d`,
   and the real path in the file system is also `/a/b c/d`.

**2. colon(:) in the location**
   e.g.  `/a/b:c/d`
   For both table location and partition location,
   when `CREATE TABLE t ... (PARTITIONED BY ...) LOCATION '/a/b:c/d'`:

  **On a Linux file system**
   `DESC EXTENDED t` shows the location as `/a/b:c/d`,
   and the real path in the file system is also `/a/b:c/d`.

  **On HDFS** it throws an exception:
  `java.lang.IllegalArgumentException: Pathname /a/b:c/d from hdfs://iZbp1151s8hbnnwriekxdeZ:9000/a/b:c/d is not a valid DFS filename.`

  **However**, after `INSERT INTO TABLE t PARTITION(a="a:b") SELECT 1`,
   `DESC EXTENDED t` shows the location as `/xxx/a=a%3Ab`,
   and the real path in the file system is also `/xxx/a=a%3Ab`.

**3. percent sign(%) in the location**
   e.g.  `/a/b%c/d`
   For both table location and partition location,
   after `CREATE TABLE t ... (PARTITIONED BY ...) LOCATION '/a/b%c/d'`,
   `DESC EXTENDED t` shows the location as `/a/b%c/d`,
   and the real path in the file system is also `/a/b%c/d`.

**4. percent-encoded (%25) in the location**
   e.g.  `/a/b%25c/d`
   For both table location and partition location,
   after `CREATE TABLE t ... (PARTITIONED BY ...) LOCATION '/a/b%25c/d'`,
   `DESC EXTENDED t` shows the location as `/a/b%25c/d`,
   and the real path in the file system is also `/a/b%25c/d`.

   **However**, after `INSERT INTO TABLE t PARTITION(a="%25") SELECT 1`,
   `DESC EXTENDED t` shows the location as `/xxx/a=%2525`,
   and the real path in the file system is also `/xxx/a=%2525`.
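
A minimal sketch of how items 1-4 above can be checked, mirroring the new tests 
in `DDLSuite` (the path and table name are illustrative, and 
`spark.sessionState` is only accessible from code inside Spark's `sql` package, 
as in the test suite):
```
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.TableIdentifier

// Illustrative path containing a space; assumes the directory already exists.
spark.sql("CREATE TABLE t(a STRING) USING parquet LOCATION '/tmp/a/b c/d'")

val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
// The catalog now stores an encoded java.net.URI ...
assert(table.location == new Path("/tmp/a/b c/d").toUri)    // /tmp/a/b%20c/d
// ... while converting back through Path yields the original, decoded path.
assert(new Path(table.location).toString == "/tmp/a/b c/d")
```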

**Additionally**, besides the location itself, two other factors affect the 
location of a table/partition: the table name, which is not allowed to contain 
special characters, and the partition name, which is handled the same way as 
the partition value. Test cases for partition names with special characters, 
together with a related bug fix, were added in this 
[PR](https://github.com/apache/spark/pull/17173)
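
For illustration, a sketch of that escaping, assuming the existing 
`ExternalCatalogUtils.escapePathName` helper used when building partition 
paths (the values match the examples above):
```
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils

// Partition values (and partition names) with special characters are
// percent-encoded when the partition directory name is built.
ExternalCatalogUtils.escapePathName("a:b")   // a%3Ab
ExternalCatalogUtils.escapePathName("%25")   // %2525
```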

### Summary:
After `CREATE TABLE t ... (PARTITIONED BY ...) LOCATION path`,
the path we get from `DESC TABLE` and the real path in the file system are both 
the same as in the `CREATE TABLE` command (different file systems allow 
different special characters in a path; e.g. HDFS does not allow a colon, while 
a Linux file system does).

`CREATE DATABASE` follows the same logic as `CREATE TABLE`.

If the `partition value` contains a special character like `%`, `:`, `#`, etc., 
then `DESC TABLE` and the real path in the file system both show the path with 
the encoded `partition value`, e.g. `/xxx/a=A%25B`.

The core change in this PR is to use `new Path(str).toUri` and 
`new Path(uri).toString` to convert between strings and URIs, for example:
```
val str = "/a/b c/d"
val uri = new Path(str).toUri            // /a/b%20c/d
val strFromUri = new Path(uri).toString  // /a/b c/d
```

When we restore a table/partition from the metastore, or get a location from a 
`CREATE TABLE` command, we convert the string to a URI as above with 
`new Path(str).toUri`.
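
In the diff below these conversions are wrapped as `CatalogUtils.stringToURI` 
and `CatalogUtils.URIToString`. A minimal sketch of how they are meant to be 
used at the catalog boundary (the database name and path are illustrative):
```
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogUtils}

// A location string coming from DDL or the metastore is converted once, at the
// boundary, and kept as a java.net.URI inside the catalog objects.
val db = CatalogDatabase(
  name = "db1",
  description = "",
  locationUri = CatalogUtils.stringToURI("/warehouse/db1 dir"), // /warehouse/db1%20dir
  properties = Map.empty)

// When showing it back to users (e.g. DESCRIBE DATABASE), decode it again.
val shown: String = CatalogUtils.URIToString(db.locationUri)    // /warehouse/db1 dir
```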

## How was this patch tested?
Unit tests were added.
The current master branch also passes all the test cases added in this PR, with 
one small change:
https://github.com/apache/spark/pull/17149/files#diff-b7094baa12601424a5d19cb930e3402fR1764
where `toURI` becomes `toString` when running the tests on the master branch.

This shows that this PR is transparent to users.

Author: windpiger <song...@outlook.com>

Closes #17149 from windpiger/changeStringToURI.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/096df6d9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/096df6d9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/096df6d9

Branch: refs/heads/master
Commit: 096df6d933c5326e5782aa8c5de842a0800eb369
Parents: 46a64d1
Author: windpiger <song...@outlook.com>
Authored: Mon Mar 6 10:44:26 2017 -0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Mon Mar 6 10:44:26 2017 -0800

----------------------------------------------------------------------
 .../catalyst/catalog/ExternalCatalogUtils.scala |  26 +++
 .../sql/catalyst/catalog/InMemoryCatalog.scala  |  12 +-
 .../sql/catalyst/catalog/SessionCatalog.scala   |  15 +-
 .../spark/sql/catalyst/catalog/interface.scala  |  14 +-
 .../catalyst/catalog/ExternalCatalogSuite.scala |  18 +-
 .../spark/sql/execution/SparkSqlParser.scala    |   8 +-
 .../command/createDataSourceTables.scala        |  10 +-
 .../spark/sql/execution/command/ddl.scala       |  14 +-
 .../spark/sql/execution/command/tables.scala    |  11 +-
 .../datasources/CatalogFileIndex.scala          |   4 +-
 .../sql/execution/datasources/DataSource.scala  |   5 +-
 .../datasources/DataSourceStrategy.scala        |   6 +-
 .../apache/spark/sql/internal/CatalogImpl.scala |   4 +-
 .../apache/spark/sql/internal/SharedState.scala |   6 +-
 .../sql/execution/command/DDLCommandSuite.scala |   8 +-
 .../spark/sql/execution/command/DDLSuite.scala  | 136 +++++++++++----
 .../spark/sql/internal/CatalogSuite.scala       |   4 +-
 .../spark/sql/sources/BucketedWriteSuite.scala  |   2 +-
 .../spark/sql/sources/PathOptionSuite.scala     |  12 +-
 .../spark/sql/hive/HiveExternalCatalog.scala    |  21 +--
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |   4 +-
 .../apache/spark/sql/hive/HiveStrategies.scala  |   2 +-
 .../spark/sql/hive/client/HiveClientImpl.scala  |  15 +-
 .../apache/spark/sql/hive/client/HiveShim.scala |   9 +-
 .../spark/sql/hive/HiveDDLCommandSuite.scala    |  12 +-
 ...ernalCatalogBackwardCompatibilitySuite.scala |  23 ++-
 .../sql/hive/HiveMetastoreCatalogSuite.scala    |   4 +-
 .../spark/sql/hive/HiveSparkSubmitSuite.scala   |  12 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |   2 +-
 .../spark/sql/hive/MultiDatabaseSuite.scala     |   8 +-
 .../spark/sql/hive/client/VersionsSuite.scala   |  13 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 171 ++++++++++++++++++-
 .../sql/hive/execution/SQLQuerySuite.scala      |   4 +-
 33 files changed, 460 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
index 58ced54..a418edc 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import java.net.URI
+
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.util.Shell
 
@@ -162,6 +164,30 @@ object CatalogUtils {
     BucketSpec(numBuckets, normalizedBucketCols, normalizedSortCols)
   }
 
+  /**
+   * Convert URI to String.
+   * Since URI.toString does not decode the uri, e.g. change '%25' to '%'.
+   * Here we create a hadoop Path with the given URI, and rely on Path.toString
+   * to decode the uri
+   * @param uri the URI of the path
+   * @return the String of the path
+   */
+  def URIToString(uri: URI): String = {
+    new Path(uri).toString
+  }
+
+  /**
+   * Convert String to URI.
+   * Since new URI(string) does not encode string, e.g. change '%' to '%25'.
+   * Here we create a hadoop Path with the given String, and rely on Path.toUri
+   * to encode the string
+   * @param str the String of the path
+   * @return the URI of the path
+   */
+  def stringToURI(str: String): URI = {
+    new Path(str).toUri
+  }
+
   private def normalizeColumnName(
       tableName: String,
       tableCols: Seq[String],

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 340e845..80aba4a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -202,7 +202,7 @@ class InMemoryCatalog(
           tableDefinition.storage.locationUri.isEmpty
 
       val tableWithLocation = if (needDefaultTableLocation) {
-        val defaultTableLocation = new Path(catalog(db).db.locationUri, table)
+        val defaultTableLocation = new Path(new 
Path(catalog(db).db.locationUri), table)
         try {
           val fs = defaultTableLocation.getFileSystem(hadoopConfig)
           fs.mkdirs(defaultTableLocation)
@@ -211,7 +211,7 @@ class InMemoryCatalog(
             throw new SparkException(s"Unable to create table $table as failed 
" +
               s"to create its directory $defaultTableLocation", e)
         }
-        tableDefinition.withNewStorage(locationUri = 
Some(defaultTableLocation.toUri.toString))
+        tableDefinition.withNewStorage(locationUri = 
Some(defaultTableLocation.toUri))
       } else {
         tableDefinition
       }
@@ -274,7 +274,7 @@ class InMemoryCatalog(
         "Managed table should always have table location, as we will assign a 
default location " +
           "to it if it doesn't have one.")
       val oldDir = new Path(oldDesc.table.location)
-      val newDir = new Path(catalog(db).db.locationUri, newName)
+      val newDir = new Path(new Path(catalog(db).db.locationUri), newName)
       try {
         val fs = oldDir.getFileSystem(hadoopConfig)
         fs.rename(oldDir, newDir)
@@ -283,7 +283,7 @@ class InMemoryCatalog(
           throw new SparkException(s"Unable to rename table $oldName to 
$newName as failed " +
             s"to rename its directory $oldDir", e)
       }
-      oldDesc.table = oldDesc.table.withNewStorage(locationUri = 
Some(newDir.toUri.toString))
+      oldDesc.table = oldDesc.table.withNewStorage(locationUri = 
Some(newDir.toUri))
     }
 
     catalog(db).tables.put(newName, oldDesc)
@@ -389,7 +389,7 @@ class InMemoryCatalog(
 
       existingParts.put(
         p.spec,
-        p.copy(storage = p.storage.copy(locationUri = 
Some(partitionPath.toString))))
+        p.copy(storage = p.storage.copy(locationUri = 
Some(partitionPath.toUri))))
     }
   }
 
@@ -462,7 +462,7 @@ class InMemoryCatalog(
         }
         oldPartition.copy(
           spec = newSpec,
-          storage = oldPartition.storage.copy(locationUri = 
Some(newPartPath.toString)))
+          storage = oldPartition.storage.copy(locationUri = 
Some(newPartPath.toUri)))
       } else {
         oldPartition.copy(spec = newSpec)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index f6412e4..498bfbd 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import java.net.URI
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
@@ -131,10 +132,10 @@ class SessionCatalog(
    * does not contain a scheme, this path will not be changed after the default
    * FileSystem is changed.
    */
-  private def makeQualifiedPath(path: String): Path = {
+  private def makeQualifiedPath(path: URI): URI = {
     val hadoopPath = new Path(path)
     val fs = hadoopPath.getFileSystem(hadoopConf)
-    fs.makeQualified(hadoopPath)
+    fs.makeQualified(hadoopPath).toUri
   }
 
   private def requireDbExists(db: String): Unit = {
@@ -170,7 +171,7 @@ class SessionCatalog(
           "you cannot create a database with this name.")
     }
     validateName(dbName)
-    val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString
+    val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri)
     externalCatalog.createDatabase(
       dbDefinition.copy(name = dbName, locationUri = qualifiedPath),
       ignoreIfExists)
@@ -228,9 +229,9 @@ class SessionCatalog(
    * Get the path for creating a non-default database when database location 
is not provided
    * by users.
    */
-  def getDefaultDBPath(db: String): String = {
+  def getDefaultDBPath(db: String): URI = {
     val database = formatDatabaseName(db)
-    new Path(new Path(conf.warehousePath), database + ".db").toString
+    new Path(new Path(conf.warehousePath), database + ".db").toUri
   }
 
   // 
----------------------------------------------------------------------------
@@ -351,11 +352,11 @@ class SessionCatalog(
       db, table, loadPath, spec, isOverwrite, inheritTableSpecs, isSrcLocal)
   }
 
-  def defaultTablePath(tableIdent: TableIdentifier): String = {
+  def defaultTablePath(tableIdent: TableIdentifier): URI = {
     val dbName = 
formatDatabaseName(tableIdent.database.getOrElse(getCurrentDatabase))
     val dbLocation = getDatabaseMetadata(dbName).locationUri
 
-    new Path(new Path(dbLocation), formatTableName(tableIdent.table)).toString
+    new Path(new Path(dbLocation), formatTableName(tableIdent.table)).toUri
   }
 
   // ----------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 887caf0..4452c47 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import java.net.URI
 import java.util.Date
 
 import com.google.common.base.Objects
@@ -48,10 +49,7 @@ case class CatalogFunction(
  * Storage format, used to describe how a partition or a table is stored.
  */
 case class CatalogStorageFormat(
-    // TODO(ekl) consider storing this field as java.net.URI for type safety. 
Note that this must
-    // be converted to/from a hadoop Path object using new Path(new 
URI(locationUri)) and
-    // path.toUri respectively before use as a filesystem path due to URI char 
escaping.
-    locationUri: Option[String],
+    locationUri: Option[URI],
     inputFormat: Option[String],
     outputFormat: Option[String],
     serde: Option[String],
@@ -105,7 +103,7 @@ case class CatalogTablePartition(
   }
 
   /** Return the partition location, assuming it is specified. */
-  def location: String = storage.locationUri.getOrElse {
+  def location: URI = storage.locationUri.getOrElse {
     val specString = spec.map { case (k, v) => s"$k=$v" }.mkString(", ")
     throw new AnalysisException(s"Partition [$specString] did not specify 
locationUri")
   }
@@ -210,7 +208,7 @@ case class CatalogTable(
   }
 
   /** Return the table location, assuming it is specified. */
-  def location: String = storage.locationUri.getOrElse {
+  def location: URI = storage.locationUri.getOrElse {
     throw new AnalysisException(s"table $identifier did not specify 
locationUri")
   }
 
@@ -241,7 +239,7 @@ case class CatalogTable(
 
   /** Syntactic sugar to update a field in `storage`. */
   def withNewStorage(
-      locationUri: Option[String] = storage.locationUri,
+      locationUri: Option[URI] = storage.locationUri,
       inputFormat: Option[String] = storage.inputFormat,
       outputFormat: Option[String] = storage.outputFormat,
       compressed: Boolean = false,
@@ -337,7 +335,7 @@ object CatalogTableType {
 case class CatalogDatabase(
     name: String,
     description: String,
-    locationUri: String,
+    locationUri: URI,
     properties: Map[String, String])
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index a5d399a..07ccd68 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import java.net.URI
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterEach
@@ -340,7 +342,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite 
with BeforeAndAfterEac
       "db1",
       "tbl",
       Map("partCol1" -> "1", "partCol2" -> "2")).location
-    val tableLocation = catalog.getTable("db1", "tbl").location
+    val tableLocation = new Path(catalog.getTable("db1", "tbl").location)
     val defaultPartitionLocation = new Path(new Path(tableLocation, 
"partCol1=1"), "partCol2=2")
     assert(new Path(partitionLocation) == defaultPartitionLocation)
   }
@@ -508,7 +510,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite 
with BeforeAndAfterEac
       partitionColumnNames = Seq("partCol1", "partCol2"))
     catalog.createTable(table, ignoreIfExists = false)
 
-    val tableLocation = catalog.getTable("db1", "tbl").location
+    val tableLocation = new Path(catalog.getTable("db1", "tbl").location)
 
     val mixedCasePart1 = CatalogTablePartition(
       Map("partCol1" -> "1", "partCol2" -> "2"), storageFormat)
@@ -699,7 +701,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite 
with BeforeAndAfterEac
   // File System operations
   // --------------------------------------------------------------------------
 
-  private def exists(uri: String, children: String*): Boolean = {
+  private def exists(uri: URI, children: String*): Boolean = {
     val base = new Path(uri)
     val finalPath = children.foldLeft(base) {
       case (parent, child) => new Path(parent, child)
@@ -742,7 +744,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite 
with BeforeAndAfterEac
       identifier = TableIdentifier("external_table", Some("db1")),
       tableType = CatalogTableType.EXTERNAL,
       storage = CatalogStorageFormat(
-        Some(Utils.createTempDir().getAbsolutePath),
+        Some(Utils.createTempDir().toURI),
         None, None, None, false, Map.empty),
       schema = new StructType().add("a", "int").add("b", "string"),
       provider = Some(defaultProvider)
@@ -790,7 +792,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite 
with BeforeAndAfterEac
     val partWithExistingDir = CatalogTablePartition(
       Map("partCol1" -> "7", "partCol2" -> "8"),
       CatalogStorageFormat(
-        Some(tempPath.toURI.toString),
+        Some(tempPath.toURI),
         None, None, None, false, Map.empty))
     catalog.createPartitions("db1", "tbl", Seq(partWithExistingDir), 
ignoreIfExists = false)
 
@@ -799,7 +801,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite 
with BeforeAndAfterEac
     val partWithNonExistingDir = CatalogTablePartition(
       Map("partCol1" -> "9", "partCol2" -> "10"),
       CatalogStorageFormat(
-        Some(tempPath.toURI.toString),
+        Some(tempPath.toURI),
         None, None, None, false, Map.empty))
     catalog.createPartitions("db1", "tbl", Seq(partWithNonExistingDir), 
ignoreIfExists = false)
     assert(tempPath.exists())
@@ -883,7 +885,7 @@ abstract class CatalogTestUtils {
 
   def newFunc(): CatalogFunction = newFunc("funcName")
 
-  def newUriForDatabase(): String = 
Utils.createTempDir().toURI.toString.stripSuffix("/")
+  def newUriForDatabase(): URI = new 
URI(Utils.createTempDir().toURI.toString.stripSuffix("/"))
 
   def newDb(name: String): CatalogDatabase = {
     CatalogDatabase(name, name + " description", newUriForDatabase(), 
Map.empty)
@@ -895,7 +897,7 @@ abstract class CatalogTestUtils {
     CatalogTable(
       identifier = TableIdentifier(name, database),
       tableType = CatalogTableType.EXTERNAL,
-      storage = storageFormat.copy(locationUri = 
Some(Utils.createTempDir().getAbsolutePath)),
+      storage = storageFormat.copy(locationUri = 
Some(Utils.createTempDir().toURI)),
       schema = new StructType()
         .add("col1", "int")
         .add("col2", "string")

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 65df688..c106163 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -386,7 +386,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         "LOCATION and 'path' in OPTIONS are both used to indicate the custom 
table path, " +
           "you can only specify one of them.", ctx)
     }
-    val customLocation = storage.locationUri.orElse(location)
+    val customLocation = 
storage.locationUri.orElse(location.map(CatalogUtils.stringToURI(_)))
 
     val tableType = if (customLocation.isDefined) {
       CatalogTableType.EXTERNAL
@@ -1080,8 +1080,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
     if (external && location.isEmpty) {
       operationNotAllowed("CREATE EXTERNAL TABLE must be accompanied by 
LOCATION", ctx)
     }
+
+    val locUri = location.map(CatalogUtils.stringToURI(_))
     val storage = CatalogStorageFormat(
-      locationUri = location,
+      locationUri = locUri,
       inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
       outputFormat = 
fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
       serde = 
rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
@@ -1132,7 +1134,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
           // At here, both rowStorage.serdeProperties and 
fileStorage.serdeProperties
           // are empty Maps.
           val newTableDesc = tableDesc.copy(
-            storage = CatalogStorageFormat.empty.copy(locationUri = location),
+            storage = CatalogStorageFormat.empty.copy(locationUri = locUri),
             provider = Some(conf.defaultDataSourceName))
           CreateTable(newTableDesc, mode, Some(q))
         } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index d835b52..3da66af 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.sql.execution.command
 
+import java.net.URI
+
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -54,7 +58,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, 
ignoreIfExists: Boo
 
     // Create the relation to validate the arguments before writing the 
metadata to the metastore,
     // and infer the table schema and partition if users didn't specify schema 
in CREATE TABLE.
-    val pathOption = table.storage.locationUri.map("path" -> _)
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
     // Fill in some default table options from the session conf
     val tableWithDefaultOptions = table.copy(
       identifier = table.identifier.copy(
@@ -175,12 +179,12 @@ case class CreateDataSourceTableAsSelectCommand(
   private def saveDataIntoTable(
       session: SparkSession,
       table: CatalogTable,
-      tableLocation: Option[String],
+      tableLocation: Option[URI],
       data: LogicalPlan,
       mode: SaveMode,
       tableExists: Boolean): BaseRelation = {
     // Create the relation based on the input logical plan: `data`.
-    val pathOption = tableLocation.map("path" -> _)
+    val pathOption = tableLocation.map("path" -> CatalogUtils.URIToString(_))
     val dataSource = DataSource(
       session,
       className = table.provider.get,

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 82cbb4a..b5c6042 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -66,7 +66,7 @@ case class CreateDatabaseCommand(
       CatalogDatabase(
         databaseName,
         comment.getOrElse(""),
-        path.getOrElse(catalog.getDefaultDBPath(databaseName)),
+        
path.map(CatalogUtils.stringToURI(_)).getOrElse(catalog.getDefaultDBPath(databaseName)),
         props),
       ifNotExists)
     Seq.empty[Row]
@@ -146,7 +146,7 @@ case class DescribeDatabaseCommand(
     val result =
       Row("Database Name", dbMetadata.name) ::
         Row("Description", dbMetadata.description) ::
-        Row("Location", dbMetadata.locationUri) :: Nil
+        Row("Location", CatalogUtils.URIToString(dbMetadata.locationUri)) :: 
Nil
 
     if (extended) {
       val properties =
@@ -426,7 +426,8 @@ case class AlterTableAddPartitionCommand(
         table.identifier.quotedString,
         sparkSession.sessionState.conf.resolver)
       // inherit table storage format (possibly except for location)
-      CatalogTablePartition(normalizedSpec, table.storage.copy(locationUri = 
location))
+      CatalogTablePartition(normalizedSpec, table.storage.copy(
+        locationUri = location.map(CatalogUtils.stringToURI(_))))
     }
     catalog.createPartitions(table.identifier, parts, ignoreIfExists = 
ifNotExists)
     Seq.empty[Row]
@@ -710,7 +711,7 @@ case class AlterTableRecoverPartitionsCommand(
         // inherit table storage format (possibly except for location)
         CatalogTablePartition(
           spec,
-          table.storage.copy(locationUri = Some(location.toUri.toString)),
+          table.storage.copy(locationUri = Some(location.toUri)),
           params)
       }
       spark.sessionState.catalog.createPartitions(tableName, parts, 
ignoreIfExists = true)
@@ -741,6 +742,7 @@ case class AlterTableSetLocationCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
+    val locUri = CatalogUtils.stringToURI(location)
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     partitionSpec match {
       case Some(spec) =>
@@ -748,11 +750,11 @@ case class AlterTableSetLocationCommand(
           sparkSession, table, "ALTER TABLE ... SET LOCATION")
         // Partition spec is specified, so we set the location only for this 
partition
         val part = catalog.getPartition(table.identifier, spec)
-        val newPart = part.copy(storage = part.storage.copy(locationUri = 
Some(location)))
+        val newPart = part.copy(storage = part.storage.copy(locationUri = 
Some(locUri)))
         catalog.alterPartitions(table.identifier, Seq(newPart))
       case None =>
         // No partition spec is specified, so we set the location for the 
table itself
-        catalog.alterTable(table.withNewStorage(locationUri = Some(location)))
+        catalog.alterTable(table.withNewStorage(locationUri = Some(locUri)))
     }
     Seq.empty[Row]
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 3e80916..86394ff 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -79,7 +79,8 @@ case class CreateTableLikeCommand(
       CatalogTable(
         identifier = targetTable,
         tableType = tblType,
-        storage = sourceTableDesc.storage.copy(locationUri = location),
+        storage = sourceTableDesc.storage.copy(
+          locationUri = location.map(CatalogUtils.stringToURI(_))),
         schema = sourceTableDesc.schema,
         provider = newProvider,
         partitionColumnNames = sourceTableDesc.partitionColumnNames,
@@ -495,7 +496,8 @@ case class DescribeTableCommand(
     append(buffer, "Owner:", table.owner, "")
     append(buffer, "Create Time:", new Date(table.createTime).toString, "")
     append(buffer, "Last Access Time:", new 
Date(table.lastAccessTime).toString, "")
-    append(buffer, "Location:", table.storage.locationUri.getOrElse(""), "")
+    append(buffer, "Location:", 
table.storage.locationUri.map(CatalogUtils.URIToString(_))
+      .getOrElse(""), "")
     append(buffer, "Table Type:", table.tableType.name, "")
     table.stats.foreach(s => append(buffer, "Statistics:", s.simpleString, ""))
 
@@ -587,7 +589,8 @@ case class DescribeTableCommand(
     append(buffer, "Partition Value:", s"[${partition.spec.values.mkString(", 
")}]", "")
     append(buffer, "Database:", table.database, "")
     append(buffer, "Table:", tableIdentifier.table, "")
-    append(buffer, "Location:", partition.storage.locationUri.getOrElse(""), 
"")
+    append(buffer, "Location:", 
partition.storage.locationUri.map(CatalogUtils.URIToString(_))
+      .getOrElse(""), "")
     append(buffer, "Partition Parameters:", "", "")
     partition.parameters.foreach { case (key, value) =>
       append(buffer, s"  $key", value, "")
@@ -953,7 +956,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) 
extends RunnableComman
         // when the table creation DDL contains the PATH option.
         None
       } else {
-        Some(s"path '${escapeSingleQuotedString(location)}'")
+        Some(s"path 
'${escapeSingleQuotedString(CatalogUtils.URIToString(location))}'")
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index 2068811..d6c4b97 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.net.URI
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
@@ -46,7 +48,7 @@ class CatalogFileIndex(
   assert(table.identifier.database.isDefined,
     "The table identifier must be qualified in CatalogFileIndex")
 
-  private val baseLocation: Option[String] = table.storage.locationUri
+  private val baseLocation: Option[URI] = table.storage.locationUri
 
   override def partitionSchema: StructType = table.partitionSchema
 

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 4947dfd..c9384e4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, 
CatalogStorageFormat, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, 
CatalogStorageFormat, CatalogTable, CatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
@@ -597,6 +597,7 @@ object DataSource {
   def buildStorageFormatFromOptions(options: Map[String, String]): 
CatalogStorageFormat = {
     val path = CaseInsensitiveMap(options).get("path")
     val optionsWithoutPath = options.filterKeys(_.toLowerCase != "path")
-    CatalogStorageFormat.empty.copy(locationUri = path, properties = 
optionsWithoutPath)
+    CatalogStorageFormat.empty.copy(
+      locationUri = path.map(CatalogUtils.stringToURI(_)), properties = 
optionsWithoutPath)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index f694a0d..bddf5af 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -21,13 +21,15 @@ import java.util.concurrent.Callable
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, 
InternalRow, QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogUtils}
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -220,7 +222,7 @@ class FindDataSourceTable(sparkSession: SparkSession) 
extends Rule[LogicalPlan]
 
     val plan = cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
       override def call(): LogicalPlan = {
-        val pathOption = table.storage.locationUri.map("path" -> _)
+        val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
         val dataSource =
           DataSource(
             sparkSession,

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 3d9f418..ed07ff3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.internal
 
 import scala.reflect.runtime.universe.TypeTag
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalog.{Catalog, Column, Database, Function, 
Table}
@@ -77,7 +79,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog 
{
     new Database(
       name = metadata.name,
       description = metadata.description,
-      locationUri = metadata.locationUri)
+      locationUri = CatalogUtils.URIToString(metadata.locationUri))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index bce84de..86129fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{SparkConf, SparkContext, SparkException}
 import org.apache.spark.internal.Logging
@@ -95,7 +96,10 @@ private[sql] class SharedState(val sparkContext: 
SparkContext) extends Logging {
   // Create the default database if it doesn't exist.
   {
     val defaultDbDefinition = CatalogDatabase(
-      SessionCatalog.DEFAULT_DATABASE, "default database", warehousePath, 
Map())
+      SessionCatalog.DEFAULT_DATABASE,
+      "default database",
+      CatalogUtils.stringToURI(warehousePath),
+      Map())
     // Initialize default database if it doesn't exist
     if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) {
       // There may be another Spark application creating default database at 
the same time, here we

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 76bb9e5..4b73b07 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.command
 
+import java.net.URI
+
 import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -317,7 +319,7 @@ class DDLCommandSuite extends PlanTest {
     val query = "CREATE EXTERNAL TABLE my_tab LOCATION '/something/anything'"
     val ct = parseAs[CreateTable](query)
     assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL)
-    assert(ct.tableDesc.storage.locationUri == Some("/something/anything"))
+    assert(ct.tableDesc.storage.locationUri == Some(new 
URI("/something/anything")))
   }
 
   test("create hive table - property values must be set") {
@@ -334,7 +336,7 @@ class DDLCommandSuite extends PlanTest {
     val query = "CREATE TABLE my_tab LOCATION '/something/anything'"
     val ct = parseAs[CreateTable](query)
     assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL)
-    assert(ct.tableDesc.storage.locationUri == Some("/something/anything"))
+    assert(ct.tableDesc.storage.locationUri == Some(new 
URI("/something/anything")))
   }
 
   test("create table - with partitioned by") {
@@ -409,7 +411,7 @@ class DDLCommandSuite extends PlanTest {
     val expectedTableDesc = CatalogTable(
       identifier = TableIdentifier("my_tab"),
       tableType = CatalogTableType.EXTERNAL,
-      storage = CatalogStorageFormat.empty.copy(locationUri = 
Some("/tmp/file")),
+      storage = CatalogStorageFormat.empty.copy(locationUri = Some(new 
URI("/tmp/file"))),
       schema = new StructType().add("a", IntegerType).add("b", StringType),
       provider = Some("parquet"))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 8b8cd0f..6ffa58b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -26,9 +26,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, 
FunctionRegistry, NoSuchPartitionException, NoSuchTableException, 
TempTableAlreadyExistsException}
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, 
CatalogStorageFormat}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
@@ -72,7 +70,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with 
BeforeAndAfterEach {
 
   private def createDatabase(catalog: SessionCatalog, name: String): Unit = {
     catalog.createDatabase(
-      CatalogDatabase(name, "", spark.sessionState.conf.warehousePath, Map()),
+      CatalogDatabase(
+        name, "", 
CatalogUtils.stringToURI(spark.sessionState.conf.warehousePath), Map()),
       ignoreIfExists = false)
   }
 
@@ -133,11 +132,11 @@ class DDLSuite extends QueryTest with SharedSQLContext 
with BeforeAndAfterEach {
     }
   }
 
-  private def makeQualifiedPath(path: String): String = {
+  private def makeQualifiedPath(path: String): URI = {
     // copy-paste from SessionCatalog
     val hadoopPath = new Path(path)
     val fs = hadoopPath.getFileSystem(sparkContext.hadoopConfiguration)
-    fs.makeQualified(hadoopPath).toString
+    fs.makeQualified(hadoopPath).toUri
   }
 
   test("Create Database using Default Warehouse Path") {
@@ -449,7 +448,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with 
BeforeAndAfterEach {
           sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
           Row("Database Name", dbNameWithoutBackTicks) ::
             Row("Description", "") ::
-            Row("Location", location) ::
+            Row("Location", CatalogUtils.URIToString(location)) ::
             Row("Properties", "") :: Nil)
 
         sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 
'c'='c')")
@@ -458,7 +457,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with 
BeforeAndAfterEach {
           sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
           Row("Database Name", dbNameWithoutBackTicks) ::
             Row("Description", "") ::
-            Row("Location", location) ::
+            Row("Location", CatalogUtils.URIToString(location)) ::
             Row("Properties", "((a,a), (b,b), (c,c))") :: Nil)
 
         sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")
@@ -467,7 +466,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with 
BeforeAndAfterEach {
           sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
           Row("Database Name", dbNameWithoutBackTicks) ::
             Row("Description", "") ::
-            Row("Location", location) ::
+            Row("Location", CatalogUtils.URIToString(location)) ::
             Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil)
       } finally {
         catalog.reset()
@@ -1094,7 +1093,7 @@ class DDLSuite extends QueryTest with SharedSQLContext 
with BeforeAndAfterEach {
     assert(catalog.getPartition(tableIdent, 
partSpec).storage.locationUri.isDefined)
     assert(catalog.getPartition(tableIdent, 
partSpec).storage.properties.isEmpty)
     // Verify that the location is set to the expected string
-    def verifyLocation(expected: String, spec: Option[TablePartitionSpec] = 
None): Unit = {
+    def verifyLocation(expected: URI, spec: Option[TablePartitionSpec] = 
None): Unit = {
       val storageFormat = spec
         .map { s => catalog.getPartition(tableIdent, s).storage }
         .getOrElse { catalog.getTableMetadata(tableIdent).storage }
@@ -1111,17 +1110,17 @@ class DDLSuite extends QueryTest with SharedSQLContext 
with BeforeAndAfterEach {
     }
     // set table location
     sql("ALTER TABLE dbx.tab1 SET LOCATION '/path/to/your/lovely/heart'")
-    verifyLocation("/path/to/your/lovely/heart")
+    verifyLocation(new URI("/path/to/your/lovely/heart"))
     // set table partition location
     sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='2') SET LOCATION 
'/path/to/part/ways'")
-    verifyLocation("/path/to/part/ways", Some(partSpec))
+    verifyLocation(new URI("/path/to/part/ways"), Some(partSpec))
     // set table location without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
     sql("ALTER TABLE tab1 SET LOCATION '/swanky/steak/place'")
-    verifyLocation("/swanky/steak/place")
+    verifyLocation(new URI("/swanky/steak/place"))
     // set table partition location without explicitly specifying database
     sql("ALTER TABLE tab1 PARTITION (a='1', b='2') SET LOCATION 'vienna'")
-    verifyLocation("vienna", Some(partSpec))
+    verifyLocation(new URI("vienna"), Some(partSpec))
     // table to alter does not exist
     intercept[AnalysisException] {
       sql("ALTER TABLE dbx.does_not_exist SET LOCATION '/mister/spark'")
@@ -1255,7 +1254,7 @@ class DDLSuite extends QueryTest with SharedSQLContext 
with BeforeAndAfterEach {
       "PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')")
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, 
part2, part3))
     assert(catalog.getPartition(tableIdent, 
part1).storage.locationUri.isDefined)
-    assert(catalog.getPartition(tableIdent, part2).storage.locationUri == 
Option("paris"))
+    assert(catalog.getPartition(tableIdent, part2).storage.locationUri == 
Option(new URI("paris")))
     assert(catalog.getPartition(tableIdent, 
part3).storage.locationUri.isDefined)
 
     // add partitions without explicitly specifying database
@@ -1819,7 +1818,7 @@ class DDLSuite extends QueryTest with SharedSQLContext 
with BeforeAndAfterEach {
         // SET LOCATION won't move data from previous table path to new table 
path.
         assert(spark.table("tbl").count() == 0)
         // the previous table path should be still there.
-        assert(new File(new URI(defaultTablePath)).exists())
+        assert(new File(defaultTablePath).exists())
 
         sql("INSERT INTO tbl SELECT 2")
         checkAnswer(spark.table("tbl"), Row(2))
@@ -1843,28 +1842,27 @@ class DDLSuite extends QueryTest with SharedSQLContext 
with BeforeAndAfterEach {
              |OPTIONS(path "$dir")
            """.stripMargin)
         val table = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        assert(table.location == dir.getAbsolutePath)
+        assert(table.location == new URI(dir.getAbsolutePath))
 
         dir.delete
-        val tableLocFile = new File(table.location)
-        assert(!tableLocFile.exists)
+        assert(!dir.exists)
         spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
-        assert(tableLocFile.exists)
+        assert(dir.exists)
         checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
 
         Utils.deleteRecursively(dir)
-        assert(!tableLocFile.exists)
+        assert(!dir.exists)
         spark.sql("INSERT OVERWRITE TABLE t SELECT 'c', 1")
-        assert(tableLocFile.exists)
+        assert(dir.exists)
         checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
 
         val newDirFile = new File(dir, "x")
-        val newDir = newDirFile.toURI.toString
+        val newDir = newDirFile.getAbsolutePath
         spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
         spark.sessionState.catalog.refreshTable(TableIdentifier("t"))
 
         val table1 = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        assert(table1.location == newDir)
+        assert(table1.location == new URI(newDir))
         assert(!newDirFile.exists)
 
         spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
@@ -1885,7 +1883,7 @@ class DDLSuite extends QueryTest with SharedSQLContext 
with BeforeAndAfterEach {
              |LOCATION "$dir"
            """.stripMargin)
         val table = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        assert(table.location == dir.getAbsolutePath)
+        assert(table.location == new URI(dir.getAbsolutePath))
 
         spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
         checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
@@ -1911,13 +1909,13 @@ class DDLSuite extends QueryTest with SharedSQLContext 
with BeforeAndAfterEach {
              |OPTIONS(path "$dir")
            """.stripMargin)
         val table = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        assert(table.location == dir.getAbsolutePath)
+        assert(table.location == new URI(dir.getAbsolutePath))
 
         dir.delete()
         checkAnswer(spark.table("t"), Nil)
 
         val newDirFile = new File(dir, "x")
-        val newDir = newDirFile.toURI.toString
+        val newDir = newDirFile.toURI
         spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
 
         val table1 = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
@@ -1967,7 +1965,7 @@ class DDLSuite extends QueryTest with SharedSQLContext 
with BeforeAndAfterEach {
                  |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
                """.stripMargin)
             val table = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-            assert(table.location == dir.getAbsolutePath)
+            assert(table.location == new URI(dir.getAbsolutePath))
 
             checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
         }
@@ -1986,7 +1984,7 @@ class DDLSuite extends QueryTest with SharedSQLContext 
with BeforeAndAfterEach {
                  |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
                """.stripMargin)
             val table = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
-            assert(table.location == dir.getAbsolutePath)
+            assert(table.location == new URI(dir.getAbsolutePath))
 
             val partDir = new File(dir, "a=3")
             assert(partDir.exists())
@@ -1996,4 +1994,84 @@ class DDLSuite extends QueryTest with SharedSQLContext 
with BeforeAndAfterEach {
       }
     }
   }
+
+  Seq("a b", "a:b", "a%b").foreach { specialChars =>
+    test(s"location uri contains $specialChars for datasource table") {
+      withTable("t", "t1") {
+        withTempDir { dir =>
+          val loc = new File(dir, specialChars)
+          loc.mkdir()
+          spark.sql(
+            s"""
+               |CREATE TABLE t(a string)
+               |USING parquet
+               |LOCATION '$loc'
+             """.stripMargin)
+
+          val table = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+          assert(table.location == new Path(loc.getAbsolutePath).toUri)
+          assert(new Path(table.location).toString.contains(specialChars))
+
+          assert(loc.listFiles().isEmpty)
+          spark.sql("INSERT INTO TABLE t SELECT 1")
+          assert(loc.listFiles().length >= 1)
+          checkAnswer(spark.table("t"), Row("1") :: Nil)
+        }
+
+        withTempDir { dir =>
+          val loc = new File(dir, specialChars)
+          loc.mkdir()
+          spark.sql(
+            s"""
+               |CREATE TABLE t1(a string, b string)
+               |USING parquet
+               |PARTITIONED BY(b)
+               |LOCATION '$loc'
+             """.stripMargin)
+
+          val table = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+          assert(table.location == new Path(loc.getAbsolutePath).toUri)
+          assert(new Path(table.location).toString.contains(specialChars))
+
+          assert(loc.listFiles().isEmpty)
+          spark.sql("INSERT INTO TABLE t1 PARTITION(b=2) SELECT 1")
+          val partFile = new File(loc, "b=2")
+          assert(partFile.listFiles().length >= 1)
+          checkAnswer(spark.table("t1"), Row("1", "2") :: Nil)
+
+          spark.sql("INSERT INTO TABLE t1 PARTITION(b='2017-03-03 12:13%3A14') 
SELECT 1")
+          val partFile1 = new File(loc, "b=2017-03-03 12:13%3A14")
+          assert(!partFile1.exists())
+          val partFile2 = new File(loc, "b=2017-03-03 12%3A13%253A14")
+          assert(partFile2.listFiles().length >= 1)
+          checkAnswer(spark.table("t1"), Row("1", "2") :: Row("1", "2017-03-03 
12:13%3A14") :: Nil)
+        }
+      }
+    }
+  }
+
+  Seq("a b", "a:b", "a%b").foreach { specialChars =>
+    test(s"location uri contains $specialChars for database") {
+      try {
+        withTable("t") {
+          withTempDir { dir =>
+            val loc = new File(dir, specialChars)
+            spark.sql(s"CREATE DATABASE tmpdb LOCATION '$loc'")
+            spark.sql("USE tmpdb")
+
+            import testImplicits._
+            Seq(1).toDF("a").write.saveAsTable("t")
+            val tblloc = new File(loc, "t")
+            val table = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+            val tblPath = new Path(tblloc.getAbsolutePath)
+            val fs = tblPath.getFileSystem(spark.sessionState.newHadoopConf())
+            assert(table.location == fs.makeQualified(tblPath).toUri)
+            assert(tblloc.listFiles().nonEmpty)
+          }
+        }
+      } finally {
+        spark.sql("DROP DATABASE IF EXISTS tmpdb")
+      }
+    }
+  }
 }
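
The new DDLSuite tests above exercise table, partition and database locations that contain a space, a colon or a percent sign. A minimal, self-contained sketch of the round trip those assertions rely on, using only Hadoop `Path` and `java.net.URI` (the `/tmp/base` directory below is just an illustrative path, not taken from the tests):

```scala
import java.io.File
import java.net.URI
import org.apache.hadoop.fs.Path

object SpecialCharLocationSketch {
  def main(args: Array[String]): Unit = {
    Seq("a b", "a:b", "a%b").foreach { special =>
      val loc = new File("/tmp/base", special)
      // Path.toUri percent-encodes characters that are illegal in a URI
      // (e.g. the space becomes %20), which is the form the catalog now
      // keeps in its locationUri field ...
      val uri: URI = new Path(loc.getAbsolutePath).toUri
      // ... while converting back to a Path renders the decoded form, so
      // the original special character is still visible to the user.
      assert(new Path(uri).toString.contains(special))
    }
  }
}
```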

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 75723d0..989a7f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -459,7 +459,7 @@ class CatalogSuite
           options = Map("path" -> dir.getAbsolutePath))
        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
         assert(table.tableType == CatalogTableType.EXTERNAL)
-        assert(table.storage.locationUri.get == dir.getAbsolutePath)
+        assert(table.storage.locationUri.get == new URI(dir.getAbsolutePath))
 
         Seq((1)).toDF("i").write.insertInto("t")
         assert(dir.exists() && dir.listFiles().nonEmpty)
@@ -481,7 +481,7 @@ class CatalogSuite
         options = Map.empty[String, String])
      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
       assert(table.tableType == CatalogTableType.MANAGED)
-      val tablePath = new File(new URI(table.storage.locationUri.get))
+      val tablePath = new File(table.storage.locationUri.get)
       assert(tablePath.exists() && tablePath.listFiles().isEmpty)
 
       Seq((1)).toDF("i").write.insertInto("t")

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 9082261..93f3efe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -92,7 +92,7 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils {
 
   def tableDir: File = {
    val identifier = spark.sessionState.sqlParser.parseTableIdentifier("bucketed_table")
-    new File(URI.create(spark.sessionState.catalog.defaultTablePath(identifier)))
+    new File(spark.sessionState.catalog.defaultTablePath(identifier))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
index faf9afc..7ab339e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.sources
 
+import java.net.URI
+
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StructType}
@@ -78,7 +81,7 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
     // should exist even path option is not specified when creating table
     withTable("src") {
       sql(s"CREATE TABLE src(i int) USING 
${classOf[TestOptionsSource].getCanonicalName}")
-      assert(getPathOption("src") == Some(defaultTablePath("src")))
+      assert(getPathOption("src") == 
Some(CatalogUtils.URIToString(defaultTablePath("src"))))
     }
   }
 
@@ -105,7 +108,8 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
            |USING ${classOf[TestOptionsSource].getCanonicalName}
            |AS SELECT 1
           """.stripMargin)
-      assert(spark.table("src").schema.head.metadata.getString("path") == defaultTablePath("src"))
+      assert(spark.table("src").schema.head.metadata.getString("path") ==
+        CatalogUtils.URIToString(defaultTablePath("src")))
     }
   }
 
@@ -123,7 +127,7 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
     withTable("src", "src2") {
       sql(s"CREATE TABLE src(i int) USING 
${classOf[TestOptionsSource].getCanonicalName}")
       sql("ALTER TABLE src RENAME TO src2")
-      assert(getPathOption("src2") == Some(defaultTablePath("src2")))
+      assert(getPathOption("src2") == 
Some(CatalogUtils.URIToString(defaultTablePath("src2"))))
     }
   }
 
@@ -133,7 +137,7 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
     }.head
   }
 
-  private def defaultTablePath(tableName: String): String = {
+  private def defaultTablePath(tableName: String): URI = {
     spark.sessionState.catalog.defaultTablePath(TableIdentifier(tableName))
   }
 }
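
In PathOptionSuite the data source's `path` option stays a plain String while `defaultTablePath` now returns a `java.net.URI`, so the assertions convert the URI side before comparing. A tiny sketch of that comparison (the `URIToString` helper and the warehouse path below are hypothetical stand-ins, not the actual Spark code):

```scala
import java.net.URI

object PathOptionComparisonSketch {
  // Hypothetical stand-in for CatalogUtils.URIToString.
  def URIToString(uri: URI): String = uri.toString

  def main(args: Array[String]): Unit = {
    // The catalog now hands back a URI ...
    val defaultTablePath: URI = new URI("file:/tmp/spark-warehouse/src")
    // ... while the data source keeps its location as a String option,
    // so one side has to be converted before the equality check.
    val pathOption: Option[String] = Some("file:/tmp/spark-warehouse/src")
    assert(pathOption == Some(URIToString(defaultTablePath)))
  }
}
```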

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 43d9c2b..9ab4624 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -210,7 +210,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         tableDefinition.storage.locationUri.isEmpty
 
       val tableLocation = if (needDefaultTableLocation) {
-        Some(defaultTablePath(tableDefinition.identifier))
+        Some(CatalogUtils.stringToURI(defaultTablePath(tableDefinition.identifier)))
       } else {
         tableDefinition.storage.locationUri
       }
@@ -260,7 +260,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // However, in older version of Spark we already store table location in storage properties
     // with key "path". Here we keep this behaviour for backward compatibility.
     val storagePropsWithLocation = table.storage.properties ++
-      table.storage.locationUri.map("path" -> _)
+      table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
 
     // converts the table metadata to Spark SQL specific format, i.e. set data 
schema, names and
     // bucket specification to empty. Note that partition columns are 
retained, so that we can
@@ -285,7 +285,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         // compatible format, which means the data source is file-based and must have a `path`.
         require(table.storage.locationUri.isDefined,
           "External file-based data source table must have a `path` entry in storage properties.")
-        Some(new Path(table.location).toUri.toString)
+        Some(table.location)
       } else {
         None
       }
@@ -432,13 +432,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       //
      // Please refer to https://issues.apache.org/jira/browse/SPARK-15269 for more details.
       val tempPath = {
-        val dbLocation = getDatabase(tableDefinition.database).locationUri
+        val dbLocation = new Path(getDatabase(tableDefinition.database).locationUri)
         new Path(dbLocation, tableDefinition.identifier.table + "-__PLACEHOLDER__")
       }
 
       try {
         client.createTable(
-          tableDefinition.withNewStorage(locationUri = Some(tempPath.toString)),
+          tableDefinition.withNewStorage(locationUri = Some(tempPath.toUri)),
           ignoreIfExists)
       } finally {
         FileSystem.get(tempPath.toUri, hadoopConf).delete(tempPath, true)
@@ -563,7 +563,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         //       want to alter the table location to a file path, we will fail. This should be fixed
         //       in the future.
 
-        val newLocation = tableDefinition.storage.locationUri
+        val newLocation = tableDefinition.storage.locationUri.map(CatalogUtils.URIToString(_))
         val storageWithPathOption = tableDefinition.storage.copy(
          properties = tableDefinition.storage.properties ++ newLocation.map("path" -> _))
 
@@ -704,7 +704,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val storageWithLocation = {
       val tableLocation = getLocationFromStorageProps(table)
      // We pass None as `newPath` here, to remove the path option in storage properties.
-      updateLocationInStorageProps(table, newPath = None).copy(locationUri = tableLocation)
+      updateLocationInStorageProps(table, newPath = None).copy(
+        locationUri = tableLocation.map(CatalogUtils.stringToURI(_)))
     }
     val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
 
@@ -848,10 +849,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       // However, Hive metastore is not case preserving and will generate wrong partition location
       // with lower cased partition column names. Here we set the default partition location
       // manually to avoid this problem.
-      val partitionPath = p.storage.locationUri.map(uri => new Path(new URI(uri))).getOrElse {
+      val partitionPath = p.storage.locationUri.map(uri => new Path(uri)).getOrElse {
         ExternalCatalogUtils.generatePartitionPath(p.spec, partitionColumnNames, tablePath)
       }
-      p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri.toString)))
+      p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri)))
     }
    val lowerCasedParts = partsWithLocation.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
     client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
@@ -890,7 +891,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       val newParts = newSpecs.map { spec =>
        val rightPath = renamePartitionDirectory(fs, tablePath, partitionColumnNames, spec)
        val partition = client.getPartition(db, table, lowerCasePartitionSpec(spec))
-        partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toString)))
+        partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toUri)))
       }
       alterPartitions(db, table, newParts)
     }
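
HiveExternalCatalog sits on the boundary to the Hive metastore, which only deals in String locations, so the hunks above convert with `CatalogUtils.stringToURI` / `CatalogUtils.URIToString` whenever a location crosses that boundary (including the legacy "path" entry kept in the storage properties). A minimal sketch of that round trip, assuming the two helpers behave like the hypothetical stand-ins below (not the actual Spark implementation):

```scala
import java.net.URI
import org.apache.hadoop.fs.Path

object HiveBoundarySketch {
  // Hypothetical stand-ins for CatalogUtils.stringToURI / URIToString.
  def stringToURI(str: String): URI = new Path(str).toUri
  def URIToString(uri: URI): String = uri.toString

  def main(args: Array[String]): Unit = {
    // A location string as the Hive metastore would hand it back.
    val fromMetastore = "hdfs://nn:9000/warehouse/db.db/tbl"
    // Restoring table metadata turns it into the URI the catalog now stores.
    val locationUri: URI = stringToURI(fromMetastore)
    // Writing it back to Hive (or into the legacy "path" storage property)
    // converts the URI to a String again; for a well-formed location the
    // round trip is lossless.
    assert(URIToString(locationUri) == fromMetastore)
  }
}
```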

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 151a69a..4d3b6c3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -128,7 +128,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       QualifiedTableName(relation.tableMeta.database, relation.tableMeta.identifier.table)
 
    val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
-    val tablePath = new Path(new URI(relation.tableMeta.location))
+    val tablePath = new Path(relation.tableMeta.location)
     val result = if (relation.isPartitioned) {
       val partitionSchema = relation.tableMeta.partitionSchema
       val rootPaths: Seq[Path] = if (lazyPruningEnabled) {
@@ -141,7 +141,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         // locations,_omitting_ the table's base path.
         val paths = sparkSession.sharedState.externalCatalog
           .listPartitions(tableIdentifier.database, tableIdentifier.name)
-          .map(p => new Path(new URI(p.storage.locationUri.get)))
+          .map(p => new Path(p.storage.locationUri.get))
 
         if (paths.isEmpty) {
           Seq(tablePath)

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 624cfa2..b5ce027 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -133,7 +133,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
       } else if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
         try {
           val hadoopConf = session.sessionState.newHadoopConf()
-          val tablePath = new Path(new URI(table.location))
+          val tablePath = new Path(table.location)
           val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
           fs.getContentSummary(tablePath).getLength
         } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 7acaa9a..469c9d8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -317,7 +317,7 @@ private[hive] class HiveClientImpl(
       new HiveDatabase(
         database.name,
         database.description,
-        database.locationUri,
+        CatalogUtils.URIToString(database.locationUri),
         Option(database.properties).map(_.asJava).orNull),
         ignoreIfExists)
   }
@@ -335,7 +335,7 @@ private[hive] class HiveClientImpl(
       new HiveDatabase(
         database.name,
         database.description,
-        database.locationUri,
+        CatalogUtils.URIToString(database.locationUri),
         Option(database.properties).map(_.asJava).orNull))
   }
 
@@ -344,7 +344,7 @@ private[hive] class HiveClientImpl(
       CatalogDatabase(
         name = d.getName,
         description = d.getDescription,
-        locationUri = d.getLocationUri,
+        locationUri = CatalogUtils.stringToURI(d.getLocationUri),
         properties = Option(d.getParameters).map(_.asScala.toMap).orNull)
     }.getOrElse(throw new NoSuchDatabaseException(dbName))
   }
@@ -410,7 +410,7 @@ private[hive] class HiveClientImpl(
         createTime = h.getTTable.getCreateTime.toLong * 1000,
         lastAccessTime = h.getLastAccessTime.toLong * 1000,
         storage = CatalogStorageFormat(
-          locationUri = shim.getDataLocation(h),
+          locationUri = shim.getDataLocation(h).map(CatalogUtils.stringToURI(_)),
          // To avoid ClassNotFound exception, we try our best to not get the format class, but get
          // the class name directly. However, for non-native tables, there is no interface to get
          // the format class name, so we may still throw ClassNotFound in this case.
@@ -851,7 +851,8 @@ private[hive] object HiveClientImpl {
     conf.foreach(c => hiveTable.setOwner(c.getUser))
     hiveTable.setCreateTime((table.createTime / 1000).toInt)
     hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt)
-    table.storage.locationUri.foreach { loc => hiveTable.getTTable.getSd.setLocation(loc)}
+    table.storage.locationUri.map(CatalogUtils.URIToString(_)).foreach { loc =>
+      hiveTable.getTTable.getSd.setLocation(loc)}
     table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass)
     table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass)
     hiveTable.setSerializationLib(
@@ -885,7 +886,7 @@ private[hive] object HiveClientImpl {
     }
     val storageDesc = new StorageDescriptor
     val serdeInfo = new SerDeInfo
-    p.storage.locationUri.foreach(storageDesc.setLocation)
+    p.storage.locationUri.map(CatalogUtils.URIToString(_)).foreach(storageDesc.setLocation)
     p.storage.inputFormat.foreach(storageDesc.setInputFormat)
     p.storage.outputFormat.foreach(storageDesc.setOutputFormat)
     p.storage.serde.foreach(serdeInfo.setSerializationLib)
@@ -906,7 +907,7 @@ private[hive] object HiveClientImpl {
     CatalogTablePartition(
       spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty),
       storage = CatalogStorageFormat(
-        locationUri = Option(apiPartition.getSd.getLocation),
+        locationUri = Option(CatalogUtils.stringToURI(apiPartition.getSd.getLocation)),
         inputFormat = Option(apiPartition.getSd.getInputFormat),
         outputFormat = Option(apiPartition.getSd.getOutputFormat),
         serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 7280748..c6188fc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -24,10 +24,9 @@ import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, Set => JS
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
-import scala.util.Try
 import scala.util.control.NonFatal
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.api.{Function => HiveFunction, FunctionType, MetaException, PrincipalType, ResourceType, ResourceUri}
 import org.apache.hadoop.hive.ql.Driver
@@ -41,7 +40,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
-import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, FunctionResource, FunctionResourceType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, CatalogUtils, FunctionResource, FunctionResourceType}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegralType, StringType}
@@ -268,7 +267,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
     val table = hive.getTable(database, tableName)
     parts.foreach { s =>
       val location = s.storage.locationUri.map(
-        uri => new Path(table.getPath, new Path(new URI(uri)))).orNull
+        uri => new Path(table.getPath, new Path(uri))).orNull
       val params = if (s.parameters.nonEmpty) s.parameters.asJava else null
       val spec = s.spec.asJava
       if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
@@ -463,7 +462,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
     val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
     parts.zipWithIndex.foreach { case (s, i) =>
       addPartitionDesc.addPartition(
-        s.spec.asJava, s.storage.locationUri.map(u => new Path(new URI(u)).toString).orNull)
+        s.spec.asJava, s.storage.locationUri.map(CatalogUtils.URIToString(_)).orNull)
       if (s.parameters.nonEmpty) {
         addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 6d7a1c3..490e02d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive
 
+import java.net.URI
+
 import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
@@ -70,7 +72,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
     assert(desc.identifier.database == Some("mydb"))
     assert(desc.identifier.table == "page_view")
     assert(desc.tableType == CatalogTableType.EXTERNAL)
-    assert(desc.storage.locationUri == Some("/user/external/page_view"))
+    assert(desc.storage.locationUri == Some(new URI("/user/external/page_view")))
     assert(desc.schema.isEmpty) // will be populated later when the table is actually created
     assert(desc.comment == Some("This is the staging page view table"))
     // TODO will be SQLText
@@ -102,7 +104,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
     assert(desc.identifier.database == Some("mydb"))
     assert(desc.identifier.table == "page_view")
     assert(desc.tableType == CatalogTableType.EXTERNAL)
-    assert(desc.storage.locationUri == Some("/user/external/page_view"))
+    assert(desc.storage.locationUri == Some(new URI("/user/external/page_view")))
     assert(desc.schema.isEmpty) // will be populated later when the table is actually created
     // TODO will be SQLText
     assert(desc.comment == Some("This is the staging page view table"))
@@ -338,7 +340,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
     val query = "CREATE EXTERNAL TABLE tab1 (id int, name string) LOCATION '/path/to/nowhere'"
     val (desc, _) = extractTableDesc(query)
     assert(desc.tableType == CatalogTableType.EXTERNAL)
-    assert(desc.storage.locationUri == Some("/path/to/nowhere"))
+    assert(desc.storage.locationUri == Some(new URI("/path/to/nowhere")))
   }
 
   test("create table - if not exists") {
@@ -469,7 +471,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
     assert(desc.viewText.isEmpty)
     assert(desc.viewDefaultDatabase.isEmpty)
     assert(desc.viewQueryColumnNames.isEmpty)
-    assert(desc.storage.locationUri == Some("/path/to/mercury"))
+    assert(desc.storage.locationUri == Some(new URI("/path/to/mercury")))
     assert(desc.storage.inputFormat == Some("winput"))
     assert(desc.storage.outputFormat == Some("wowput"))
     assert(desc.storage.serde == Some("org.apache.poof.serde.Baff"))
@@ -644,7 +646,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
       .add("id", "int")
       .add("name", "string", nullable = true, comment = "blabla"))
     assert(table.provider == Some(DDLUtils.HIVE_PROVIDER))
-    assert(table.storage.locationUri == Some("/tmp/file"))
+    assert(table.storage.locationUri == Some(new URI("/tmp/file")))
     assert(table.storage.properties == Map("my_prop" -> "1"))
     assert(table.comment == Some("BLABLA"))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
index ee632d2..705d43f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
@@ -40,7 +40,8 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
     spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
 
   val tempDir = Utils.createTempDir().getCanonicalFile
-  val tempDirUri = tempDir.toURI.toString.stripSuffix("/")
+  val tempDirUri = tempDir.toURI
+  val tempDirStr = tempDir.getAbsolutePath
 
   override def beforeEach(): Unit = {
     sql("CREATE DATABASE test_db")
@@ -59,9 +60,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
   }
 
   private def defaultTableURI(tableName: String): URI = {
-    val defaultPath =
-      spark.sessionState.catalog.defaultTablePath(TableIdentifier(tableName, Some("test_db")))
-    new Path(defaultPath).toUri
+    spark.sessionState.catalog.defaultTablePath(TableIdentifier(tableName, Some("test_db")))
   }
 
  // Raw table metadata that are dumped from tables created by Spark 2.0. Note that, all spark
@@ -170,8 +169,8 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
     identifier = TableIdentifier("tbl7", Some("test_db")),
     tableType = CatalogTableType.EXTERNAL,
     storage = CatalogStorageFormat.empty.copy(
-      locationUri = Some(defaultTableURI("tbl7").toString + 
"-__PLACEHOLDER__"),
-      properties = Map("path" -> tempDirUri)),
+      locationUri = Some(new URI(defaultTableURI("tbl7") + 
"-__PLACEHOLDER__")),
+      properties = Map("path" -> tempDirStr)),
     schema = new StructType(),
     provider = Some("json"),
     properties = Map(
@@ -184,7 +183,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
     tableType = CatalogTableType.EXTERNAL,
     storage = CatalogStorageFormat.empty.copy(
       locationUri = Some(tempDirUri),
-      properties = Map("path" -> tempDirUri)),
+      properties = Map("path" -> tempDirStr)),
     schema = simpleSchema,
     properties = Map(
       "spark.sql.sources.provider" -> "parquet",
@@ -195,8 +194,8 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
     identifier = TableIdentifier("tbl9", Some("test_db")),
     tableType = CatalogTableType.EXTERNAL,
     storage = CatalogStorageFormat.empty.copy(
-      locationUri = Some(defaultTableURI("tbl9").toString + 
"-__PLACEHOLDER__"),
-      properties = Map("path" -> tempDirUri)),
+      locationUri = Some(new URI(defaultTableURI("tbl9") + 
"-__PLACEHOLDER__")),
+      properties = Map("path" -> tempDirStr)),
     schema = new StructType(),
     provider = Some("json"),
     properties = Map("spark.sql.sources.provider" -> "json"))
@@ -220,7 +219,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
 
       if (tbl.tableType == CatalogTableType.EXTERNAL) {
         // trim the URI prefix
-        val tableLocation = new URI(readBack.storage.locationUri.get).getPath
+        val tableLocation = readBack.storage.locationUri.get.getPath
         val expectedLocation = tempDir.toURI.getPath.stripSuffix("/")
         assert(tableLocation == expectedLocation)
       }
@@ -236,7 +235,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
         val readBack = getTableMetadata(tbl.identifier.table)
 
         // trim the URI prefix
-        val actualTableLocation = new URI(readBack.storage.locationUri.get).getPath
+        val actualTableLocation = readBack.storage.locationUri.get.getPath
         val expected = dir.toURI.getPath.stripSuffix("/")
         assert(actualTableLocation == expected)
       }
@@ -252,7 +251,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
       assert(readBack.schema.sameType(expectedSchema))
 
       // trim the URI prefix
-      val actualTableLocation = new URI(readBack.storage.locationUri.get).getPath
+      val actualTableLocation = readBack.storage.locationUri.get.getPath
       val expectedLocation = if (tbl.tableType == CatalogTableType.EXTERNAL) {
         tempDir.toURI.getPath.stripSuffix("/")
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 16cf4d7..892a22d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive
 
+import java.net.URI
+
 import org.apache.spark.sql.{QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
@@ -140,7 +142,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
           assert(hiveTable.storage.serde === Some(serde))
 
           assert(hiveTable.tableType === CatalogTableType.EXTERNAL)
-          assert(hiveTable.storage.locationUri === Some(path.toString))
+          assert(hiveTable.storage.locationUri === Some(new URI(path.getAbsolutePath)))
 
           val columns = hiveTable.schema
           assert(columns.map(_.name) === Seq("d1", "d2"))

http://git-wip-us.apache.org/repos/asf/spark/blob/096df6d9/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 8f0d5d8..5f15a70 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -485,7 +485,7 @@ object SetWarehouseLocationTest extends Logging {
       val tableMetadata =
        catalog.getTableMetadata(TableIdentifier("testLocation", Some("default")))
       val expectedLocation =
-        "file:" + expectedWarehouseLocation.toString + "/testlocation"
+        CatalogUtils.stringToURI(s"file:${expectedWarehouseLocation.toString}/testlocation")
       val actualLocation = tableMetadata.location
       if (actualLocation != expectedLocation) {
         throw new Exception(
@@ -500,8 +500,8 @@ object SetWarehouseLocationTest extends Logging {
       sparkSession.sql("create table testLocation (a int)")
       val tableMetadata =
        catalog.getTableMetadata(TableIdentifier("testLocation", Some("testLocationDB")))
-      val expectedLocation =
-        "file:" + expectedWarehouseLocation.toString + 
"/testlocationdb.db/testlocation"
+      val expectedLocation = CatalogUtils.stringToURI(
+        s"file:${expectedWarehouseLocation.toString}/testlocationdb.db/testlocation")
       val actualLocation = tableMetadata.location
       if (actualLocation != expectedLocation) {
         throw new Exception(
@@ -868,14 +868,16 @@ object SPARK_18360 {
       val rawTable = hiveClient.getTable("default", "test_tbl")
      // Hive will use the value of `hive.metastore.warehouse.dir` to generate default table
       // location for tables in default database.
-      assert(rawTable.storage.locationUri.get.contains(newWarehousePath))
+      assert(rawTable.storage.locationUri.map(
+        CatalogUtils.URIToString(_)).get.contains(newWarehousePath))
      hiveClient.dropTable("default", "test_tbl", ignoreIfNotExists = false, purge = false)
 
      spark.sharedState.externalCatalog.createTable(tableMeta, ignoreIfExists = false)
      val readBack = spark.sharedState.externalCatalog.getTable("default", "test_tbl")
      // Spark SQL will use the location of default database to generate default table
       // location for tables in default database.
-      assert(readBack.storage.locationUri.get.contains(defaultDbLocation))
+      assert(readBack.storage.locationUri.map(CatalogUtils.URIToString(_))
+        .get.contains(defaultDbLocation))
     } finally {
      hiveClient.dropTable("default", "test_tbl", ignoreIfNotExists = true, purge = false)
      hiveClient.runSqlHive(s"SET hive.metastore.warehouse.dir=$defaultDbLocation")

