wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r316534207
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
##########
@@ -650,4 +652,129 @@ class StatisticsCollectionSuite extends
StatisticsCollectionTestBase with Shared
}
}
}
+
+ test("Non-partitioned data source table support fallback to HDFS for size
estimation") {
+ withTempDir { dir =>
+ Seq(false, true).foreach { fallBackToHDFS =>
+ withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key ->
s"$fallBackToHDFS") {
+ withTable("spark_25474") {
+ sql(s"CREATE TABLE spark_25474 (c1 BIGINT) USING PARQUET LOCATION
'${dir.toURI}'")
+
spark.range(5).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath)
+
+ assert(getCatalogTable("spark_25474").stats.isEmpty)
+ val relation =
spark.table("spark_25474").queryExecution.analyzed.children.head
+ // Table statistics are always recalculated by FileIndex
+ assert(relation.stats.sizeInBytes === getDataSize(dir))
+ }
+ }
+ }
+ }
+ }
+
+ test("Partitioned data source table support fallback to HDFS for size
estimation") {
+ Seq(false, true).foreach { fallBackToHDFS =>
+ withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key ->
s"$fallBackToHDFS") {
+ withTempDir { dir =>
+ withTable("spark_25474") {
+ sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+ s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+ sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+ assert(getCatalogTable("spark_25474").stats.isEmpty)
+ val relation =
spark.table("spark_25474").queryExecution.analyzed.children.head
+ if (fallBackToHDFS) {
+ assert(relation.stats.sizeInBytes ===
+ CommandUtils.getSizeInBytesFallBackToHdfs(spark,
getCatalogTable("spark_25474")))
+ } else {
+ assert(relation.stats.sizeInBytes === conf.defaultSizeInBytes)
+ }
+ }
+ }
+ }
+ }
+ }
+
+ test("Partitioned data source table support fallback to HDFS for size
estimation" +
+ "with defaultSizeInBytes") {
+ val defaultSizeInBytes = 10 * 1024 * 1024
+ Seq(false, true).foreach { fallBackToHDFS =>
+ withSQLConf(
+ SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS",
+ SQLConf.DEFAULT_SIZE_IN_BYTES.key -> s"$defaultSizeInBytes") {
+ withTempDir { dir =>
+ withTable("spark_25474") {
+ sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+ s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+ sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+ assert(getCatalogTable("spark_25474").stats.isEmpty)
+ val relation =
spark.table("spark_25474").queryExecution.analyzed.children.head
+ if (fallBackToHDFS) {
+ assert(relation.stats.sizeInBytes ===
+ CommandUtils.getSizeInBytesFallBackToHdfs(spark,
getCatalogTable("spark_25474")))
+ } else {
+ assert(relation.stats.sizeInBytes === defaultSizeInBytes)
+ }
+ }
+ }
+ }
+ }
+ }
+
+ test("Partitioned data source table stats should be cached") {
+ Seq(false, true).foreach { fallBackToHDFS =>
+ withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key ->
s"$fallBackToHDFS") {
+ withTempDir { dir =>
+ withTable("spark_25474") {
+ sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+ s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+ sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+ assert(getCatalogTable("spark_25474").stats.isEmpty)
+ val relation =
spark.table("spark_25474").queryExecution.analyzed.children.head
+ if (fallBackToHDFS) {
+ val dataSize =
+ CommandUtils.getSizeInBytesFallBackToHdfs(spark,
getCatalogTable("spark_25474"))
+ assert(relation.stats.sizeInBytes === dataSize)
+
+ val qualifiedTableName =
+
QualifiedTableName(spark.sessionState.catalog.getCurrentDatabase, "spark_25474")
+ val logicalRelation =
spark.sessionState.catalog.getCachedTable(qualifiedTableName)
+ .asInstanceOf[LogicalRelation]
+ assert(logicalRelation.catalogTable.get.stats.get.sizeInBytes
=== dataSize)
+ } else {
+ assert(relation.stats.sizeInBytes === conf.defaultSizeInBytes)
+ }
+ }
+ }
+ }
+ }
+ }
+
+ test("External partitioned data source table does not support fallback to
HDFS " +
+ "for size estimation") {
+ Seq(true).foreach { fallBackToHDFS =>
+ withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key ->
s"$fallBackToHDFS") {
+ withTempDir { dir =>
+ withTable("spark_25474") {
+ sql("CREATE TABLE spark_25474(a bigint, b bigint) USING parquet " +
+ s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+
+ withTempDir { partitionDir =>
+
spark.range(5).write.mode(SaveMode.Overwrite).parquet(partitionDir.getCanonicalPath)
+ sql(s"ALTER TABLE spark_25474 ADD PARTITION (a=1) LOCATION
'$partitionDir'")
+ assert(getCatalogTable("spark_25474").stats.isEmpty)
+ val relation =
spark.table("spark_25474").queryExecution.analyzed.children.head
+ assert(spark.table("spark_25474").count() === 5)
+ if (fallBackToHDFS) {
+ assert(relation.stats.sizeInBytes === 0)
Review comment:
Yes. A non-partitioned data source table will still get the correct statistics
even when table statistics are not available.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]