GitHub user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16621#discussion_r96577427
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ---
    @@ -215,37 +215,43 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
     
     
     /**
    - * Replaces [[SimpleCatalogRelation]] with data source table if its table property contains data
    - * source information.
    + * Replaces [[SimpleCatalogRelation]] with a data source table if its table provider is not hive.
      */
     class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
    -  private def readDataSourceTable(
    -      sparkSession: SparkSession,
    -      simpleCatalogRelation: SimpleCatalogRelation): LogicalPlan = {
    -    val table = simpleCatalogRelation.catalogTable
    -    val pathOption = table.storage.locationUri.map("path" -> _)
    -    val dataSource =
    -      DataSource(
    -        sparkSession,
    -        userSpecifiedSchema = Some(table.schema),
    -        partitionColumns = table.partitionColumnNames,
    -        bucketSpec = table.bucketSpec,
    -        className = table.provider.get,
    -        options = table.storage.properties ++ pathOption)
    -
    -    LogicalRelation(
    -      dataSource.resolveRelation(),
    -      expectedOutputAttributes = Some(simpleCatalogRelation.output),
    -      catalogTable = Some(table))
    +  private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
    +    val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
    +    val cache = sparkSession.sessionState.catalog.tableRelationCache
    +    val withHiveSupport =
    +      sparkSession.sparkContext.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive"
    +
    +    cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
    +      override def call(): LogicalPlan = {
    +        val pathOption = table.storage.locationUri.map("path" -> _)
    +        val dataSource =
    +          DataSource(
    +            sparkSession,
    +            // In older versions of Spark (prior to 2.1), the table schema can be empty and
    +            // should be inferred at runtime. We should still support it.
    +            userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
    +            partitionColumns = table.partitionColumnNames,
    +            bucketSpec = table.bucketSpec,
    +            className = table.provider.get,
    +            options = table.storage.properties ++ pathOption,
    +            // TODO: improve `InMemoryCatalog` and remove this limitation.
    +            catalogTable = if (withHiveSupport) Some(table) else None)
    +
    +        LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table))
    --- End diff --
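
    An aside on the `cache.get(key, callable)` call in the diff: this is Guava's get-with-loader pattern, which is the API that `tableRelationCache` exposes. Below is a minimal, hypothetical sketch of the pattern, assuming Guava on the classpath and using a plain `String` to stand in for the resolved `LogicalPlan`:

    ```scala
    import java.util.concurrent.Callable

    import com.google.common.cache.{Cache, CacheBuilder}

    // Hypothetical stand-in for Spark's QualifiedTableName key type.
    case class QualifiedTableName(database: String, table: String)

    object RelationCacheSketch {
      // Bounded cache keyed by qualified table name, mirroring the shape of
      // SessionCatalog.tableRelationCache used in the diff above.
      private val cache: Cache[QualifiedTableName, String] =
        CacheBuilder.newBuilder().maximumSize(1000).build[QualifiedTableName, String]()

      def lookup(key: QualifiedTableName): String = {
        // The Callable runs only on a cache miss, so the expensive step
        // (DataSource.resolveRelation() in the real code) happens at most
        // once per table until the entry is invalidated.
        cache.get(key, new Callable[String] {
          override def call(): String =
            s"resolved relation for ${key.database}.${key.table}"
        })
      }
    }
    ```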
    
    Note that previously we would set `expectedOutputAttributes` here, which was added by https://github.com/apache/spark/pull/15182.
    
    However, this doesn't work when the table schema needs to be inferred at runtime, and it turns out we don't need to do it at all. `AnalyzeColumnCommand` now gets attributes from the [resolved table relation plan](https://github.com/apache/spark/pull/16621/files#diff-027d6bd7c8cf4f64f99acc058389d859R44), so it's fine for the `FindDataSourceTable` rule to change its outputs during analysis.
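
    A rough sketch of that by-name lookup (hypothetical names; assumes a local session with a registered table `some_table`; not the exact `AnalyzeColumnCommand` code):

    ```scala
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()

    // Resolve columns by name against the freshly analyzed relation, rather
    // than pinning the old attributes via expectedOutputAttributes. Any new
    // exprIds produced by FindDataSourceTable are picked up automatically.
    val relation = spark.table("some_table").queryExecution.analyzed
    val attrs = Seq("a", "b").map { name =>
      relation.output.find(_.name == name).getOrElse(
        sys.error(s"Column $name does not exist."))
    }
    ```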

