wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r316570564
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
##########
@@ -650,4 +652,129 @@ class StatisticsCollectionSuite extends
StatisticsCollectionTestBase with Shared
}
}
}
+
+ test("Non-partitioned data source table support fallback to HDFS for size
estimation") {
+ withTempDir { dir =>
+ Seq(false, true).foreach { fallBackToHDFS =>
+ withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key ->
s"$fallBackToHDFS") {
+ withTable("spark_25474") {
+ sql(s"CREATE TABLE spark_25474 (c1 BIGINT) USING PARQUET LOCATION
'${dir.toURI}'")
+
spark.range(5).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath)
+
+ assert(getCatalogTable("spark_25474").stats.isEmpty)
+ val relation =
spark.table("spark_25474").queryExecution.analyzed.children.head
+ // Table statistics are always recalculated by FileIndex
+ assert(relation.stats.sizeInBytes === getDataSize(dir))
+ }
+ }
+ }
+ }
+ }
+
+ test("Partitioned data source table support fallback to HDFS for size
estimation") {
+ Seq(false, true).foreach { fallBackToHDFS =>
+ withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key ->
s"$fallBackToHDFS") {
+ withTempDir { dir =>
+ withTable("spark_25474") {
+ sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+ s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+ sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+ assert(getCatalogTable("spark_25474").stats.isEmpty)
+ val relation =
spark.table("spark_25474").queryExecution.analyzed.children.head
+ if (fallBackToHDFS) {
+ assert(relation.stats.sizeInBytes ===
+ CommandUtils.getSizeInBytesFallBackToHdfs(spark,
getCatalogTable("spark_25474")))
+ } else {
+ assert(relation.stats.sizeInBytes === conf.defaultSizeInBytes)
+ }
+ }
+ }
+ }
+ }
+ }
+
+ test("Partitioned data source table support fallback to HDFS for size
estimation" +
+ "with defaultSizeInBytes") {
+ val defaultSizeInBytes = 10 * 1024 * 1024
+ Seq(false, true).foreach { fallBackToHDFS =>
+ withSQLConf(
+ SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS",
+ SQLConf.DEFAULT_SIZE_IN_BYTES.key -> s"$defaultSizeInBytes") {
+ withTempDir { dir =>
+ withTable("spark_25474") {
+ sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+ s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+ sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+ assert(getCatalogTable("spark_25474").stats.isEmpty)
+ val relation =
spark.table("spark_25474").queryExecution.analyzed.children.head
+ if (fallBackToHDFS) {
+ assert(relation.stats.sizeInBytes ===
+ CommandUtils.getSizeInBytesFallBackToHdfs(spark,
getCatalogTable("spark_25474")))
+ } else {
+ assert(relation.stats.sizeInBytes === defaultSizeInBytes)
+ }
+ }
+ }
+ }
+ }
+ }
+
+ test("Partitioned data source table stats should be cached") {
+ Seq(false, true).foreach { fallBackToHDFS =>
+ withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key ->
s"$fallBackToHDFS") {
+ withTempDir { dir =>
+ withTable("spark_25474") {
+ sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+ s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+ sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+ assert(getCatalogTable("spark_25474").stats.isEmpty)
+ val relation =
spark.table("spark_25474").queryExecution.analyzed.children.head
+ if (fallBackToHDFS) {
+ val dataSize =
+ CommandUtils.getSizeInBytesFallBackToHdfs(spark,
getCatalogTable("spark_25474"))
+ assert(relation.stats.sizeInBytes === dataSize)
+
+ val qualifiedTableName =
+
QualifiedTableName(spark.sessionState.catalog.getCurrentDatabase, "spark_25474")
+ val logicalRelation =
spark.sessionState.catalog.getCachedTable(qualifiedTableName)
+ .asInstanceOf[LogicalRelation]
+ assert(logicalRelation.catalogTable.get.stats.get.sizeInBytes
=== dataSize)
+ } else {
+ assert(relation.stats.sizeInBytes === conf.defaultSizeInBytes)
+ }
+ }
+ }
+ }
+ }
+ }
+
+ test("External partitioned data source table does not support fallback to
HDFS " +
+ "for size estimation") {
+ Seq(true).foreach { fallBackToHDFS =>
+ withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key ->
s"$fallBackToHDFS") {
+ withTempDir { dir =>
+ withTable("spark_25474") {
+ sql("CREATE TABLE spark_25474(a bigint, b bigint) USING parquet " +
+ s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+
+ withTempDir { partitionDir =>
+
spark.range(5).write.mode(SaveMode.Overwrite).parquet(partitionDir.getCanonicalPath)
+ sql(s"ALTER TABLE spark_25474 ADD PARTITION (a=1) LOCATION
'$partitionDir'")
+ assert(getCatalogTable("spark_25474").stats.isEmpty)
+ val relation =
spark.table("spark_25474").queryExecution.analyzed.children.head
+ assert(spark.table("spark_25474").count() === 5)
+ if (fallBackToHDFS) {
+ assert(relation.stats.sizeInBytes === 0)
Review comment:
@dongjoon-hyun It is expensive to support partitioned table with external
partitions. Please see [this test
case](https://github.com/apache/spark/pull/24715/files#diff-8c27508821958acbe016862c9ab2f25fR770).
Its data size is incorrect.
Related discussion:
https://github.com/apache/spark/pull/24715#discussion_r316143787
https://github.com/apache/spark/pull/24715#issuecomment-522414331
So we plan do not fallback to HDFS size for partitioned tables.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]