wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r315993569
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ##########
 @@ -220,10 +220,20 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
   * data source.
   */
 class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
-  private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
-    val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
+  private def maybeWithTableStats(tableMeta: CatalogTable): CatalogTable = {
+    if (tableMeta.stats.isEmpty && sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+      val sizeInBytes = CommandUtils.getSizeInBytesFallBackToHdfs(sparkSession, tableMeta)
+      tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
+    } else {
+      tableMeta
+    }
+  }
+
+  private def readDataSourceTable(tableMeta: CatalogTable): LogicalPlan = {
+    val qualifiedTableName = QualifiedTableName(tableMeta.database, tableMeta.identifier.table)
     val catalog = sparkSession.sessionState.catalog
     catalog.getCachedPlan(qualifiedTableName, () => {
+      val table = maybeWithTableStats(tableMeta)
 
 Review comment:
   Verify that it always uses the cached relation once it has been cached. Check out this [commit](https://github.com/apache/spark/pull/24715/commits/1fce50859b959cd4190f0da5dabf4addd187fb79):
   ```shell
   git checkout 1fce50859b959cd4190f0da5dabf4addd187fb79
   build/sbt clean package
   export SPARK_PREPEND_CLASSES=true
   ./bin/spark-shell
   ```
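   
   (`SPARK_PREPEND_CLASSES=true` makes the relaunched shell pick up the freshly compiled classes ahead of the packaged jars, so the checked-out commit is what actually runs.) Then paste the following into the shell: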
   
   ```scala
   import org.apache.spark.sql.catalyst.QualifiedTableName
   import org.apache.spark.sql.execution.datasources.LogicalRelation
   
   spark.sql("CREATE TABLE t3(id int, c2 int) USING parquet PARTITIONED BY(id)")
   spark.sql("INSERT INTO TABLE t3 PARTITION(id=1) SELECT 2")
   
   // default, fallBackToHdfs=false
   spark.sql("EXPLAIN COST SELECT * FROM t3").show(false)
   
   spark.sessionState.catalog.getCachedTable(QualifiedTableName(spark.sessionState.catalog.getCurrentDatabase, "t3")).asInstanceOf[LogicalRelation].catalogTable.get.stats
   
   // enable fallBackToHdfs
   spark.sql("set spark.sql.statistics.fallBackToHdfs=true")
   spark.sql("EXPLAIN COST SELECT * FROM t3").show(false)
   
   spark.sessionState.catalog.getCachedTable(QualifiedTableName(spark.sessionState.catalog.getCurrentDatabase, "t3")).asInstanceOf[LogicalRelation].catalogTable.get.stats
   
   // Invalidate cached relations
   spark.sessionState.catalog.invalidateAllCachedTables
   spark.sql("EXPLAIN COST SELECT * FROM t3").show(false)
   ```
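   
   As a narrower alternative to `invalidateAllCachedTables`, a small sketch using the public `Catalog` API (assuming the same `t3` table as above): `spark.catalog.refreshTable` drops only this table's cached plan, so the next query rebuilds the relation and, with `fallBackToHdfs` enabled, recomputes `sizeInBytes` from the files:
   
   ```scala
   // Refresh only t3 instead of invalidating every cached table.
   // The next EXPLAIN rebuilds the plan under the current config.
   spark.catalog.refreshTable("t3")
   spark.sql("EXPLAIN COST SELECT * FROM t3").show(false)
   ```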
   
   ```scala
   scala> import org.apache.spark.sql.catalyst.QualifiedTableName
   import org.apache.spark.sql.catalyst.QualifiedTableName
   
   scala> import org.apache.spark.sql.execution.datasources.LogicalRelation
   import org.apache.spark.sql.execution.datasources.LogicalRelation
   
   scala>
   
   scala> spark.sql("CREATE TABLE t3(id int, c2 int) USING parquet PARTITIONED 
BY(id)")
   res0: org.apache.spark.sql.DataFrame = []
   
   scala> spark.sql("INSERT INTO TABLE t3 PARTITION(id=1) SELECT 2")
   res1: org.apache.spark.sql.DataFrame = []
   
   scala>
   
   scala> // default, fallBackToHdfs=false
   
   scala> spark.sql("EXPLAIN COST SELECT * FROM t3").show(false)
   
   +------------------------------------------------------------+
   |plan                                                        |
   +------------------------------------------------------------+
   |== Optimized Logical Plan ==
   Relation[c2#1,id#2] parquet, Statistics(sizeInBytes=8.0 EiB)
   
   == Physical Plan ==
   *(1) ColumnarToRow
   +- FileScan parquet default.t3[c2#1,id#2] Batched: true, DataFilters: [], Format: Parquet, Location: CatalogFileIndex[file:/root/opensource/SPARK-25474/spark-warehouse/t3], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c2:int>
   |
   +------------------------------------------------------------+
   
   
   scala> spark.sessionState.catalog.getCachedTable(QualifiedTableName(spark.sessionState.catalog.getCurrentDatabase, "t3")).asInstanceOf[LogicalRelation].catalogTable.get.stats
   res3: Option[org.apache.spark.sql.catalyst.catalog.CatalogStatistics] = None
   
   scala>
   
   scala> // enable fallBackToHdfs
   
   scala> spark.sql("set spark.sql.statistics.fallBackToHdfs=true")
   res4: org.apache.spark.sql.DataFrame = [key: string, value: string]
   
   scala> spark.sql("EXPLAIN COST SELECT * FROM t3").show(false)
   
   +------------------------------------------------------------+
   |plan                                                        |
   +------------------------------------------------------------+
   |== Optimized Logical Plan ==
   Relation[c2#1,id#2] parquet, Statistics(sizeInBytes=8.0 EiB)
   
   == Physical Plan ==
   *(1) ColumnarToRow
   +- FileScan parquet default.t3[c2#1,id#2] Batched: true, DataFilters: [], Format: Parquet, Location: CatalogFileIndex[file:/root/opensource/SPARK-25474/spark-warehouse/t3], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c2:int>
   |
   +------------------------------------------------------------+
   
   
   scala> spark.sessionState.catalog.getCachedTable(QualifiedTableName(spark.sessionState.catalog.getCurrentDatabase, "t3")).asInstanceOf[LogicalRelation].catalogTable.get.stats
   res6: Option[org.apache.spark.sql.catalyst.catalog.CatalogStatistics] = None
   
   scala>
   
   scala> // Invalidate cached relations
   
   scala> spark.sessionState.catalog.invalidateAllCachedTables
   
   scala> spark.sql("EXPLAIN COST SELECT * FROM t3").show(false)
   
   +------------------------------------------------------------+
   |plan                                                        |
   +------------------------------------------------------------+
   |== Optimized Logical Plan ==
   Relation[c2#42,id#43] parquet, Statistics(sizeInBytes=421.0 B)
   
   == Physical Plan ==
   *(1) ColumnarToRow
   +- FileScan parquet default.t3[c2#42,id#43] Batched: true, DataFilters: [], Format: Parquet, Location: CatalogFileIndex[file:/root/opensource/SPARK-25474/spark-warehouse/t3], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c2:int>
   |
   +------------------------------------------------------------+
   ```
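   
   The transcript confirms the point: after enabling `spark.sql.statistics.fallBackToHdfs`, the cached relation is returned unchanged (`stats` stays `None` and `sizeInBytes` stays at the 8.0 EiB default), and only after `invalidateAllCachedTables` does the rebuilt relation report the real 421.0 B. A minimal standalone sketch of that memoization behavior (illustrative only, not the actual `SessionCatalog` internals; `TableKey`, `Plan`, and `PlanCacheSketch` are made-up names):
   
   ```scala
   import java.util.concurrent.Callable
   import scala.collection.concurrent.TrieMap
   
   // Made-up stand-ins for illustration; not Spark classes.
   case class TableKey(db: String, table: String)
   case class Plan(sizeInBytes: BigInt)
   
   class PlanCacheSketch {
     private val cache = TrieMap.empty[TableKey, Plan]
   
     // Like SessionCatalog.getCachedPlan: the builder (where
     // maybeWithTableStats would run) executes only on a cache miss,
     // so a plan built before fallBackToHdfs was enabled is reused as-is.
     def getCachedPlan(key: TableKey, build: Callable[Plan]): Plan =
       cache.getOrElseUpdate(key, build.call())
   
     // Like invalidateAllCachedTables: the next lookup rebuilds the plan.
     def invalidateAll(): Unit = cache.clear()
   }
   ```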
