wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r315993569
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
##########
@@ -220,10 +220,20 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
  * data source.
  */
 class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
-  private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
-    val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
+  private def maybeWithTableStats(tableMeta: CatalogTable): CatalogTable = {
+    if (tableMeta.stats.isEmpty && sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+      val sizeInBytes = CommandUtils.getSizeInBytesFallBackToHdfs(sparkSession, tableMeta)
+      tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
+    } else {
+      tableMeta
+    }
+  }
+
+  private def readDataSourceTable(tableMeta: CatalogTable): LogicalPlan = {
+    val qualifiedTableName = QualifiedTableName(tableMeta.database, tableMeta.identifier.table)
     val catalog = sparkSession.sessionState.catalog
     catalog.getCachedPlan(qualifiedTableName, () => {
+      val table = maybeWithTableStats(tableMeta)
Review comment:
   Verify that it always uses the cached relation once it has been cached. Check out this [commit](https://github.com/apache/spark/pull/24715/commits/1fce50859b959cd4190f0da5dabf4addd187fb79):
```shell
git checkout 1fce50859b959cd4190f0da5dabf4addd187fb79
build/sbt clean package
export SPARK_PREPEND_CLASSES=true
./bin/spark-shell
```
```scala
import org.apache.spark.sql.catalyst.QualifiedTableName
import org.apache.spark.sql.execution.datasources.LogicalRelation
spark.sql("CREATE TABLE t3(id int, c2 int) USING parquet PARTITIONED BY(id)")
spark.sql("INSERT INTO TABLE t3 PARTITION(id=1) SELECT 2")
// default, fallBackToHdfs=false
spark.sql("EXPLAIN COST SELECT * FROM t3").show(false)
spark.sessionState.catalog.getCachedTable(QualifiedTableName(spark.sessionState.catalog.getCurrentDatabase, "t3")).asInstanceOf[LogicalRelation].catalogTable.get.stats
// enable fallBackToHdfs
spark.sql("set spark.sql.statistics.fallBackToHdfs=true")
spark.sql("EXPLAIN COST SELECT * FROM t3").show(false)
spark.sessionState.catalog.getCachedTable(QualifiedTableName(spark.sessionState.catalog.getCurrentDatabase, "t3")).asInstanceOf[LogicalRelation].catalogTable.get.stats
// Invalidate cached relations
spark.sessionState.catalog.invalidateAllCachedTables
spark.sql("EXPLAIN COST SELECT * FROM t3").show(false)
```
```scala
scala> import org.apache.spark.sql.catalyst.QualifiedTableName
import org.apache.spark.sql.catalyst.QualifiedTableName
scala> import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.LogicalRelation
scala>
scala> spark.sql("CREATE TABLE t3(id int, c2 int) USING parquet PARTITIONED
BY(id)")
res0: org.apache.spark.sql.DataFrame = []
scala> spark.sql("INSERT INTO TABLE t3 PARTITION(id=1) SELECT 2")
res1: org.apache.spark.sql.DataFrame = []
scala>
scala> // default, fallBackToHdfs=false
scala> spark.sql("EXPLAIN COST SELECT * FROM t3").show(false)
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|plan                                                                                                                                                                                                                                             |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|== Optimized Logical Plan ==
Relation[c2#1,id#2] parquet, Statistics(sizeInBytes=8.0 EiB)
== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet default.t3[c2#1,id#2] Batched: true, DataFilters: [], Format: Parquet, Location: CatalogFileIndex[file:/root/opensource/SPARK-25474/spark-warehouse/t3], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c2:int>|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
scala> spark.sessionState.catalog.getCachedTable(QualifiedTableName(spark.sessionState.catalog.getCurrentDatabase, "t3")).asInstanceOf[LogicalRelation].catalogTable.get.stats
res3: Option[org.apache.spark.sql.catalyst.catalog.CatalogStatistics] = None
scala>
scala> // enable fallBackToHdfs
scala> spark.sql("set spark.sql.statistics.fallBackToHdfs=true")
res4: org.apache.spark.sql.DataFrame = [key: string, value: string]
scala> spark.sql("EXPLAIN COST SELECT * FROM t3").show(false)
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|plan                                                                                                                                                                                                                                             |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|== Optimized Logical Plan ==
Relation[c2#1,id#2] parquet, Statistics(sizeInBytes=8.0 EiB)
== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet default.t3[c2#1,id#2] Batched: true, DataFilters: [], Format: Parquet, Location: CatalogFileIndex[file:/root/opensource/SPARK-25474/spark-warehouse/t3], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c2:int>|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
scala> spark.sessionState.catalog.getCachedTable(QualifiedTableName(spark.sessionState.catalog.getCurrentDatabase, "t3")).asInstanceOf[LogicalRelation].catalogTable.get.stats
res6: Option[org.apache.spark.sql.catalyst.catalog.CatalogStatistics] = None
scala>
scala> // Invalidate cached relations
scala> spark.sessionState.catalog.invalidateAllCachedTables
scala> spark.sql("EXPLAIN COST SELECT * FROM t3").show(false)
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|plan                                                                                                                                                                                                                                             |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|== Optimized Logical Plan ==
Relation[c2#42,id#43] parquet, Statistics(sizeInBytes=421.0 B)
== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet default.t3[c2#42,id#43] Batched: true, DataFilters: [], Format: Parquet, Location: CatalogFileIndex[file:/root/opensource/SPARK-25474/spark-warehouse/t3], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c2:int>|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
```
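   As a side note on why the config flip is invisible above: `getCachedPlan` only runs its builder on a cache miss, so the stats decision is frozen at first access. Below is a minimal, self-contained sketch of that behavior (plain Scala, not the real Spark APIs; the `fallBackToHdfs` flag, the `getCachedStats` helper, and the 421-byte size are hypothetical stand-ins for the session config, `catalog.getCachedPlan`, and the HDFS-derived size):
```scala
import scala.collection.mutable

object CachedPlanSketch {
  // Hypothetical stand-ins for the session flag and the catalog's plan cache.
  var fallBackToHdfs: Boolean = false
  private val cache = mutable.Map.empty[String, Option[BigInt]]

  // Analogous to catalog.getCachedPlan(name, () => ...): the builder runs only
  // when the table is not already cached; later calls return the frozen entry.
  def getCachedStats(table: String)(build: => Option[BigInt]): Option[BigInt] =
    cache.getOrElseUpdate(table, build)

  def main(args: Array[String]): Unit = {
    def build: Option[BigInt] =
      if (fallBackToHdfs) Some(BigInt(421)) else None // 421 B, as in the transcript

    println(getCachedStats("t3")(build)) // None: built while fallBackToHdfs=false
    fallBackToHdfs = true
    println(getCachedStats("t3")(build)) // still None: the cached entry is reused
    cache.clear()                        // analogous to invalidateAllCachedTables
    println(getCachedStats("t3")(build)) // Some(421): rebuilt under the new config
  }
}
```
   This mirrors the transcript: `stats` stays `None` after `set spark.sql.statistics.fallBackToHdfs=true`, and the fallback size only appears once `invalidateAllCachedTables` forces a rebuild.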