HyukjinKwon commented on a change in pull request #32411:
URL: https://github.com/apache/spark/pull/32411#discussion_r630742282



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1604,6 +1604,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val ALLOW_CTAS_NON_EMPTY_LOCATION =
+    buildConf("spark.sql.legacy.ctas.allowNonEmptyLocation")
+      .internal()
+      .doc("When false, Create Table As Location Select will throw Analysis 
Exception, " +
+        "if the location is non empty.")

Review comment:
       ```suggestion
         .doc("When false, CTAS with LOCATION throws an analysis exception if 
the "
           "location is not empty." +
   ```
   

##########
File path: docs/sql-migration-guide.md
##########
@@ -78,7 +78,9 @@ license: |
   - In Spark 3.2, the timestamps subtraction expression such as `timestamp 
'2021-03-31 23:48:00' - timestamp '2021-01-01 00:00:00'` returns values of 
`DayTimeIntervalType`. In Spark 3.1 and earlier, the type of the same 
expression is `CalendarIntervalType`. To restore the behavior before Spark 3.2, 
you can set `spark.sql.legacy.interval.enabled` to `true`.
 
  - In Spark 3.2, the `CREATE TABLE .. LIKE ..` command cannot use reserved 
properties. You need to use their specific clauses to specify them, for example, 
`CREATE TABLE test1 LIKE test LOCATION 'some path'`. You can set 
`spark.sql.legacy.notReserveProperties` to `true` to ignore the 
`ParseException`; in this case, these properties will be silently removed, for 
example: `TBLPROPERTIES('owner'='yao')` will have no effect. In Spark version 
3.1 and below, the reserved properties can be used in the `CREATE TABLE .. LIKE ..` 
command but have no side effects, for example, 
`TBLPROPERTIES('location'='/tmp')` does not change the location of the table 
but only creates a headless property just like `'a'='b'`.
-
+ 
+  - In Spark 3.2, `CREATE TABLE AS SELECT` with a non-empty `LOCATION` will 
throw `AnalysisException`. To restore the behavior before Spark 3.2, you can 
set `spark.sql.legacy.ctas.allowNonEmptyLocation` to `true`.
+  

Review comment:
       I think you could just add this at the end.

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
##########
@@ -249,16 +249,18 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSparkSession {
 
       // Within the new limit
       Seq(100001, maxNrBuckets).foreach(numBuckets => {
-        withTable("t") {
-          sql(createTableSql(numBuckets))
-          val table = catalog.getTableMetadata(TableIdentifier("t"))
-          assert(table.bucketSpec == Option(BucketSpec(numBuckets, Seq("a"), Seq("b"))))
-        }
+        withTempDir(tempDir => {
+          withTable("t") {
+            sql(createTableSql(tempDir.toURI, numBuckets))
+            val table = catalog.getTableMetadata(TableIdentifier("t"))
+            assert(table.bucketSpec == Option(BucketSpec(numBuckets, Seq("a"), Seq("b"))))
+          }
+        })

Review comment:
       ```suggestion
           }
   ```
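   
    Combined with the related suggestions further down (`withTempDir { tempDir =>` and passing the path as a `String`), the loop body would roughly become:
   
    ```scala
    Seq(100001, maxNrBuckets).foreach { numBuckets =>
      withTempDir { tempDir =>
        withTable("t") {
          sql(createTableSql(tempDir.toURI.toString, numBuckets))
          val table = catalog.getTableMetadata(TableIdentifier("t"))
          assert(table.bucketSpec == Option(BucketSpec(numBuckets, Seq("a"), Seq("b"))))
        }
      }
    }
    ```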

##########
File path: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
##########
@@ -598,6 +598,38 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
     }
   }
 
+  test("SPARK-28551: CTAS Hive Table should be with non-existent or empty 
location") {
+    def executeCTASWithNonEmptyLocation(tempLocation: String) {
+      sql(s"CREATE TABLE ctas1(id string) stored as rcfile LOCATION " +
+        s"'file:$tempLocation/ctas1'")

Review comment:
       ```suggestion
           s"'$tempLocation/ctas1'")
   ```

##########
File path: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
##########
@@ -598,6 +598,38 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
     }
   }
 
+  test("SPARK-28551: CTAS Hive Table should be with non-existent or empty 
location") {
+    def executeCTASWithNonEmptyLocation(tempLocation: String) {
+      sql(s"CREATE TABLE ctas1(id string) stored as rcfile LOCATION " +
+        s"'file:$tempLocation/ctas1'")
+      sql("INSERT INTO TABLE ctas1 SELECT 'A' ")
+      sql(s"CREATE TABLE ctas_with_existing_location stored as rcfile " +
+        s"LOCATION 'file:$tempLocation' " +
+        s"AS SELECT key k, value FROM src ORDER BY k, value")
+    }
+
+    Seq("false", "true").foreach { convertCTASFlag =>
+      Seq("false", "true").foreach { allowNonEmptyDirFlag =>
+        withSQLConf(SQLConf.CONVERT_CTAS.key -> convertCTASFlag,
+          SQLConf.ALLOW_CTAS_NON_EMPTY_LOCATION.key -> allowNonEmptyDirFlag) {

Review comment:
       ```suggestion
           withSQLConf(
               SQLConf.CONVERT_CTAS.key -> convertCTASFlag.toString,
              SQLConf.ALLOW_CTAS_NON_EMPTY_LOCATION.key -> allowNonEmptyDirFlag.toString) {
   ```

##########
File path: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
##########
@@ -598,6 +598,38 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
     }
   }
 
+  test("SPARK-28551: CTAS Hive Table should be with non-existent or empty 
location") {
+    def executeCTASWithNonEmptyLocation(tempLocation: String) {
+      sql(s"CREATE TABLE ctas1(id string) stored as rcfile LOCATION " +

Review comment:
       Can we use Scala's multiline style with vertical bars?
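   
    For example, a rough sketch with `stripMargin` (reusing the test's `tempLocation`):
   
    ```scala
    sql(
      s"""
         |CREATE TABLE ctas1(id string) stored as rcfile
         |LOCATION '$tempLocation/ctas1'
         |""".stripMargin)
    ```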

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
##########
@@ -96,4 +98,23 @@ object DataWritingCommand {
       sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
       metrics.values.toSeq)
   }
+
+  def assertEmptyRootPath(tablePath: URI, saveMode: SaveMode, sparkSession: SparkSession) {
+    if (saveMode != SaveMode.Overwrite &&
+      !sparkSession.sqlContext.conf.allowCreateTableAsSelectNonEmptyLocation) {
+      val filePath = new org.apache.hadoop.fs.Path(tablePath)
+      val fs = filePath.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
+      if(fs != null && fs.exists(filePath)) {
+        val locStats = fs.getFileStatus(filePath)
+        if(locStats != null && locStats.isDirectory) {
+          val lStats = fs.listStatus(filePath)
+          if(lStats != null && lStats.length != 0) {

Review comment:
       I think you can make it as a single condition:
   
   ```scala
   if (fs.exists(filePath) &&
       fs.getFileStatus(filePath).isDirectory &&
       fs.listStatus(filePath).length != 0) {
     ...
   }
   ```

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
##########
@@ -96,4 +98,23 @@ object DataWritingCommand {
       sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
       metrics.values.toSeq)
   }
+
+  def assertEmptyRootPath(tablePath: URI, saveMode: SaveMode, sparkSession: SparkSession) {
+    if (saveMode != SaveMode.Overwrite &&
+      !sparkSession.sqlContext.conf.allowCreateTableAsSelectNonEmptyLocation) {
+      val filePath = new org.apache.hadoop.fs.Path(tablePath)
+      val fs = filePath.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
+      if(fs != null && fs.exists(filePath)) {
+        val locStats = fs.getFileStatus(filePath)
+        if(locStats != null && locStats.isDirectory) {
+          val lStats = fs.listStatus(filePath)
+          if(lStats != null && lStats.length != 0) {
+            throw new AnalysisException(
+              s"CREATE-TABLE-AS-SELECT cannot create table" +

Review comment:
       I would say something like this:
    > The directory ${tablePath} was not empty but CTAS with LOCATION requires the location to be empty.
   
   See also https://spark.apache.org/error-message-guidelines.html
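   
    Applied to this `throw`, it could read roughly like this (a sketch, not final wording):
   
    ```scala
    throw new AnalysisException(
      s"The directory $tablePath was not empty but CTAS with LOCATION " +
        "requires the location to be empty.")
    ```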

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
##########
@@ -249,16 +249,18 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSparkSession {
 
       // Within the new limit
       Seq(100001, maxNrBuckets).foreach(numBuckets => {
-        withTable("t") {
-          sql(createTableSql(numBuckets))
-          val table = catalog.getTableMetadata(TableIdentifier("t"))
-          assert(table.bucketSpec == Option(BucketSpec(numBuckets, Seq("a"), Seq("b"))))
-        }
+        withTempDir(tempDir => {

Review comment:
       ```suggestion
           withTempDir { tempDir =>
   ```

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
##########
@@ -96,4 +98,23 @@ object DataWritingCommand {
       sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
       metrics.values.toSeq)
   }
+
+  def assertEmptyRootPath(tablePath: URI, saveMode: SaveMode, sparkSession: SparkSession) {
+    if (saveMode != SaveMode.Overwrite &&
+      !sparkSession.sqlContext.conf.allowCreateTableAsSelectNonEmptyLocation) {
+      val filePath = new org.apache.hadoop.fs.Path(tablePath)
+      val fs = filePath.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
+      if(fs != null && fs.exists(filePath)) {
+        val locStats = fs.getFileStatus(filePath)
+        if(locStats != null && locStats.isDirectory) {

Review comment:
       can the file status be `null`?

##########
File path: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
##########
@@ -598,6 +598,38 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
     }
   }
 
+  test("SPARK-28551: CTAS Hive Table should be with non-existent or empty 
location") {
+    def executeCTASWithNonEmptyLocation(tempLocation: String) {
+      sql(s"CREATE TABLE ctas1(id string) stored as rcfile LOCATION " +
+        s"'file:$tempLocation/ctas1'")
+      sql("INSERT INTO TABLE ctas1 SELECT 'A' ")
+      sql(s"CREATE TABLE ctas_with_existing_location stored as rcfile " +
+        s"LOCATION 'file:$tempLocation' " +
+        s"AS SELECT key k, value FROM src ORDER BY k, value")
+    }
+
+    Seq("false", "true").foreach { convertCTASFlag =>

Review comment:
       ```suggestion
       Seq(false, true).foreach { convertCTASFlag =>
   ```

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
##########
@@ -249,16 +249,18 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSparkSession {
 
       // Within the new limit
       Seq(100001, maxNrBuckets).foreach(numBuckets => {
-        withTable("t") {
-          sql(createTableSql(numBuckets))
-          val table = catalog.getTableMetadata(TableIdentifier("t"))
-          assert(table.bucketSpec == Option(BucketSpec(numBuckets, Seq("a"), Seq("b"))))
-        }
+        withTempDir(tempDir => {
+          withTable("t") {
+            sql(createTableSql(tempDir.toURI, numBuckets))

Review comment:
       ```suggestion
               sql(createTableSql(tempDir.toURI.toString, numBuckets))
   ```

##########
File path: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
##########
@@ -598,6 +598,38 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
     }
   }
 
+  test("SPARK-28551: CTAS Hive Table should be with non-existent or empty 
location") {
+    def executeCTASWithNonEmptyLocation(tempLocation: String) {
+      sql(s"CREATE TABLE ctas1(id string) stored as rcfile LOCATION " +
+        s"'file:$tempLocation/ctas1'")
+      sql("INSERT INTO TABLE ctas1 SELECT 'A' ")
+      sql(s"CREATE TABLE ctas_with_existing_location stored as rcfile " +
+        s"LOCATION 'file:$tempLocation' " +
+        s"AS SELECT key k, value FROM src ORDER BY k, value")
+    }
+
+    Seq("false", "true").foreach { convertCTASFlag =>
+      Seq("false", "true").foreach { allowNonEmptyDirFlag =>
+        withSQLConf(SQLConf.CONVERT_CTAS.key -> convertCTASFlag,
+          SQLConf.ALLOW_CTAS_NON_EMPTY_LOCATION.key -> allowNonEmptyDirFlag) {
+          withTempDir { dir =>
+            val tempLocation = dir.toURI.getPath.stripSuffix("/")

Review comment:
       ```suggestion
               val tempLocation = dir.toURI.toString
   ```

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
##########
@@ -249,16 +249,18 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSparkSession {
 
       // Within the new limit
       Seq(100001, maxNrBuckets).foreach(numBuckets => {
-        withTable("t") {
-          sql(createTableSql(numBuckets))
-          val table = catalog.getTableMetadata(TableIdentifier("t"))
-          assert(table.bucketSpec == Option(BucketSpec(numBuckets, Seq("a"), Seq("b"))))
-        }
+        withTempDir(tempDir => {
+          withTable("t") {
+            sql(createTableSql(tempDir.toURI, numBuckets))
+            val table = catalog.getTableMetadata(TableIdentifier("t"))
+            assert(table.bucketSpec == Option(BucketSpec(numBuckets, Seq("a"), Seq("b"))))
+          }
+        })
       })
 
       // Over the new limit
       withTable("t") {
-        val e = intercept[AnalysisException](sql(createTableSql(maxNrBuckets + 1)))
+        val e = intercept[AnalysisException](sql(createTableSql(path.toURI, maxNrBuckets + 1)))

Review comment:
       ```suggestion
           val e = intercept[AnalysisException](sql(createTableSql(path.toURI.toString, maxNrBuckets + 1)))
   ```

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1604,6 +1604,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val ALLOW_CTAS_NON_EMPTY_LOCATION =
+    buildConf("spark.sql.legacy.ctas.allowNonEmptyLocation")

Review comment:
       ```suggestion
       buildConf("spark.sql.legacy.allowNonEmptyLocationInCTAS")
   ```

##########
File path: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
##########
@@ -598,6 +598,38 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
     }
   }
 
+  test("SPARK-28551: CTAS Hive Table should be with non-existent or empty 
location") {
+    def executeCTASWithNonEmptyLocation(tempLocation: String) {
+      sql(s"CREATE TABLE ctas1(id string) stored as rcfile LOCATION " +
+        s"'file:$tempLocation/ctas1'")
+      sql("INSERT INTO TABLE ctas1 SELECT 'A' ")
+      sql(s"CREATE TABLE ctas_with_existing_location stored as rcfile " +
+        s"LOCATION 'file:$tempLocation' " +
+        s"AS SELECT key k, value FROM src ORDER BY k, value")
+    }
+
+    Seq("false", "true").foreach { convertCTASFlag =>
+      Seq("false", "true").foreach { allowNonEmptyDirFlag =>

Review comment:
       ```suggestion
         Seq(false, true).foreach { allowNonEmptyDirFlag =>
   ```

##########
File path: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
##########
@@ -598,6 +598,38 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
     }
   }
 
+  test("SPARK-28551: CTAS Hive Table should be with non-existent or empty 
location") {
+    def executeCTASWithNonEmptyLocation(tempLocation: String) {
+      sql(s"CREATE TABLE ctas1(id string) stored as rcfile LOCATION " +
+        s"'file:$tempLocation/ctas1'")
+      sql("INSERT INTO TABLE ctas1 SELECT 'A' ")
+      sql(s"CREATE TABLE ctas_with_existing_location stored as rcfile " +
+        s"LOCATION 'file:$tempLocation' " +

Review comment:
       ```suggestion
           s"LOCATION '$tempLocation' " +
   ```

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
##########
@@ -235,10 +235,10 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSparkSession {
   }
 
   test("create table using as select - with overridden max number of buckets") 
{
-    def createTableSql(numBuckets: Int): String =
+    def createTableSql(tablePath: java.net.URI, numBuckets: Int): String =

Review comment:
       ```suggestion
       def createTableSql(tablePath: String, numBuckets: Int): String =
   ```

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
##########
@@ -166,6 +166,9 @@ case class CreateDataSourceTableAsSelectCommand(
       saveDataIntoTable(
        sparkSession, table, table.storage.locationUri, child, SaveMode.Append, tableExists = true)
     } else {
+      table.storage.locationUri.foreach { p =>
+        DataWritingCommand.assertEmptyRootPath(p, mode, sparkSession)

Review comment:
       ```suggestion
           DataWritingCommand.assertEmptyRootPath(p, mode)
   ```

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
##########
@@ -96,4 +98,23 @@ object DataWritingCommand {
       sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
       metrics.values.toSeq)
   }
+
+  def assertEmptyRootPath(tablePath: URI, saveMode: SaveMode, sparkSession: SparkSession) {
+    if (saveMode != SaveMode.Overwrite &&
+      !sparkSession.sqlContext.conf.allowCreateTableAsSelectNonEmptyLocation) {

Review comment:
       ```suggestion
         !SQLConf.get.allowCreateTableAsSelectNonEmptyLocation) {
   ```

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
##########
@@ -96,4 +98,23 @@ object DataWritingCommand {
       sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
       metrics.values.toSeq)
   }
+
+  def assertEmptyRootPath(tablePath: URI, saveMode: SaveMode, sparkSession: SparkSession) {

Review comment:
       ```suggestion
     def assertEmptyRootPath(tablePath: URI, saveMode: SaveMode) {
   ```

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
##########
@@ -96,4 +98,23 @@ object DataWritingCommand {
       sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
       metrics.values.toSeq)
   }
+
+  def assertEmptyRootPath(tablePath: URI, saveMode: SaveMode, sparkSession: SparkSession) {
+    if (saveMode != SaveMode.Overwrite &&
+      !sparkSession.sqlContext.conf.allowCreateTableAsSelectNonEmptyLocation) {
+      val filePath = new org.apache.hadoop.fs.Path(tablePath)
+      val fs = filePath.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
+      if(fs != null && fs.exists(filePath)) {

Review comment:
       can `fs` be `null`?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
##########
@@ -96,4 +98,23 @@ object DataWritingCommand {
       sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
       metrics.values.toSeq)
   }
+
+  def assertEmptyRootPath(tablePath: URI, saveMode: SaveMode, sparkSession: SparkSession) {
+    if (saveMode != SaveMode.Overwrite &&
+      !sparkSession.sqlContext.conf.allowCreateTableAsSelectNonEmptyLocation) {
+      val filePath = new org.apache.hadoop.fs.Path(tablePath)
+      val fs = filePath.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)

Review comment:
       Can we pass the Hadoop configuration as an argument to the `assertEmptyRootPath` function?
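   
    Something like this, for example (a sketch; `Configuration` is `org.apache.hadoop.conf.Configuration`, and it also folds in the `SQLConf.get` suggestion from above):
   
    ```scala
    def assertEmptyRootPath(tablePath: URI, saveMode: SaveMode, hadoopConf: Configuration): Unit = {
      if (saveMode != SaveMode.Overwrite &&
          !SQLConf.get.allowCreateTableAsSelectNonEmptyLocation) {
        val filePath = new Path(tablePath)
        val fs = filePath.getFileSystem(hadoopConf)
        // same emptiness check as before, using this fs
      }
    }
    ```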

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
##########
@@ -96,4 +98,23 @@ object DataWritingCommand {
       sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
       metrics.values.toSeq)
   }
+
+  def assertEmptyRootPath(tablePath: URI, saveMode: SaveMode, sparkSession: SparkSession) {
+    if (saveMode != SaveMode.Overwrite &&
+      !sparkSession.sqlContext.conf.allowCreateTableAsSelectNonEmptyLocation) {
+      val filePath = new org.apache.hadoop.fs.Path(tablePath)
+      val fs = filePath.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
+      if(fs != null && fs.exists(filePath)) {

Review comment:
       ```suggestion
         if (fs != null && fs.exists(filePath)) {
   ```

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
##########
@@ -96,4 +98,23 @@ object DataWritingCommand {
       sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
       metrics.values.toSeq)
   }
+
+  def assertEmptyRootPath(tablePath: URI, saveMode: SaveMode, sparkSession: SparkSession) {
+    if (saveMode != SaveMode.Overwrite &&
+      !sparkSession.sqlContext.conf.allowCreateTableAsSelectNonEmptyLocation) {
+      val filePath = new org.apache.hadoop.fs.Path(tablePath)
+      val fs = filePath.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
+      if(fs != null && fs.exists(filePath)) {
+        val locStats = fs.getFileStatus(filePath)
+        if(locStats != null && locStats.isDirectory) {

Review comment:
       ```suggestion
           if (locStats != null && locStats.isDirectory) {
   ```



