[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r317194291

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

```diff
@@ -1216,8 +1216,12 @@ object SQLConf {
     .createWithDefault(true)

   val ENABLE_FALL_BACK_TO_HDFS_FOR_STATS = buildConf("spark.sql.statistics.fallBackToHdfs")
-    .doc("If the table statistics are not available from table metadata enable fall back to hdfs." +
-      " This is useful in determining if a table is small enough to use auto broadcast joins.")
+    .doc("This flag is effective only if it is Hive table. When true, it will fall back to HDFS " +
+      "if the table statistics are not available from table metadata. This is useful in " +
+      "determining if a table is small enough to use auto broadcast joins. " +
+      "For non-partitioned data source table, it will be automatically recalculated if table " +
+      "statistics are not available. For partitioned data source table, It is " +
+      s"'${DEFAULT_SIZE_IN_BYTES.key}' if table statistics are not available.")
     .booleanConf
```

Review comment:
cc @shahidki31
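For reviewers trying the new doc string locally, a minimal spark-shell sketch (with this PR's change applied; the table name `t1` is illustrative) that exercises the behavior it describes:

```scala
// Observe the planner-visible size of a partitioned data source table with and
// without spark.sql.statistics.fallBackToHdfs. Table name is illustrative.
spark.sql("CREATE TABLE t1(a INT, b INT) USING parquet PARTITIONED BY (a)")
spark.sql("INSERT INTO TABLE t1 PARTITION(a=1) SELECT 2")

// Without fallback, a partitioned data source table with no catalog stats is
// planned with spark.sql.defaultSizeInBytes (Long.MaxValue, shown as 8.0 EiB, by default).
spark.sql("SET spark.sql.statistics.fallBackToHdfs=false")
spark.sql("EXPLAIN COST SELECT * FROM t1").show(false)

// With fallback, the size is computed from the files under the table location.
spark.sql("SET spark.sql.statistics.fallBackToHdfs=true")
spark.sql("EXPLAIN COST SELECT * FROM t1").show(false)
```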
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r316570564

## File path: sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala

```diff
@@ -650,4 +652,129 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
     }
   }
 }
+
+  test("Non-partitioned data source table support fallback to HDFS for size estimation") {
+    withTempDir { dir =>
+      Seq(false, true).foreach { fallBackToHDFS =>
+        withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS") {
+          withTable("spark_25474") {
+            sql(s"CREATE TABLE spark_25474 (c1 BIGINT) USING PARQUET LOCATION '${dir.toURI}'")
+            spark.range(5).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath)
+
+            assert(getCatalogTable("spark_25474").stats.isEmpty)
+            val relation = spark.table("spark_25474").queryExecution.analyzed.children.head
+            // Table statistics are always recalculated by FileIndex
+            assert(relation.stats.sizeInBytes === getDataSize(dir))
+          }
+        }
+      }
+    }
+  }
+
+  test("Partitioned data source table support fallback to HDFS for size estimation") {
+    Seq(false, true).foreach { fallBackToHDFS =>
+      withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS") {
+        withTempDir { dir =>
+          withTable("spark_25474") {
+            sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+              s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+            sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+            assert(getCatalogTable("spark_25474").stats.isEmpty)
+            val relation = spark.table("spark_25474").queryExecution.analyzed.children.head
+            if (fallBackToHDFS) {
+              assert(relation.stats.sizeInBytes ===
+                CommandUtils.getSizeInBytesFallBackToHdfs(spark, getCatalogTable("spark_25474")))
+            } else {
+              assert(relation.stats.sizeInBytes === conf.defaultSizeInBytes)
+            }
+          }
+        }
+      }
+    }
+  }
+
+  test("Partitioned data source table support fallback to HDFS for size estimation" +
+    "with defaultSizeInBytes") {
+    val defaultSizeInBytes = 10 * 1024 * 1024
+    Seq(false, true).foreach { fallBackToHDFS =>
+      withSQLConf(
+        SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS",
+        SQLConf.DEFAULT_SIZE_IN_BYTES.key -> s"$defaultSizeInBytes") {
+        withTempDir { dir =>
+          withTable("spark_25474") {
+            sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+              s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+            sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+            assert(getCatalogTable("spark_25474").stats.isEmpty)
+            val relation = spark.table("spark_25474").queryExecution.analyzed.children.head
+            if (fallBackToHDFS) {
+              assert(relation.stats.sizeInBytes ===
+                CommandUtils.getSizeInBytesFallBackToHdfs(spark, getCatalogTable("spark_25474")))
+            } else {
+              assert(relation.stats.sizeInBytes === defaultSizeInBytes)
+            }
+          }
+        }
+      }
+    }
+  }
+
+  test("Partitioned data source table stats should be cached") {
+    Seq(false, true).foreach { fallBackToHDFS =>
+      withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS") {
+        withTempDir { dir =>
+          withTable("spark_25474") {
+            sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+              s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+            sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+            assert(getCatalogTable("spark_25474").stats.isEmpty)
+            val relation = spark.table("spark_25474").queryExecution.analyzed.children.head
+            if (fallBackToHDFS) {
+              val dataSize =
+                CommandUtils.getSizeInBytesFallBackToHdfs(spark, getCatalogTable("spark_25474"))
+              assert(relation.stats.sizeInBytes === dataSize)
+
+              val qualifiedTableName =
+                QualifiedTableName(spark.sessionState.catalog.getCurrentDatabase, "spark_25474")
+              val logicalRelation = spark.sessionState.catalog.getCachedTable(qualifiedTableName)
+                .asInstanceOf[LogicalRelation]
+              assert(logicalRelation.catalogTable.get.stats.get.sizeInBytes === dataSize)
+            } else {
+              assert(relation.stats.sizeInBytes === conf.defaultSizeInBytes)
+            }
+          }
+        }
+      }
+    }
+  }
+
+  test("External partitioned data source table does not support fallback to HDFS " +
```
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r316544252

## File path: sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
## @@ -650,4 +652,129 @@
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r316534257

## File path: sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
## @@ -650,4 +652,129 @@
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r316534207

## File path: sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
## @@ -650,4 +652,129 @@
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r316479433

## File path: sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
## @@ -650,4 +652,129 @@
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r316218631

## File path: sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
## @@ -650,4 +652,129 @@
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r316115363

## File path: sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
## @@ -650,4 +652,129 @@
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r315993569

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

```diff
@@ -220,10 +220,20 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
  * data source.
  */
 class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
-  private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
-    val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
+  private def maybeWithTableStats(tableMeta: CatalogTable): CatalogTable = {
+    if (tableMeta.stats.isEmpty && sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+      val sizeInBytes = CommandUtils.getSizeInBytesFallBackToHdfs(sparkSession, tableMeta)
+      tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
+    } else {
+      tableMeta
+    }
+  }
+
+  private def readDataSourceTable(tableMeta: CatalogTable): LogicalPlan = {
+    val qualifiedTableName = QualifiedTableName(tableMeta.database, tableMeta.identifier.table)
     val catalog = sparkSession.sessionState.catalog
     catalog.getCachedPlan(qualifiedTableName, () => {
+      val table = maybeWithTableStats(tableMeta)
```

Review comment:
Verify that it always uses the cached relation if one is cached. Check out this [commit](https://github.com/apache/spark/pull/24715/commits/1fce50859b959cd4190f0da5dabf4addd187fb79):

```shell
git checkout 1fce50859b959cd4190f0da5dabf4addd187fb79
build/sbt clean package
export SPARK_PREPEND_CLASSES=true
./bin/spark-shell
```

```scala
import org.apache.spark.sql.catalyst.QualifiedTableName
import org.apache.spark.sql.execution.datasources.LogicalRelation

spark.sql("CREATE TABLE t3(id int, c2 int) USING parquet PARTITIONED BY(id)")
spark.sql("INSERT INTO TABLE t3 PARTITION(id=1) SELECT 2")

// default, fallBackToHdfs=false
spark.sql("EXPLAIN COST SELECT * FROM t3").show(false)
spark.sessionState.catalog.getCachedTable(QualifiedTableName(spark.sessionState.catalog.getCurrentDatabase, "t3")).asInstanceOf[LogicalRelation].catalogTable.get.stats

// enable fallBackToHdfs
spark.sql("set spark.sql.statistics.fallBackToHdfs=true")
spark.sql("EXPLAIN COST SELECT * FROM t3").show(false)
spark.sessionState.catalog.getCachedTable(QualifiedTableName(spark.sessionState.catalog.getCurrentDatabase, "t3")).asInstanceOf[LogicalRelation].catalogTable.get.stats

// Invalidate cached relations
spark.sessionState.catalog.invalidateAllCachedTables
spark.sql("EXPLAIN COST SELECT * FROM t3").show(false)
```

```scala
scala> import org.apache.spark.sql.catalyst.QualifiedTableName
import org.apache.spark.sql.catalyst.QualifiedTableName

scala> import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.LogicalRelation

scala> spark.sql("CREATE TABLE t3(id int, c2 int) USING parquet PARTITIONED BY(id)")
res0: org.apache.spark.sql.DataFrame = []

scala> spark.sql("INSERT INTO TABLE t3 PARTITION(id=1) SELECT 2")
res1: org.apache.spark.sql.DataFrame = []

scala> // default, fallBackToHdfs=false

scala> spark.sql("EXPLAIN COST SELECT * FROM t3").show(false)
++
|plan |
++
|== Optimized Logical Plan ==
Relation[c2#1,id#2] parquet, Statistics(sizeInBytes=8.0 EiB)

== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet default.t3[c2#1,id#2] Batched: true, DataFilters: [], Format: Parquet, Location: CatalogFileIndex[file:/root/opensource/SPARK-25474/spark-wareh
```
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r315685206

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

```diff
     catalog.getCachedPlan(qualifiedTableName, () => {
+      val table = maybeWithTableStats(tableMeta)
```

Review comment:
We cache the table statistics to get better performance.
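The cache in question is the session catalog's plan cache, keyed by `QualifiedTableName`: `getCachedPlan` runs the expensive builder (including the HDFS size scan added here) only on a miss. A simplified model of that memoization, assuming the Guava-cache shape used by `SessionCatalog` (the `String` value type stands in for `LogicalPlan`):

```scala
import java.util.concurrent.Callable
import com.google.common.cache.{Cache, CacheBuilder}

// Simplified stand-in for the real key type.
case class QualifiedTableName(database: String, name: String)

val planCache: Cache[QualifiedTableName, String] =
  CacheBuilder.newBuilder().maximumSize(1000).build[QualifiedTableName, String]()

// The builder (and therefore the HDFS size computation) is invoked only when
// the key is absent; later lookups reuse the cached relation and its stats.
def getCachedPlan(key: QualifiedTableName, build: Callable[String]): String =
  planCache.get(key, build)
```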
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r31506

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

```diff
@@ -619,3 +619,34 @@ object DataSourceStrategy {
     (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters)
   }
 }
+
+
+/**
+ * Defines default table statistics if table statistics are not available.
+ */
+class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
+
+  private val conf = session.sessionState.conf
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    // For the data source table, we only recalculate the table statistics
```

Review comment:
No. This PR only does it when:
https://github.com/apache/spark/blob/1fce50859b959cd4190f0da5dabf4addd187fb79/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L635-L637

Otherwise, we will get the default table size from `CatalogFileIndex`, which is constructed in `DataSource`:
https://github.com/apache/spark/blob/5bb69945e4aaf519cd10a5c5083332f618039af0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L386-L390
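The linked `DataSource` lines are where that default comes from; a paraphrased sketch (not a verbatim quote; `sparkSession` and `catalogTable` are assumed to be in scope):

```scala
import org.apache.spark.sql.execution.datasources.CatalogFileIndex

// Paraphrase of the linked DataSource.scala logic: when a partitioned data
// source table has no catalog stats, the CatalogFileIndex is constructed with
// defaultSizeInBytes as its size hint, which is the size the planner then sees.
val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
val fileCatalog = new CatalogFileIndex(
  sparkSession,
  catalogTable,
  catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
```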
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r315098918

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

```diff
+    // For the data source table, we only recalculate the table statistics
```

Review comment:
No, they are different. This PR gets stats from `catalogTable.flatMap(_.stats.map(_.toPlanStats(output, conf.cboEnabled))).get`, but #22502 gets stats from `Statistics(sizeInBytes = relation.sizeInBytes)`.
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r315071177

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

```diff
+    // For the data source table, we only recalculate the table statistics
```

Review comment:
We will not use the `sizeInBytes` of the `BaseRelation` if table stats are available:
https://github.com/apache/spark/blob/a1c1dd3484a4dcd7c38fe256e69dbaaaf10d1a92/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala#L42-L46
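The precedence those linked lines implement can be reduced to a small self-contained sketch (simplified signature; the real code lives in `LogicalRelation.computeStats`):

```scala
import org.apache.spark.sql.catalyst.plans.logical.Statistics

// Catalog stats win when present; only when they are absent does the
// BaseRelation's sizeInBytes (e.g. a CatalogFileIndex size hint) reach the planner.
def planStats(catalogStatsSize: Option[BigInt], relationSizeInBytes: BigInt): Statistics =
  catalogStatsSize
    .map(size => Statistics(sizeInBytes = size))
    .getOrElse(Statistics(sizeInBytes = relationSizeInBytes))
```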
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r315063093

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

```diff
+    // For the data source table, we only recalculate the table statistics
```

Review comment:
Do you mean `BaseRelation` in `LogicalRelation`?
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r315058705

## File path: sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala

```diff
@@ -650,4 +650,46 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
     }
   }
 }
+
+  test("Non-partitioned data source table support fallback to HDFS for size estimation") {
+    withTempDir { dir =>
+      Seq(false, true).foreach { fallBackToHDFS =>
+        withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS") {
+          withTable("spark_25474") {
+            sql(s"CREATE TABLE spark_25474 (c1 BIGINT) USING PARQUET LOCATION '${dir.toURI}'")
+            spark.range(5).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath)
+
+            assert(getCatalogTable("spark_25474").stats.isEmpty)
+            val relation = spark.table("spark_25474").queryExecution.analyzed.children.head
+            // fallBackToHDFS = true: The table stats will be recalculated by DetermineTableStats
+            // fallBackToHDFS = false: The table stats will be recalculated by FileIndex
+            assert(relation.stats.sizeInBytes === getDataSize(dir))
+          }
+        }
+      }
+    }
+  }
+
+  test("Partitioned data source table support fallback to HDFS for size estimation") {
+    Seq(false, true).foreach { fallBackToHDFS =>
+      withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS") {
```

Review comment:
Added a test for this case.
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r315055204

## File path: sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala

```diff
+  test("Partitioned data source table support fallback to HDFS for size estimation") {
+    Seq(false, true).foreach { fallBackToHDFS =>
+      withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS") {
+        withTempDir { dir =>
+          withTable("spark_25474") {
+            sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+              s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+            sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+            assert(getCatalogTable("spark_25474").stats.isEmpty)
+            val relation = spark.table("spark_25474").queryExecution.analyzed.children.head
+            if (fallBackToHDFS) {
+              assert(relation.stats.sizeInBytes > 0)
+              assert(relation.stats.sizeInBytes < conf.defaultSizeInBytes)
```

Review comment:
No. `getDataSize` does not support partitioned tables.
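Since partition data lives in nested directories such as `a=1/`, a flat listing misses it; measuring a partitioned table from the test side would need something recursive along these lines (hypothetical helper, for illustration only):

```scala
import java.io.File

// Hypothetical recursive variant of getDataSize: partitioned tables keep their
// data under nested partition directories (e.g. a=1/part-...), which a flat
// file listing never sees.
def recursiveDataSize(dir: File): Long =
  Option(dir.listFiles).getOrElse(Array.empty[File]).map { f =>
    if (f.isDirectory) recursiveDataSize(f)
    else if (!f.getName.startsWith(".") && !f.getName.startsWith("_")) f.length
    else 0L
  }.sum
```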
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r315046331

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

```diff
+    // For the data source table, we only recalculate the table statistics
```

Review comment:
How about we only do it when it will use `CatalogFileIndex`?

```scala
case logical @ LogicalRelation(_, _, Some(table), _)
    if DDLUtils.isDatasourceTable(table) && table.stats.isEmpty &&
      conf.fallBackToHdfsForStatsEnabled && conf.manageFilesourcePartitions &&
      table.tracksPartitionsInCatalog && table.partitionColumnNames.nonEmpty =>
  val sizeInBytes = CommandUtils.getSizeInBytesFallBackToHdfs(session, table)
  val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
  logical.copy(catalogTable = Some(withStats))
```

https://github.com/apache/spark/blob/5bb69945e4aaf519cd10a5c5083332f618039af0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L382-L391
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r314771686

## File path: sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala

```diff
+            // fallBackToHDFS = true: The table stats will be recalculated by DetermineTableStats
+            // fallBackToHDFS = false: The table stats will be recalculated by FileIndex
```

Review comment:
- Non-partitioned tables use `PartitioningAwareFileIndex`:
  https://github.com/apache/spark/blob/b276788d57b270d455ef6a7c5ed6cf8a74885dde/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala#L103
- Partitioned tables use `CatalogFileIndex`:
  https://github.com/apache/spark/blob/5d672b7f3e07cfd7710df319fc6c7d2b9056a068/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala#L41
  https://github.com/apache/spark/blob/c30b5297bc607ae33cc2fcf624b127942154e559/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L383-L388
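The practical difference between the two index types is how `sizeInBytes` is produced; a paraphrased sketch of the linked lines (simplified, not the real class bodies):

```scala
import org.apache.hadoop.fs.FileStatus

// PartitioningAwareFileIndex (non-partitioned path): the size is the sum of
// the lengths of the files it has actually listed.
def partitioningAwareSize(allFiles: Seq[FileStatus]): Long =
  allFiles.map(_.getLen).sum

// CatalogFileIndex (partitioned path): the size is whatever hint the index was
// constructed with -- catalog stats if present, else spark.sql.defaultSizeInBytes.
class SizeHintedIndex(val sizeInBytes: Long)
```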
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r314607502

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

```diff
@@ -619,3 +619,34 @@ object DataSourceStrategy {
     (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters)
   }
 }
+
+
+/**
+ * Support for recalculating table statistics if table statistics are not available.
```

Review comment:
Done
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r314593180

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala

```diff
@@ -345,14 +345,16 @@ object CommandUtils extends Logging {
     !path.getName.startsWith(stagingDir) && DataSourceUtils.isDataPath(path)
   }

-  def getSizeInBytesFallBackToHdfs(session: SparkSession, path: Path, defaultSize: Long): Long = {
+  def getSizeInBytesFallBackToHdfs(session: SparkSession, table: CatalogTable): Long = {
     try {
       val hadoopConf = session.sessionState.newHadoopConf()
-      path.getFileSystem(hadoopConf).getContentSummary(path).getLength
+      val tablePath = new Path(table.location)
+      val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
+      fs.getContentSummary(tablePath).getLength
     } catch {
       case NonFatal(e) =>
-        logWarning(s"Failed to get table size from hdfs. Using the default size, $defaultSize.", e)
-        defaultSize
+        logWarning(s"Failed to get table size from HDFS. Using the default data size.", e)
```

Review comment:
I think we don't need it anymore. #22502 added `defaultSize` because `defaultSize` may be different from `session.sessionState.conf.defaultSizeInBytes`. But now it is always `session.sessionState.conf.defaultSizeInBytes`.
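For reference, the Hadoop call the new implementation relies on can be exercised standalone; a minimal sketch (the path is illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// getContentSummary walks the whole directory tree, so unlike a flat file
// listing it also covers nested partition directories. Path is illustrative.
val tablePath = new Path("file:///tmp/spark_25474")
val fs = tablePath.getFileSystem(new Configuration())
val totalBytes = fs.getContentSummary(tablePath).getLength
println(s"total data size: $totalBytes bytes")
```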
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r314589735

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala

```diff
+        logWarning(s"Failed to get table size from HDFS. Using the default data size.", e)
```

Review comment:
I think we should format it if we need to print `defaultSize`: `org.apache.spark.util.Utils.bytesToString`.
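A hedged example of the suggested formatting (note `Utils` is `private[spark]`, so this only compiles inside Spark's own modules; the exact unit label may vary across Spark versions):

```scala
import org.apache.spark.util.Utils  // private[spark]

// Format a byte count before logging it, as suggested above.
val defaultSize: Long = 10L * 1024 * 1024
val pretty: String = Utils.bytesToString(defaultSize)  // e.g. "10.0 MiB"
println(s"Failed to get table size from HDFS. Using the default size, $pretty.")
```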
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r314587227

## File path: sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala

```diff
@@ -650,4 +650,44 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
       }
     }
   }
+
+  test("Data source tables support fallback to HDFS for size estimation") {
+    // Non-partitioned table
+    withTempDir { dir =>
+      Seq(false, true).foreach { fallBackToHDFSForStats =>
+        withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFSForStats") {
+          withTable("spark_25474") {
+            sql(s"CREATE TABLE spark_25474 (c1 BIGINT) USING PARQUET LOCATION '${dir.toURI}'")
+            spark.range(5).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath)
+
+            assert(getCatalogTable("spark_25474").stats.isEmpty)
+            val relation = spark.table("spark_25474").queryExecution.analyzed.children.head
+            assert(relation.stats.sizeInBytes === getDataSize(dir))
+          }
+        }
+      }
+    }
+
+    // Partitioned table
+    Seq(false, true).foreach { fallBackToHDFSForStats =>
+      withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFSForStats") {
+        withTable("spark_25474") {
+          withTempDir { dir =>
+            spark.sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
```

Review comment: Done
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r314587076

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

```diff
@@ -619,3 +620,35 @@ object DataSourceStrategy {
     (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters)
   }
 }
+
+
+/**
+ * Support for recalculating table statistics if table statistics are not available.
+ */
+class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
+
+  private val sessionConf = session.sessionState.conf
```

Review comment: Done
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r314587062

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

```diff
@@ -619,3 +620,35 @@ object DataSourceStrategy {
     (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters)
   }
 }
+
+
+/**
+ * Support for recalculating table statistics if table statistics are not available.
+ */
+class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
+
+  private val sessionConf = session.sessionState.conf
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    // For the data source table, we only recalculate the table statistics when it creates
+    // the CatalogFileIndex using defaultSizeInBytes. See SPARK-25474 for more details.
```

Review comment: Done
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r314587042

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

```diff
@@ -619,3 +620,35 @@ object DataSourceStrategy {
     (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters)
   }
 }
+
+
+/**
+ * Support for recalculating table statistics if table statistics are not available.
+ */
+class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
+
+  private val sessionConf = session.sessionState.conf
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    // For the data source table, we only recalculate the table statistics when it creates
+    // the CatalogFileIndex using defaultSizeInBytes. See SPARK-25474 for more details.
+    case logical @ LogicalRelation(_, _, Some(table), _)
+        if sessionConf.fallBackToHdfsForStatsEnabled && table.stats.isEmpty &&
+          sessionConf.manageFilesourcePartitions &&
+          table.tracksPartitionsInCatalog && table.partitionColumnNames.nonEmpty =>
+      val sizeInBytes = CommandUtils.getSizeInBytesFallBackToHdfs(session, table)
+      val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
+      logical.copy(catalogTable = Some(withStats))
+
+    case relation: HiveTableRelation
```

Review comment: @advancedxy I'm already working on this: https://github.com/apache/spark/pull/25306/commits/c86a27b2a5e286733ad305de1d7e42d1373b3a3b
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r314587025

## File path: sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala

```diff
@@ -650,4 +650,44 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
       }
     }
   }
+
+  test("Data source tables support fallback to HDFS for size estimation") {
+    // Non-partitioned table
+    withTempDir { dir =>
+      Seq(false, true).foreach { fallBackToHDFSForStats =>
+        withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFSForStats") {
+          withTable("spark_25474") {
+            sql(s"CREATE TABLE spark_25474 (c1 BIGINT) USING PARQUET LOCATION '${dir.toURI}'")
+            spark.range(5).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath)
+
+            assert(getCatalogTable("spark_25474").stats.isEmpty)
+            val relation = spark.table("spark_25474").queryExecution.analyzed.children.head
+            assert(relation.stats.sizeInBytes === getDataSize(dir))
+          }
+        }
+      }
+    }
+
+    // Partitioned table
+    Seq(false, true).foreach { fallBackToHDFSForStats =>
```

Review comment: Done
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r314587006

## File path: sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala

```diff
@@ -650,4 +650,44 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
       }
     }
   }
+
+  test("Data source tables support fallback to HDFS for size estimation") {
+    // Non-partitioned table
+    withTempDir { dir =>
+      Seq(false, true).foreach { fallBackToHDFSForStats =>
+        withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFSForStats") {
```

Review comment:
// fallBackToHDFSForStats = true: The table stats will be recalculated by DetermineTableStats
// fallBackToHDFSForStats = false: The table stats will be recalculated by [FileIndex](https://github.com/apache/spark/blob/b276788d57b270d455ef6a7c5ed6cf8a74885dde/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala#L103)
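To make the contrast concrete, here is a simplified, self-contained sketch of what a FileIndex-style estimate boils down to: summing the lengths of the data files under the table location. This is illustrative only; the object name and path are invented, and the real PartitioningAwareFileIndex sums leaf-file statuses it has already listed and cached rather than re-listing on every call:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ListingSizeDemo {
  // Recursively sum file lengths under a table location, descending into
  // partition directories along the way.
  def estimateSizeByListing(fs: FileSystem, path: Path): Long =
    fs.listStatus(path).map { status =>
      if (status.isDirectory) estimateSizeByListing(fs, status.getPath)
      else status.getLen
    }.sum

  def main(args: Array[String]): Unit = {
    val tablePath = new Path("/tmp/spark/spark_25474") // hypothetical location
    val fs = tablePath.getFileSystem(new Configuration())
    println(estimateSizeByListing(fs, tablePath))
  }
}
```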
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r299268577

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

```diff
@@ -619,3 +620,43 @@ object DataSourceStrategy {
     (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters)
   }
 }
+
+
+/**
+ * Support for recalculating table statistics if table statistics are not available.
+ */
+class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
+  def getHdfsSize(catalogTable: CatalogTable): Long = {
+    try {
+      val hadoopConf = session.sessionState.newHadoopConf()
+      val tablePath = new Path(catalogTable.location)
+      val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
+      fs.getContentSummary(tablePath).getLength
+    } catch {
+      case e: IOException =>
+        logWarning("Failed to get table size from hdfs.", e)
+        session.sessionState.conf.defaultSizeInBytes
+    }
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case logical @ LogicalRelation(relation, _, Some(table), _)
+        if session.sessionState.conf.fallBackToHdfsForStatsEnabled &&
+          session.sessionState.conf.manageFilesourcePartitions && table.stats.isEmpty &&
+          table.tracksPartitionsInCatalog && table.partitionColumnNames.nonEmpty =>
+      val withStats =
```

Review comment: The statistics are recalculated only when `fallBackToHdfsForStatsEnabled` is set and the relation uses a `CatalogFileIndex`: https://github.com/apache/spark/blob/c30b5297bc607ae33cc2fcf624b127942154e559/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L379-L388
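For reference, a sketch (not the actual Spark source) of the condition the linked DataSource.scala lines encode, under which DataSource builds a CatalogFileIndex seeded with the catalog stats or, when those are missing, `spark.sql.defaultSizeInBytes`. That is exactly the case the rule above guards on before falling back to HDFS:

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// True when partition management is on and the catalog tracks this
// table's partitions, i.e. when a CatalogFileIndex would be used and the
// reported size can silently be the configured default.
def usesCatalogFileIndex(manageFilesourcePartitions: Boolean, table: CatalogTable): Boolean =
  manageFilesourcePartitions &&
    table.tracksPartitionsInCatalog &&
    table.partitionColumnNames.nonEmpty
```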
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r298788939

## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala

```diff
@@ -512,37 +512,46 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
         SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "orc",
         SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1SourceReaderList) {
         withTempPath { workDir =>
-          withTable("table1") {
-            val workDirPath = workDir.getAbsolutePath
-            val data = Seq(100, 200, 300, 400).toDF("count")
-            data.write.orc(workDirPath)
-            val dfFromFile = spark.read.orc(workDirPath).cache()
-            val inMemoryRelation = dfFromFile.queryExecution.optimizedPlan.collect {
-              case plan: InMemoryRelation => plan
-            }.head
-            // InMemoryRelation's stats is file size before the underlying RDD is materialized
-            assert(inMemoryRelation.computeStats().sizeInBytes === 486)
-
-            // InMemoryRelation's stats is updated after materializing RDD
-            dfFromFile.collect()
-            assert(inMemoryRelation.computeStats().sizeInBytes === 16)
-
-            // test of catalog table
-            val dfFromTable = spark.catalog.createTable("table1", workDirPath).cache()
-            val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan.
-              collect { case plan: InMemoryRelation => plan }.head
-
-            // Even CBO enabled, InMemoryRelation's stats keeps as the file size before table's
-            // stats is calculated
-            assert(inMemoryRelation2.computeStats().sizeInBytes === 486)
```

Review comment: Hive tables also have this issue:

```scala
import org.apache.spark.sql.execution.columnar.InMemoryRelation

val tempDir = "/tmp/spark/spark_25474"
spark.range(10).write.mode("overwrite").parquet(tempDir)
spark.sql(s"CREATE TABLE spark_25474 (id BIGINT) STORED AS parquet LOCATION '$tempDir'")
spark.sql("DESC FORMATTED spark_25474").show(false)

val cachedDf = spark.table("spark_25474").cache()
val optimizedPlan = cachedDf.queryExecution.optimizedPlan
val inMemoryRelation = optimizedPlan.collect { case plan: InMemoryRelation => plan }.head
println(inMemoryRelation.computeStats().sizeInBytes)
```

```
scala> println(inMemoryRelation.computeStats().sizeInBytes)
9223372036854775807
```

Note that 9223372036854775807 is `Long.MaxValue`, the default value of `spark.sql.defaultSizeInBytes`, i.e. the fallback used when no statistics are available.
[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r298529585

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

```diff
@@ -256,6 +256,21 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
     }
   }
 }
+
+case class DetermineDataSourceTableStats(session: SparkSession) extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case logicalRelation @ LogicalRelation(_, _, catalogTable, _) if catalogTable.nonEmpty &&
+        catalogTable.forall(DDLUtils.isDatasourceTable) && catalogTable.forall(_.stats.isEmpty) =>
+      val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
```

Review comment: I moved `DetermineTableStats` from `HiveStrategies` to `DataSourceStrategy` to reduce duplication.