Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15996#discussion_r93720521
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
 ---
    @@ -140,153 +140,55 @@ case class CreateDataSourceTableAsSelectCommand(
         val tableIdentWithDB = table.identifier.copy(database = Some(db))
         val tableName = tableIdentWithDB.unquotedString
     
    -    var createMetastoreTable = false
    -    // We may need to reorder the columns of the query to match the 
existing table.
    -    var reorderedColumns = Option.empty[Seq[NamedExpression]]
         if (sessionState.catalog.tableExists(tableIdentWithDB)) {
    -      // Check if we need to throw an exception or just return.
    -      mode match {
    -        case SaveMode.ErrorIfExists =>
    -          throw new AnalysisException(s"Table $tableName already exists. " 
+
    -            s"If you are using saveAsTable, you can set SaveMode to 
SaveMode.Append to " +
    -            s"insert data into the table or set SaveMode to 
SaveMode.Overwrite to overwrite" +
    -            s"the existing data. " +
    -            s"Or, if you are using SQL CREATE TABLE, you need to drop 
$tableName first.")
    -        case SaveMode.Ignore =>
    -          // Since the table already exists and the save mode is Ignore, 
we will just return.
    -          return Seq.empty[Row]
    -        case SaveMode.Append =>
    -          val existingTable = 
sessionState.catalog.getTableMetadata(tableIdentWithDB)
    -
    -          if (existingTable.provider.get == DDLUtils.HIVE_PROVIDER) {
    -            throw new AnalysisException(s"Saving data in the Hive serde 
table $tableName is " +
    -              "not supported yet. Please use the insertInto() API as an 
alternative.")
    -          }
    -
    -          // Check if the specified data source match the data source of 
the existing table.
    -          val existingProvider = 
DataSource.lookupDataSource(existingTable.provider.get)
    -          val specifiedProvider = 
DataSource.lookupDataSource(table.provider.get)
    -          // TODO: Check that options from the resolved relation match the 
relation that we are
    -          // inserting into (i.e. using the same compression).
    -          if (existingProvider != specifiedProvider) {
    -            throw new AnalysisException(s"The format of the existing table 
$tableName is " +
    -              s"`${existingProvider.getSimpleName}`. It doesn't match the 
specified format " +
    -              s"`${specifiedProvider.getSimpleName}`.")
    -          }
    -
    -          if (query.schema.length != existingTable.schema.length) {
    -            throw new AnalysisException(
    -              s"The column number of the existing table $tableName" +
    -                s"(${existingTable.schema.catalogString}) doesn't match 
the data schema" +
    -                s"(${query.schema.catalogString})")
    -          }
    -
    -          val resolver = sessionState.conf.resolver
    -          val tableCols = existingTable.schema.map(_.name)
    -
    -          reorderedColumns = Some(existingTable.schema.map { f =>
    -            query.resolve(Seq(f.name), resolver).getOrElse {
    -              val inputColumns = query.schema.map(_.name).mkString(", ")
    -              throw new AnalysisException(
    -                s"cannot resolve '${f.name}' given input columns: 
[$inputColumns]")
    -            }
    -          })
    -
    -          // In `AnalyzeCreateTable`, we verified the consistency between 
the user-specified table
    -          // definition(partition columns, bucketing) and the SELECT 
query, here we also need to
    -          // verify the the consistency between the user-specified table 
definition and the existing
    -          // table definition.
    -
    -          // Check if the specified partition columns match the existing 
table.
    -          val specifiedPartCols = CatalogUtils.normalizePartCols(
    -            tableName, tableCols, table.partitionColumnNames, resolver)
    -          if (specifiedPartCols != existingTable.partitionColumnNames) {
    -            throw new AnalysisException(
    -              s"""
    -                |Specified partitioning does not match that of the 
existing table $tableName.
    -                |Specified partition columns: 
[${specifiedPartCols.mkString(", ")}]
    -                |Existing partition columns: 
[${existingTable.partitionColumnNames.mkString(", ")}]
    -              """.stripMargin)
    -          }
    -
    -          // Check if the specified bucketing match the existing table.
    -          val specifiedBucketSpec = table.bucketSpec.map { bucketSpec =>
    -            CatalogUtils.normalizeBucketSpec(tableName, tableCols, 
bucketSpec, resolver)
    -          }
    -          if (specifiedBucketSpec != existingTable.bucketSpec) {
    -            val specifiedBucketString =
    -              specifiedBucketSpec.map(_.toString).getOrElse("not bucketed")
    -            val existingBucketString =
    -              existingTable.bucketSpec.map(_.toString).getOrElse("not 
bucketed")
    -            throw new AnalysisException(
    -              s"""
    -                |Specified bucketing does not match that of the existing 
table $tableName.
    -                |Specified bucketing: $specifiedBucketString
    -                |Existing bucketing: $existingBucketString
    -              """.stripMargin)
    -          }
    -
    -        case SaveMode.Overwrite =>
    -          sessionState.catalog.dropTable(tableIdentWithDB, 
ignoreIfNotExists = true, purge = false)
    -          // Need to create the table again.
    -          createMetastoreTable = true
    +      if (ignoreIfExists) {
    +        // Do nothing
    +      } else {
    +        throw new AnalysisException(s"Table $tableName already exists. You 
need to drop it first.")
           }
         } else {
    -      // The table does not exist. We need to create it in metastore.
    -      createMetastoreTable = true
    -    }
    -
    -    val data = Dataset.ofRows(sparkSession, query)
    -    val df = reorderedColumns match {
    -      // Reorder the columns of the query to match the existing table.
    -      case Some(cols) => data.select(cols.map(Column(_)): _*)
    -      case None => data
    -    }
    -
    -    val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
    -      Some(sessionState.catalog.defaultTablePath(table.identifier))
    -    } else {
    -      table.storage.locationUri
    -    }
    +      val tableLocation = if (table.tableType == CatalogTableType.MANAGED) 
{
    +        Some(sessionState.catalog.defaultTablePath(table.identifier))
    +      } else {
    +        table.storage.locationUri
    +      }
     
    -    // Create the relation based on the data of df.
    -    val pathOption = tableLocation.map("path" -> _)
    -    val dataSource = DataSource(
    -      sparkSession,
    -      className = provider,
    -      partitionColumns = table.partitionColumnNames,
    -      bucketSpec = table.bucketSpec,
    -      options = table.storage.properties ++ pathOption,
    -      catalogTable = Some(table))
    +      // Create the relation based on the data of df.
    +      val pathOption = tableLocation.map("path" -> _)
    +      val dataSource = DataSource(
    +        sparkSession,
    +        className = provider,
    +        partitionColumns = table.partitionColumnNames,
    +        bucketSpec = table.bucketSpec,
    +        options = table.storage.properties ++ pathOption,
    +        catalogTable = Some(table))
    +
    +      val result = try {
    +        dataSource.write(SaveMode.Overwrite, Dataset.ofRows(sparkSession, 
query))
    --- End diff --
    
    Can you explain why we always use `SaveMode.Overwrite` at here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to