Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5963#discussion_r30002904
  
    --- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
    @@ -407,64 +407,60 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
        * For example, because of a CREATE TABLE X AS statement.
        */
       object CreateTables extends Rule[LogicalPlan] {
    -    import org.apache.hadoop.hive.ql.Context
    -    import org.apache.hadoop.hive.ql.parse.{ASTNode, QB, SemanticAnalyzer}
    -
         def apply(plan: LogicalPlan): LogicalPlan = plan transform {
           // Wait until children are resolved.
           case p: LogicalPlan if !p.childrenResolved => p
    -
    -      case CreateTableAsSelect(desc, child, allowExisting) =>
    -        if (hive.convertCTAS && !desc.serde.isDefined) {
    -          // Do the conversion when spark.sql.hive.convertCTAS is true and 
the query
    -          // does not specify any storage format (file format and storage 
handler).
    -          if (desc.specifiedDatabase.isDefined) {
    -            throw new AnalysisException(
    -              "Cannot specify database name in a CTAS statement " +
    -              "when spark.sql.hive.convertCTAS is set to true.")
    -          }
    -
    -          val mode = if (allowExisting) SaveMode.Ignore else 
SaveMode.ErrorIfExists
    -          CreateTableUsingAsSelect(
    -            desc.name,
    -            conf.defaultDataSourceName,
    -            temporary = false,
    -            mode,
    -            options = Map.empty[String, String],
    -            child
    -          )
    +      case p: LogicalPlan if p.resolved => p
    +      case p @ CreateTableAsSelect(table, child, allowExisting) =>
    +        val schema = if (table.schema.size > 0) {
    +          table.schema
             } else {
    -          execution.CreateTableAsSelect(
    -            desc.copy(
    -              specifiedDatabase = 
Option(desc.specifiedDatabase.getOrElse(client.currentDatabase))),
    -            child,
    -            allowExisting)
    +          child.output.map {
    +            attr => new HiveColumn(
    +              attr.name,
    +              HiveMetastoreTypes.toMetastoreType(attr.dataType), null)
    +          }
             }
     
    -      case p: LogicalPlan if p.resolved => p
    +        val desc = table.copy(schema = schema)
     
    -      case p @ CreateTableAsSelect(desc, child, allowExisting) =>
    -        val (dbName, tblName) = processDatabaseAndTableName(desc.database, 
desc.name)
    +        // This is a hack, we only take the RC, ORC and Parquet as 
specific storage
    +        // otherwise, we will convert it into Parquet2 when 
hive.convertCTAS specified
    +        val specificStorage = (table.inputFormat.map(format => {
    +          // org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat 
=> Parquet
    +          // org.apache.hadoop.hive.ql.io.orc.OrcInputFormat               
=> Orc
    +          // org.apache.hadoop.hive.ql.io.RCFileInputFormat                
=> RCFile
    +          // parquet.hive.DeprecatedParquetInputFormat                     
=> Parquet
    +          // TODO configurable?
    +          format.contains("Orc") || format.contains("Parquet") || 
format.contains("RCFile")
    +        }).getOrElse(false))
    --- End diff --
    
    Ok, I see, I will move the default `SerDe` from `HiveQl` to `Analyzer`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to