Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16233#discussion_r93898349
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
    @@ -510,32 +510,94 @@ class Analyzer(
        * Replaces [[UnresolvedRelation]]s with concrete relations from the 
catalog.
        */
       object ResolveRelations extends Rule[LogicalPlan] {
    -    private def lookupTableFromCatalog(u: UnresolvedRelation): LogicalPlan 
= {
    +
    +    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    +      case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) 
if child.resolved =>
    +        i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
    +      case u: UnresolvedRelation => resolveRelation(u)
    +    }
    +
    +    // If the unresolved relation is running directly on files, we just 
return the original
    +    // UnresolvedRelation, the plan will get resolved later. Else we look 
up the table from catalog
    +    // and change the default database name if it is a view.
    +    //
    +    // Note this is compatible with the views defined by older versions of 
Spark(before 2.2), which
    +    // have empty defaultDatabase and all the relations in viewText have 
database part defined.
    +    def resolveRelation(
    +        plan: LogicalPlan,
    +        defaultDatabase: Option[String] = None): LogicalPlan = plan match {
    +      case u @ UnresolvedRelation(table: TableIdentifier, _) if 
isRunningDirectlyOnFiles(table) =>
    +        u
    +      case u: UnresolvedRelation =>
    +        resolveView(lookupTableFromCatalog(u, defaultDatabase))
    +    }
    +
    +    // Look up the table with the given name from catalog. If 
`defaultDatabase` is set, we look up
    +    // the table in the database `defaultDatabase`, else we follow the 
default way.
    +    private def lookupTableFromCatalog(
    +        u: UnresolvedRelation,
    +        defaultDatabase: Option[String] = None): LogicalPlan = {
           try {
    -        catalog.lookupRelation(u.tableIdentifier, u.alias)
    +        catalog.lookupRelation(u.tableIdentifier, u.alias, defaultDatabase)
           } catch {
             case _: NoSuchTableException =>
               u.failAnalysis(s"Table or view not found: ${u.tableName}")
           }
         }
     
    -    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    -      case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) 
if child.resolved =>
    -        i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
    -      case u: UnresolvedRelation =>
    -        val table = u.tableIdentifier
    -        if (table.database.isDefined && conf.runSQLonFile && 
!catalog.isTemporaryTable(table) &&
    -            (!catalog.databaseExists(table.database.get) || 
!catalog.tableExists(table))) {
    -          // If the database part is specified, and we support running SQL 
directly on files, and
    -          // it's not a temporary view, and the table does not exist, then 
let's just return the
    -          // original UnresolvedRelation. It is possible we are matching a 
query like "select *
    -          // from parquet.`/path/to/query`". The plan will get resolved 
later.
    -          // Note that we are testing (!db_exists || !table_exists) 
because the catalog throws
    -          // an exception from tableExists if the database does not exist.
    -          u
    -        } else {
    -          lookupTableFromCatalog(u)
    +    // If the database part is specified, and we support running SQL 
directly on files, and
    +    // it's not a temporary view, and the table does not exist, then let's 
just return the
    +    // original UnresolvedRelation. It is possible we are matching a query 
like "select *
    +    // from parquet.`/path/to/query`". The plan will get resolved later.
    +    // Note that we are testing (!db_exists || !table_exists) because the 
catalog throws
    +    // an exception from tableExists if the database does not exist.
    +    private def isRunningDirectlyOnFiles(table: TableIdentifier): Boolean 
= {
    +      table.database.isDefined && conf.runSQLonFile && 
!catalog.isTemporaryTable(table) &&
    +        (!catalog.databaseExists(table.database.get) || 
!catalog.tableExists(table))
    +    }
    +
    +    // Change the default database name if the plan is a view, and 
transformDown with the new
    +    // database name to resolve all UnresolvedRelations and Views.
    +    // If the view is defined in a DataSource other than Hive, and the 
view's child is empty,
    +    // set the view's child to a SimpleCatalogRelation, else throw an 
AnalysisException.
    +    def resolveView(plan: LogicalPlan): LogicalPlan = plan match {
    +      case view: View =>
    +        val desc = view.desc
    +        val defaultDatabase = desc.viewDefaultDatabase
    +        val unresolvedChild = view.child.getOrElse {
    +          if (isDatasourceTable(desc)) {
    +            SimpleCatalogRelation(lookupDatabaseName(desc), desc)
    +          } else {
    +            throw new AnalysisException(s"The child of view 
'${desc.identifier}' is not defined.")
    +          }
             }
    +        // Resolve all the UnresolvedRelations and Views in the child.
    +        val newChild = unresolvedChild transform {
    +          case v: View if !v.resolved =>
    +            resolveView(v)
    +          case u: UnresolvedRelation =>
    +            resolveRelation(u, defaultDatabase)
    +        }
    +        view.copy(child = Some(newChild))
    +      case p @ SubqueryAlias(_, view: View, _) =>
    +        val newChild = resolveView(view)
    +        p.copy(child = newChild)
    +      case _ => plan
    +    }
    +
    +    // If the database part is defined in the table identifier of the view 
description, return that
    +    // database name, else first attempt to return the view default 
database name of the view
    +    // desc, if that does not exist, return the current database of the 
catalog.
    +    private def lookupDatabaseName(desc: CatalogTable): String = {
    --- End diff --
    
    Since `lookupDatabaseName` is used only once (in `resolveView`), should we inline this function?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to