Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12015#discussion_r57954149
  
    --- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala 
---
    @@ -0,0 +1,442 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.hive.execution
    +
    +import scala.collection.JavaConverters._
    +
    +import org.antlr.v4.runtime.{ParserRuleContext, Token}
    +import org.apache.hadoop.hive.conf.HiveConf
    +import org.apache.hadoop.hive.conf.HiveConf.ConfVars
    +import org.apache.hadoop.hive.ql.exec.FunctionRegistry
    +import org.apache.hadoop.hive.ql.parse.EximUtil
    +import org.apache.hadoop.hive.ql.session.SessionState
    +import org.apache.hadoop.hive.serde.serdeConstants
    +import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
    +
    +import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, 
CatalogStorageFormat, CatalogTable, CatalogTableType}
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.parser.ng._
    +import org.apache.spark.sql.catalyst.parser.ng.SqlBaseParser._
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.SparkSqlAstBuilder
    +import org.apache.spark.sql.hive.{CreateTableAsSelect => CTAS, 
CreateViewAsSelect => CreateView}
    +import org.apache.spark.sql.hive.{HiveGenericUDTF, HiveSerDe}
    +import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
    +
    +/**
    + * Concrete parser for HiveQl statements.
    + */
    +object HiveSqlParser extends AbstractSqlParser {
    +  val astBuilder = new HiveSqlAstBuilder
    +
    +  override protected def nativeCommand(sqlText: String): LogicalPlan = {
    +    HiveNativeCommand(sqlText)
    +  }
    +}
    +
    +/**
    + * Builder that converts an ANTLR ParseTree into a 
LogicalPlan/Expression/TableIdentifier.
    + */
    +class HiveSqlAstBuilder extends SparkSqlAstBuilder {
    +  import ParserUtils._
    +
    +  /**
    +   * Get the current Hive Configuration.
    +   */
    +  private[this] def hiveConf: HiveConf = {
    +    var ss = SessionState.get()
    +    // SessionState is lazy initialization, it can be null here
    +    if (ss == null) {
    +      val original = Thread.currentThread().getContextClassLoader
    +      val conf = new HiveConf(classOf[SessionState])
    +      conf.setClassLoader(original)
    +      ss = new SessionState(conf)
    +      SessionState.start(ss)
    +    }
    +    ss.getConf
    +  }
    +
    +  /**
    +   * Pass a command to Hive using a [[HiveNativeCommand]].
    +   */
    +  override def visitExecuteNativeCommand(
    +      ctx: ExecuteNativeCommandContext): LogicalPlan = withOrigin(ctx) {
    +    HiveNativeCommand(command(ctx))
    +  }
    +
    +  /**
    +   * Fail an unsupported Hive native command.
    +   */
    +  override def visitFailNativeCommand(
    +      ctx: FailNativeCommandContext): LogicalPlan = withOrigin(ctx) {
    +    val keywords = if (ctx.kws != null) {
    +      Seq(ctx.kws.kw1, ctx.kws.kw2, ctx.kws.kw3).filter(_ != 
null).map(_.getText).mkString(" ")
    +    } else {
    +      // SET ROLE is the exception to the rule, because we handle this 
before other SET commands.
    +      "SET ROLE"
    +    }
    +    throw new ParseException(s"Unsupported operation: $keywords", ctx)
    +  }
    +
    +  /**
    +   * Create an [[AddJar]] or [[AddFile]] command depending on the 
requested resource.
    +   */
    +  override def visitAddResource(ctx: AddResourceContext): LogicalPlan = 
withOrigin(ctx) {
    +    ctx.identifier.getText.toLowerCase match {
    +      case "file" => AddFile(remainder(ctx.identifier).trim)
    +      case "jar" => AddJar(remainder(ctx.identifier).trim)
    +      case other => throw new ParseException(s"Unsupported resource type 
'$other'.", ctx)
    +    }
    +  }
    +
    +  /**
    +   * Create a [[DropTable]] command.
    +   */
    +  override def visitDropTable(ctx: DropTableContext): LogicalPlan = 
withOrigin(ctx) {
    +    if (ctx.PURGE != null) {
    +      logWarning("PURGE option is ignored.")
    +    }
    +    if (ctx.REPLICATION != null) {
    +      logWarning("REPLICATION clause is ignored.")
    +    }
    +    DropTable(visitTableIdentifier(ctx.tableIdentifier).toString, 
ctx.EXISTS != null)
    +  }
    +
    +  /**
    +   * Create an [[AnalyzeTable]] command. This currently only implements 
the NOSCAN option (other
    +   * options are passed on to Hive) e.g.:
    +   * {{{
    +   *   ANALYZE TABLE table COMPUTE STATISTICS NOSCAN;
    +   * }}}
    +   */
    +  override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = 
withOrigin(ctx) {
    +    if (ctx.partitionSpec == null &&
    +      ctx.identifier != null &&
    +      ctx.identifier.getText.toLowerCase == "noscan") {
    +      AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
    +    } else {
    +      HiveNativeCommand(command(ctx))
    +    }
    +  }
    +
    +  /**
    +   * Create a [[CreateTableAsSelect]] command.
    +   */
    +  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
    +    if (ctx.query == null) {
    +      HiveNativeCommand(command(ctx))
    +    } else {
    +      // Get the table header.
    +      val (table, temp, ifNotExists, external) = 
visitCreateTableHeader(ctx.createTableHeader)
    +      val tableType = if (external) {
    +        CatalogTableType.EXTERNAL_TABLE
    +      } else {
    +        CatalogTableType.MANAGED_TABLE
    +      }
    +
    +      // Unsupported clauses.
    +      if (temp) {
    +        logWarning("TEMPORARY clause is ignored.")
    +      }
    +      if (ctx.bucketSpec != null) {
    +        // TODO add this - we need cluster columns in the CatalogTable for 
this to work.
    +        logWarning("CLUSTERED BY ... [ORDERED BY ...] INTO ... BUCKETS 
clause is ignored.")
    +      }
    +      if (ctx.skewSpec != null) {
    +        logWarning("SKEWED BY ... ON ... [STORED AS DIRECTORIES] clause is 
ignored.")
    +      }
    +
    +      // Create the schema.
    +      val schema = 
Option(ctx.colTypeList).toSeq.flatMap(_.colType.asScala).map { col =>
    +        CatalogColumn(
    +          col.identifier.getText,
    +          col.dataType.getText.toLowerCase, // TODO validate this?
    +          nullable = true,
    +          Option(col.STRING).map(string))
    +      }
    +
    +      // Get the column by which the table is partitioned.
    +      val partitionCols = 
Option(ctx.identifierList).toSeq.flatMap(visitIdentifierList).map {
    +        CatalogColumn(_, null, nullable = true, None)
    +      }
    +
    +      // Create the storage.
    +      def format(fmt: ParserRuleContext): CatalogStorageFormat = {
    +        
Option(fmt).map(typedVisit[CatalogStorageFormat]).getOrElse(EmptyStorageFormat)
    +      }
    +      // Default storage.
    +      val defaultStorageType = 
hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT)
    +      val hiveSerDe = HiveSerDe.sourceToSerDe(defaultStorageType, 
hiveConf).getOrElse {
    +        HiveSerDe(
    +          inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
    +          outputFormat = 
Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
    +      }
    +      // Defined storage.
    +      val fileStorage = format(ctx.createFileFormat)
    +      val rowStorage = format(ctx.rowFormat)
    +      val storage = CatalogStorageFormat(
    +        Option(ctx.locationSpec).map(visitLocationSpec),
    +        fileStorage.inputFormat.orElse(hiveSerDe.inputFormat),
    +        fileStorage.outputFormat.orElse(hiveSerDe.outputFormat),
    +        rowStorage.serde.orElse(hiveSerDe.serde).orElse(fileStorage.serde),
    +        rowStorage.serdeProperties ++ fileStorage.serdeProperties
    +      )
    +
    +      val tableDesc = CatalogTable(
    +        identifier = table,
    +        tableType = tableType,
    +        schema = schema,
    +        partitionColumns = partitionCols,
    +        storage = storage,
    +        properties = 
Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty),
    +        // TODO support the sql text - have a proper location for this!
    +        viewText = Option(ctx.STRING).map(string))
    +      CTAS(tableDesc, plan(ctx.query), ifNotExists)
    +    }
    +  }
    +
    +  /**
    +   * Create or replace a view. This creates a [[CreateViewAsSelect]] 
command.
    +   */
    +  override def visitCreateView(ctx: CreateViewContext): LogicalPlan = 
withOrigin(ctx) {
    +    // Pass a partitioned view on to hive.
    +    if (ctx.identifierList != null) {
    +      HiveNativeCommand(command(ctx))
    +    } else {
    +      if (ctx.STRING != null) {
    +        logWarning("COMMENT clause is ignored.")
    +      }
    +      val identifiers = 
Option(ctx.identifierCommentList).toSeq.flatMap(_.identifierComment.asScala)
    +      val schema = identifiers.map { ic =>
    +        CatalogColumn(ic.identifier.getText, null, nullable = true, 
Option(ic.STRING).map(string))
    +      }
    +      createView(
    +        ctx,
    +        ctx.tableIdentifier,
    +        schema,
    +        ctx.query,
    +        
Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty),
    +        ctx.EXISTS != null,
    +        ctx.REPLACE != null
    +      )
    +    }
    +  }
    +
    +  /**
    +   * Alter the query of a view. This creates a [[CreateViewAsSelect]] 
command.
    +   */
    +  override def visitAlterViewQuery(ctx: AlterViewQueryContext): 
LogicalPlan = withOrigin(ctx) {
    +    createView(
    +      ctx,
    +      ctx.tableIdentifier,
    +      Seq.empty,
    +      ctx.query,
    +      Map.empty,
    +      allowExist = false,
    +      replace = true)
    +  }
    +
    +  /**
    +   * Create a [[CreateViewAsSelect]] command.
    +   */
    +  private def createView(
    +      ctx: ParserRuleContext,
    +      name: TableIdentifierContext,
    +      schema: Seq[CatalogColumn],
    +      query: QueryContext,
    +      properties: Map[String, String],
    +      allowExist: Boolean,
    +      replace: Boolean): LogicalPlan = {
    +    val sql = Option(source(query))
    +    val tableDesc = CatalogTable(
    +      identifier = visitTableIdentifier(name),
    +      tableType = CatalogTableType.VIRTUAL_VIEW,
    +      schema = schema,
    +      storage = EmptyStorageFormat,
    +      properties = properties,
    +      viewOriginalText = sql,
    +      viewText = sql)
    +    CreateView(tableDesc, plan(query), allowExist, replace, command(ctx))
    --- End diff --
    
    NVM, `CreateViewAsSelect => CreateView`. I found it. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to