wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r316534257
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
 ##########
 @@ -650,4 +652,129 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
       }
     }
   }
+
+  test("Non-partitioned data source table support fallback to HDFS for size 
estimation") {
+    withTempDir { dir =>
+      Seq(false, true).foreach { fallBackToHDFS =>
+          withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS") {
+          withTable("spark_25474") {
+            sql(s"CREATE TABLE spark_25474 (c1 BIGINT) USING PARQUET LOCATION 
'${dir.toURI}'")
+            spark.range(5).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath)
+
+            assert(getCatalogTable("spark_25474").stats.isEmpty)
+            val relation = spark.table("spark_25474").queryExecution.analyzed.children.head
+            // Table statistics are always recalculated by FileIndex
+            assert(relation.stats.sizeInBytes === getDataSize(dir))
+          }
+        }
+      }
+    }
+  }
+
+  test("Partitioned data source table support fallback to HDFS for size 
estimation") {
+    Seq(false, true).foreach { fallBackToHDFS =>
+      withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS") {
+        withTempDir { dir =>
+          withTable("spark_25474") {
+            sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+                s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+            sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+            assert(getCatalogTable("spark_25474").stats.isEmpty)
+            val relation = spark.table("spark_25474").queryExecution.analyzed.children.head
+            if (fallBackToHDFS) {
+              assert(relation.stats.sizeInBytes ===
+                CommandUtils.getSizeInBytesFallBackToHdfs(spark, getCatalogTable("spark_25474")))
+            } else {
+              assert(relation.stats.sizeInBytes === conf.defaultSizeInBytes)
+            }
+          }
+        }
+      }
+    }
+  }
+
+  test("Partitioned data source table support fallback to HDFS for size 
estimation" +
+    "with defaultSizeInBytes") {
+    val defaultSizeInBytes = 10 * 1024 * 1024
+    Seq(false, true).foreach { fallBackToHDFS =>
+      withSQLConf(
+        SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS",
+        SQLConf.DEFAULT_SIZE_IN_BYTES.key -> s"$defaultSizeInBytes") {
+        withTempDir { dir =>
+          withTable("spark_25474") {
+            sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+              s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+            sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+            assert(getCatalogTable("spark_25474").stats.isEmpty)
+            val relation = spark.table("spark_25474").queryExecution.analyzed.children.head
+            if (fallBackToHDFS) {
+              assert(relation.stats.sizeInBytes ===
+                CommandUtils.getSizeInBytesFallBackToHdfs(spark, getCatalogTable("spark_25474")))
+            } else {
+              assert(relation.stats.sizeInBytes === defaultSizeInBytes)
+            }
+          }
+        }
+      }
+    }
+  }
+
+  test("Partitioned data source table stats should be cached") {
+    Seq(false, true).foreach { fallBackToHDFS =>
+      withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS") {
+        withTempDir { dir =>
+          withTable("spark_25474") {
+            sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+              s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+            sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+            assert(getCatalogTable("spark_25474").stats.isEmpty)
+            val relation = spark.table("spark_25474").queryExecution.analyzed.children.head
+            if (fallBackToHDFS) {
+              val dataSize =
+                CommandUtils.getSizeInBytesFallBackToHdfs(spark, getCatalogTable("spark_25474"))
+              assert(relation.stats.sizeInBytes === dataSize)
+
+              val qualifiedTableName =
+                QualifiedTableName(spark.sessionState.catalog.getCurrentDatabase, "spark_25474")
+              val logicalRelation = spark.sessionState.catalog.getCachedTable(qualifiedTableName)
+                .asInstanceOf[LogicalRelation]
+              assert(logicalRelation.catalogTable.get.stats.get.sizeInBytes === dataSize)
+            } else {
+              assert(relation.stats.sizeInBytes === conf.defaultSizeInBytes)
+            }
+          }
+        }
+      }
+    }
+  }
+
+  test("External partitioned data source table does not support fallback to 
HDFS " +
+    "for size estimation") {
+    Seq(true).foreach { fallBackToHDFS =>
+      withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS") {
+        withTempDir { dir =>
+          withTable("spark_25474") {
+            sql("CREATE TABLE spark_25474(a bigint, b bigint) USING parquet " +
+              s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+
+            withTempDir { partitionDir =>
+              spark.range(5).write.mode(SaveMode.Overwrite).parquet(partitionDir.getCanonicalPath)
+              sql(s"ALTER TABLE spark_25474 ADD PARTITION (a=1) LOCATION 
'$partitionDir'")
+              assert(getCatalogTable("spark_25474").stats.isEmpty)
+              val relation = spark.table("spark_25474").queryExecution.analyzed.children.head
+              assert(spark.table("spark_25474").count() === 5)
+              if (fallBackToHDFS) {
+                assert(relation.stats.sizeInBytes === 0)
 
 Review comment:
   Could we just revert https://github.com/apache/spark/commit/485ae6d1818e8756a86da38d6aefc8f1dbde49c2?
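   For context: the fallback being tested estimates a table's size from the filesystem when the catalog carries no statistics. A minimal sketch of that idea, assuming only Hadoop's FileSystem API (sizeUnderLocation and its parameters are illustrative names, not Spark's actual implementation, which lives in CommandUtils.getSizeInBytesFallBackToHdfs):

       // Hypothetical sketch: estimate a table's size by totalling the bytes
       // stored under its location, which is what the HDFS fallback amounts to.
       import org.apache.hadoop.conf.Configuration
       import org.apache.hadoop.fs.Path

       def sizeUnderLocation(location: String, hadoopConf: Configuration): Long = {
         val path = new Path(location)
         val fs = path.getFileSystem(hadoopConf)
         // getContentSummary walks the directory tree and sums file lengths
         fs.getContentSummary(path).getLength
       }

   Comparing such a filesystem-derived size against relation.stats.sizeInBytes is essentially what the tests above do via CommandUtils.getSizeInBytesFallBackToHdfs.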
