Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/16621#discussion_r96577427
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
---
@@ -215,37 +215,43 @@ case class DataSourceAnalysis(conf: CatalystConf)
extends Rule[LogicalPlan] {
/**
- * Replaces [[SimpleCatalogRelation]] with data source table if its table
property contains data
- * source information.
+ * Replaces [[SimpleCatalogRelation]] with data source table if its table
provider is not hive.
*/
class FindDataSourceTable(sparkSession: SparkSession) extends
Rule[LogicalPlan] {
- private def readDataSourceTable(
- sparkSession: SparkSession,
- simpleCatalogRelation: SimpleCatalogRelation): LogicalPlan = {
- val table = simpleCatalogRelation.catalogTable
- val pathOption = table.storage.locationUri.map("path" -> _)
- val dataSource =
- DataSource(
- sparkSession,
- userSpecifiedSchema = Some(table.schema),
- partitionColumns = table.partitionColumnNames,
- bucketSpec = table.bucketSpec,
- className = table.provider.get,
- options = table.storage.properties ++ pathOption)
-
- LogicalRelation(
- dataSource.resolveRelation(),
- expectedOutputAttributes = Some(simpleCatalogRelation.output),
- catalogTable = Some(table))
+ private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
+ val qualifiedTableName = QualifiedTableName(table.database,
table.identifier.table)
+ val cache = sparkSession.sessionState.catalog.tableRelationCache
+ val withHiveSupport =
+
sparkSession.sparkContext.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) ==
"hive"
+
+ cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
+ override def call(): LogicalPlan = {
+ val pathOption = table.storage.locationUri.map("path" -> _)
+ val dataSource =
+ DataSource(
+ sparkSession,
+ // In older version(prior to 2.1) of Spark, the table schema
can be empty and should be
+ // inferred at runtime. We should still support it.
+ userSpecifiedSchema = if (table.schema.isEmpty) None else
Some(table.schema),
+ partitionColumns = table.partitionColumnNames,
+ bucketSpec = table.bucketSpec,
+ className = table.provider.get,
+ options = table.storage.properties ++ pathOption,
+ // TODO: improve `InMemoryCatalog` and remove this limitation.
+ catalogTable = if (withHiveSupport) Some(table) else None)
+
+ LogicalRelation(dataSource.resolveRelation(), catalogTable =
Some(table))
--- End diff --
Note that, previously we would set `expectedOutputAttributes` here, which
was added by https://github.com/apache/spark/pull/15182
However, this doesn't work when the table schema needs to be inferred at
runtime, and it turns out that we don't need to do it at all.
`AnalyzeColumnCommand` now gets attributes from the [resolved table relation
plan](https://github.com/apache/spark/pull/16621/files#diff-027d6bd7c8cf4f64f99acc058389d859R44)
, so it's fine for rule `FindDataSourceTable` to change outputs during
analysis.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]