HyukjinKwon commented on a change in pull request #32411:
URL: https://github.com/apache/spark/pull/32411#discussion_r630742282
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1604,6 +1604,15 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
+  val ALLOW_CTAS_NON_EMPTY_LOCATION =
+    buildConf("spark.sql.legacy.ctas.allowNonEmptyLocation")
+      .internal()
+      .doc("When false, Create Table As Location Select will throw Analysis Exception, " +
+        "if the location is non empty.")
Review comment:
```suggestion
      .doc("When false, CTAS with LOCATION throws an analysis exception if the " +
        "location is not empty.")
```
##########
File path: docs/sql-migration-guide.md
##########
@@ -78,7 +78,9 @@ license: |
 - In Spark 3.2, the timestamps subtraction expression such as `timestamp '2021-03-31 23:48:00' - timestamp '2021-01-01 00:00:00'` returns values of `DayTimeIntervalType`. In Spark 3.1 and earlier, the type of the same expression is `CalendarIntervalType`. To restore the behavior before Spark 3.2, you can set `spark.sql.legacy.interval.enabled` to `true`.
 - In Spark 3.2, `CREATE TABLE .. LIKE ..` command can not use reserved properties. You need their specific clauses to specify them, for example, `CREATE TABLE test1 LIKE test LOCATION 'some path'`. You can set `spark.sql.legacy.notReserveProperties` to `true` to ignore the `ParseException`, in this case, these properties will be silently removed, for example: `TBLPROPERTIES('owner'='yao')` will have no effect. In Spark version 3.1 and below, the reserved properties can be used in `CREATE TABLE .. LIKE ..` command but have no side effects, for example, `TBLPROPERTIES('location'='/tmp')` does not change the location of the table but only create a headless property just like `'a'='b'`.
-
+
+ - In Spark 3.2, `CREATE TABLE AS SELECT` with non-empty `LOCATION` will throw `AnalysisException`. To restore the behavior before Spark 3.2, you can set `spark.sql.legacy.ctas.allowNonEmptyLocation` to `true`.
+
Review comment:
I think you could just add this at the end.
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
##########
@@ -249,16 +249,18 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSparkSession {
     // Within the new limit
     Seq(100001, maxNrBuckets).foreach(numBuckets => {
-      withTable("t") {
-        sql(createTableSql(numBuckets))
-        val table = catalog.getTableMetadata(TableIdentifier("t"))
-        assert(table.bucketSpec == Option(BucketSpec(numBuckets, Seq("a"), Seq("b"))))
-      }
+      withTempDir(tempDir => {
+        withTable("t") {
+          sql(createTableSql(tempDir.toURI, numBuckets))
+          val table = catalog.getTableMetadata(TableIdentifier("t"))
+          assert(table.bucketSpec == Option(BucketSpec(numBuckets, Seq("a"), Seq("b"))))
+        }
+      })
Review comment:
```suggestion
}
```
##########
File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
##########
@@ -598,6 +598,38 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
     }
   }
+  test("SPARK-28551: CTAS Hive Table should be with non-existent or empty location") {
+    def executeCTASWithNonEmptyLocation(tempLocation: String) {
+      sql(s"CREATE TABLE ctas1(id string) stored as rcfile LOCATION " +
+        s"'file:$tempLocation/ctas1'")
Review comment:
```suggestion
s"'$tempLocation/ctas1'")
```
##########
File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
##########
@@ -598,6 +598,38 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
     }
   }
+  test("SPARK-28551: CTAS Hive Table should be with non-existent or empty location") {
+    def executeCTASWithNonEmptyLocation(tempLocation: String) {
+      sql(s"CREATE TABLE ctas1(id string) stored as rcfile LOCATION " +
+        s"'file:$tempLocation/ctas1'")
+      sql("INSERT INTO TABLE ctas1 SELECT 'A' ")
+      sql(s"CREATE TABLE ctas_with_existing_location stored as rcfile " +
+        s"LOCATION 'file:$tempLocation' " +
+        s"AS SELECT key k, value FROM src ORDER BY k, value")
+    }
+
+    Seq("false", "true").foreach { convertCTASFlag =>
+      Seq("false", "true").foreach { allowNonEmptyDirFlag =>
+        withSQLConf(SQLConf.CONVERT_CTAS.key -> convertCTASFlag,
+          SQLConf.ALLOW_CTAS_NON_EMPTY_LOCATION.key -> allowNonEmptyDirFlag) {
Review comment:
```suggestion
        withSQLConf(
          SQLConf.CONVERT_CTAS.key -> convertCTASFlag.toString,
          SQLConf.ALLOW_CTAS_NON_EMPTY_LOCATION.key -> allowNonEmptyDirFlag.toString) {
```
##########
File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
##########
@@ -598,6 +598,38 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
     }
   }
+  test("SPARK-28551: CTAS Hive Table should be with non-existent or empty location") {
+    def executeCTASWithNonEmptyLocation(tempLocation: String) {
+      sql(s"CREATE TABLE ctas1(id string) stored as rcfile LOCATION " +
Review comment:
Can we use Scala's multiline style with vertical bars?
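For example, a sketch of that style (a hypothetical rewrite, not the exact final code):
```scala
// Illustrative only: the same statement with a stripMargin multiline string.
sql(
  s"""
     |CREATE TABLE ctas1(id string) stored as rcfile
     |LOCATION '$tempLocation/ctas1'
     |""".stripMargin)
```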
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
##########
@@ -96,4 +98,23 @@ object DataWritingCommand {
       sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
       metrics.values.toSeq)
   }
+
+  def assertEmptyRootPath(tablePath: URI, saveMode: SaveMode, sparkSession: SparkSession) {
+    if (saveMode != SaveMode.Overwrite &&
+      !sparkSession.sqlContext.conf.allowCreateTableAsSelectNonEmptyLocation) {
+      val filePath = new org.apache.hadoop.fs.Path(tablePath)
+      val fs = filePath.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
+      if(fs != null && fs.exists(filePath)) {
+        val locStats = fs.getFileStatus(filePath)
+        if(locStats != null && locStats.isDirectory) {
+          val lStats = fs.listStatus(filePath)
+          if(lStats != null && lStats.length != 0) {
Review comment:
I think you can make this a single condition:
```scala
if (fs.exists(filePath) &&
    fs.getFileStatus(filePath).isDirectory &&
    fs.listStatus(filePath).length != 0) {
  ...
}
```
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
##########
@@ -96,4 +98,23 @@ object DataWritingCommand {
       sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
       metrics.values.toSeq)
   }
+
+  def assertEmptyRootPath(tablePath: URI, saveMode: SaveMode, sparkSession: SparkSession) {
+    if (saveMode != SaveMode.Overwrite &&
+      !sparkSession.sqlContext.conf.allowCreateTableAsSelectNonEmptyLocation) {
+      val filePath = new org.apache.hadoop.fs.Path(tablePath)
+      val fs = filePath.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
+      if(fs != null && fs.exists(filePath)) {
+        val locStats = fs.getFileStatus(filePath)
+        if(locStats != null && locStats.isDirectory) {
+          val lStats = fs.listStatus(filePath)
+          if(lStats != null && lStats.length != 0) {
+            throw new AnalysisException(
+              s"CREATE-TABLE-AS-SELECT cannot create table" +
Review comment:
I would say something like this:

> The directory ${tablePath} was not empty but CTAS with LOCATION requires the location to be empty.

See also https://spark.apache.org/error-message-guidelines.html
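In code, that could look roughly like this (wording illustrative, not the final PR text):
```scala
throw new AnalysisException(
  s"The directory $tablePath was not empty but CTAS with LOCATION " +
    "requires the location to be empty.")
```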
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
##########
@@ -249,16 +249,18 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSparkSession {
     // Within the new limit
     Seq(100001, maxNrBuckets).foreach(numBuckets => {
-      withTable("t") {
-        sql(createTableSql(numBuckets))
-        val table = catalog.getTableMetadata(TableIdentifier("t"))
-        assert(table.bucketSpec == Option(BucketSpec(numBuckets, Seq("a"), Seq("b"))))
-      }
+      withTempDir(tempDir => {
Review comment:
```suggestion
withTempDir { tempDir =>
```
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
##########
@@ -96,4 +98,23 @@ object DataWritingCommand {
       sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
       metrics.values.toSeq)
   }
+
+  def assertEmptyRootPath(tablePath: URI, saveMode: SaveMode, sparkSession: SparkSession) {
+    if (saveMode != SaveMode.Overwrite &&
+      !sparkSession.sqlContext.conf.allowCreateTableAsSelectNonEmptyLocation) {
+      val filePath = new org.apache.hadoop.fs.Path(tablePath)
+      val fs = filePath.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
+      if(fs != null && fs.exists(filePath)) {
+        val locStats = fs.getFileStatus(filePath)
+        if(locStats != null && locStats.isDirectory) {
Review comment:
can the file status be `null`?
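(For context, Hadoop's `FileSystem.getFileStatus` is documented to throw `FileNotFoundException` when the path does not exist rather than return `null`, so the null check looks redundant.)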
##########
File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
##########
@@ -598,6 +598,38 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
     }
   }
+  test("SPARK-28551: CTAS Hive Table should be with non-existent or empty location") {
+    def executeCTASWithNonEmptyLocation(tempLocation: String) {
+      sql(s"CREATE TABLE ctas1(id string) stored as rcfile LOCATION " +
+        s"'file:$tempLocation/ctas1'")
+      sql("INSERT INTO TABLE ctas1 SELECT 'A' ")
+      sql(s"CREATE TABLE ctas_with_existing_location stored as rcfile " +
+        s"LOCATION 'file:$tempLocation' " +
+        s"AS SELECT key k, value FROM src ORDER BY k, value")
+    }
+
+    Seq("false", "true").foreach { convertCTASFlag =>
Review comment:
```suggestion
Seq(false, true).foreach { convertCTASFlag =>
```
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
##########
@@ -249,16 +249,18 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSparkSession {
     // Within the new limit
     Seq(100001, maxNrBuckets).foreach(numBuckets => {
-      withTable("t") {
-        sql(createTableSql(numBuckets))
-        val table = catalog.getTableMetadata(TableIdentifier("t"))
-        assert(table.bucketSpec == Option(BucketSpec(numBuckets, Seq("a"), Seq("b"))))
-      }
+      withTempDir(tempDir => {
+        withTable("t") {
+          sql(createTableSql(tempDir.toURI, numBuckets))
Review comment:
```suggestion
sql(createTableSql(tempDir.toURI.toString, numBuckets))
```
##########
File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
##########
@@ -598,6 +598,38 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
     }
   }
+  test("SPARK-28551: CTAS Hive Table should be with non-existent or empty location") {
+    def executeCTASWithNonEmptyLocation(tempLocation: String) {
+      sql(s"CREATE TABLE ctas1(id string) stored as rcfile LOCATION " +
+        s"'file:$tempLocation/ctas1'")
+      sql("INSERT INTO TABLE ctas1 SELECT 'A' ")
+      sql(s"CREATE TABLE ctas_with_existing_location stored as rcfile " +
+        s"LOCATION 'file:$tempLocation' " +
+        s"AS SELECT key k, value FROM src ORDER BY k, value")
+    }
+
+    Seq("false", "true").foreach { convertCTASFlag =>
+      Seq("false", "true").foreach { allowNonEmptyDirFlag =>
+        withSQLConf(SQLConf.CONVERT_CTAS.key -> convertCTASFlag,
+          SQLConf.ALLOW_CTAS_NON_EMPTY_LOCATION.key -> allowNonEmptyDirFlag) {
+          withTempDir { dir =>
+            val tempLocation = dir.toURI.getPath.stripSuffix("/")
Review comment:
```suggestion
val tempLocation = dir.toURI.toString
```
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
##########
@@ -249,16 +249,18 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSparkSession {
     // Within the new limit
     Seq(100001, maxNrBuckets).foreach(numBuckets => {
-      withTable("t") {
-        sql(createTableSql(numBuckets))
-        val table = catalog.getTableMetadata(TableIdentifier("t"))
-        assert(table.bucketSpec == Option(BucketSpec(numBuckets, Seq("a"), Seq("b"))))
-      }
+      withTempDir(tempDir => {
+        withTable("t") {
+          sql(createTableSql(tempDir.toURI, numBuckets))
+          val table = catalog.getTableMetadata(TableIdentifier("t"))
+          assert(table.bucketSpec == Option(BucketSpec(numBuckets, Seq("a"), Seq("b"))))
+        }
+      })
     })
     // Over the new limit
     withTable("t") {
-      val e = intercept[AnalysisException](sql(createTableSql(maxNrBuckets + 1)))
+      val e = intercept[AnalysisException](sql(createTableSql(path.toURI, maxNrBuckets + 1)))
Review comment:
```suggestion
      val e = intercept[AnalysisException](
        sql(createTableSql(path.toURI.toString, maxNrBuckets + 1)))
```
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1604,6 +1604,15 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
+  val ALLOW_CTAS_NON_EMPTY_LOCATION =
+    buildConf("spark.sql.legacy.ctas.allowNonEmptyLocation")
Review comment:
```suggestion
buildConf("spark.sql.legacy.allowNonEmptyLocationInCTAS")
```
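Combined with the `.doc` fix above, the definition would read roughly as follows (a sketch; only the key and doc text change):
```scala
val ALLOW_CTAS_NON_EMPTY_LOCATION =
  buildConf("spark.sql.legacy.allowNonEmptyLocationInCTAS")
    .internal()
    .doc("When false, CTAS with LOCATION throws an analysis exception if the " +
      "location is not empty.")
    .booleanConf
    .createWithDefault(false)
```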
##########
File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
##########
@@ -598,6 +598,38 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
     }
   }
+  test("SPARK-28551: CTAS Hive Table should be with non-existent or empty location") {
+    def executeCTASWithNonEmptyLocation(tempLocation: String) {
+      sql(s"CREATE TABLE ctas1(id string) stored as rcfile LOCATION " +
+        s"'file:$tempLocation/ctas1'")
+      sql("INSERT INTO TABLE ctas1 SELECT 'A' ")
+      sql(s"CREATE TABLE ctas_with_existing_location stored as rcfile " +
+        s"LOCATION 'file:$tempLocation' " +
+        s"AS SELECT key k, value FROM src ORDER BY k, value")
+    }
+
+    Seq("false", "true").foreach { convertCTASFlag =>
+      Seq("false", "true").foreach { allowNonEmptyDirFlag =>
Review comment:
```suggestion
Seq(false, true).foreach { allowNonEmptyDirFlag =>
```
##########
File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
##########
@@ -598,6 +598,38 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
     }
   }
+  test("SPARK-28551: CTAS Hive Table should be with non-existent or empty location") {
+    def executeCTASWithNonEmptyLocation(tempLocation: String) {
+      sql(s"CREATE TABLE ctas1(id string) stored as rcfile LOCATION " +
+        s"'file:$tempLocation/ctas1'")
+      sql("INSERT INTO TABLE ctas1 SELECT 'A' ")
+      sql(s"CREATE TABLE ctas_with_existing_location stored as rcfile " +
+        s"LOCATION 'file:$tempLocation' " +
Review comment:
```suggestion
s"LOCATION '$tempLocation' " +
```
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
##########
@@ -235,10 +235,10 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSparkSession {
   }
   test("create table using as select - with overridden max number of buckets") {
-    def createTableSql(numBuckets: Int): String =
+    def createTableSql(tablePath: java.net.URI, numBuckets: Int): String =
Review comment:
```suggestion
def createTableSql(tablePath: String, numBuckets: Int): String =
```
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
##########
@@ -166,6 +166,9 @@ case class CreateDataSourceTableAsSelectCommand(
       saveDataIntoTable(
         sparkSession, table, table.storage.locationUri, child,
         SaveMode.Append, tableExists = true)
     } else {
+      table.storage.locationUri.foreach { p =>
+        DataWritingCommand.assertEmptyRootPath(p, mode, sparkSession)
Review comment:
```suggestion
DataWritingCommand.assertEmptyRootPath(p, mode)
```
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
##########
@@ -96,4 +98,23 @@ object DataWritingCommand {
       sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
       metrics.values.toSeq)
   }
+
+  def assertEmptyRootPath(tablePath: URI, saveMode: SaveMode, sparkSession: SparkSession) {
+    if (saveMode != SaveMode.Overwrite &&
+      !sparkSession.sqlContext.conf.allowCreateTableAsSelectNonEmptyLocation) {
Review comment:
```suggestion
!SQLConf.get.allowCreateTableAsSelectNonEmptyLocation) {
```
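(`SQLConf.get` resolves the conf of the session active on the current thread, so the `sparkSession` parameter could then be dropped, as suggested in the signature comment below.)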
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
##########
@@ -96,4 +98,23 @@ object DataWritingCommand {
       sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
       metrics.values.toSeq)
   }
+
+  def assertEmptyRootPath(tablePath: URI, saveMode: SaveMode, sparkSession: SparkSession) {
Review comment:
```suggestion
def assertEmptyRootPath(tablePath: URI, saveMode: SaveMode) {
```
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
##########
@@ -96,4 +98,23 @@ object DataWritingCommand {
       sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
       metrics.values.toSeq)
   }
+
+  def assertEmptyRootPath(tablePath: URI, saveMode: SaveMode, sparkSession: SparkSession) {
+    if (saveMode != SaveMode.Overwrite &&
+      !sparkSession.sqlContext.conf.allowCreateTableAsSelectNonEmptyLocation) {
+      val filePath = new org.apache.hadoop.fs.Path(tablePath)
+      val fs = filePath.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
+      if(fs != null && fs.exists(filePath)) {
Review comment:
can `fs` be `null`?
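(Likely not: `Path.getFileSystem` either returns a `FileSystem` or throws an `IOException`; it does not return `null`.)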
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
##########
@@ -96,4 +98,23 @@ object DataWritingCommand {
       sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
       metrics.values.toSeq)
   }
+
+  def assertEmptyRootPath(tablePath: URI, saveMode: SaveMode, sparkSession: SparkSession) {
+    if (saveMode != SaveMode.Overwrite &&
+      !sparkSession.sqlContext.conf.allowCreateTableAsSelectNonEmptyLocation) {
+      val filePath = new org.apache.hadoop.fs.Path(tablePath)
+      val fs = filePath.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
Review comment:
Can we pass the Hadoop configuration as an argument to the `assertEmptyRootPath` function?
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
##########
@@ -96,4 +98,23 @@ object DataWritingCommand {
       sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
       metrics.values.toSeq)
   }
+
+  def assertEmptyRootPath(tablePath: URI, saveMode: SaveMode, sparkSession: SparkSession) {
+    if (saveMode != SaveMode.Overwrite &&
+      !sparkSession.sqlContext.conf.allowCreateTableAsSelectNonEmptyLocation) {
+      val filePath = new org.apache.hadoop.fs.Path(tablePath)
+      val fs = filePath.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
+      if(fs != null && fs.exists(filePath)) {
Review comment:
```suggestion
if (fs != null && fs.exists(filePath)) {
```
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
##########
@@ -96,4 +98,23 @@ object DataWritingCommand {
       sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
       metrics.values.toSeq)
   }
+
+  def assertEmptyRootPath(tablePath: URI, saveMode: SaveMode, sparkSession: SparkSession) {
+    if (saveMode != SaveMode.Overwrite &&
+      !sparkSession.sqlContext.conf.allowCreateTableAsSelectNonEmptyLocation) {
+      val filePath = new org.apache.hadoop.fs.Path(tablePath)
+      val fs = filePath.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
+      if(fs != null && fs.exists(filePath)) {
+        val locStats = fs.getFileStatus(filePath)
+        if(locStats != null && locStats.isDirectory) {
Review comment:
```suggestion
if (locStats != null && locStats.isDirectory) {
```
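Folding the comments on this method together, a minimal sketch of where it could end up (assuming the Hadoop configuration is passed in and the redundant null checks are dropped; the wrapper object, parameter names, and message text are illustrative, not the final PR code):
```scala
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.internal.SQLConf

// Hypothetical consolidated version of DataWritingCommand.assertEmptyRootPath.
object CtasPathCheck {
  def assertEmptyRootPath(tablePath: URI, saveMode: SaveMode, hadoopConf: Configuration): Unit = {
    if (saveMode != SaveMode.Overwrite &&
        !SQLConf.get.allowCreateTableAsSelectNonEmptyLocation) {
      val filePath = new Path(tablePath)
      // getFileSystem throws on failure rather than returning null.
      val fs = filePath.getFileSystem(hadoopConf)
      if (fs.exists(filePath) &&
          fs.getFileStatus(filePath).isDirectory &&
          fs.listStatus(filePath).length != 0) {
        throw new AnalysisException(
          s"The directory $tablePath was not empty but CTAS with LOCATION " +
            "requires the location to be empty.")
      }
    }
  }
}
```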
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]