[GitHub] [spark] maropu commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
maropu commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation URL: https://github.com/apache/spark/pull/24715#discussion_r314964437 ## File path: sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala ## @@ -650,4 +650,46 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared } } } + + test("Non-partitioned data source table support fallback to HDFS for size estimation") { +withTempDir { dir => + Seq(false, true).foreach { fallBackToHDFS => +withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS") { + withTable("spark_25474") { +sql(s"CREATE TABLE spark_25474 (c1 BIGINT) USING PARQUET LOCATION '${dir.toURI}'") + spark.range(5).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath) + +assert(getCatalogTable("spark_25474").stats.isEmpty) +val relation = spark.table("spark_25474").queryExecution.analyzed.children.head +// fallBackToHDFS = true: The table stats will be recalculated by DetermineTableStats +// fallBackToHDFS = false: The table stats will be recalculated by FileIndex +assert(relation.stats.sizeInBytes === getDataSize(dir)) + } +} + } +} + } + + test("Partitioned data source table support fallback to HDFS for size estimation") { +Seq(false, true).foreach { fallBackToHDFS => + withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS") { +withTempDir { dir => + withTable("spark_25474") { +sql("CREATE TABLE spark_25474(a int, b int) USING parquet " + +s"PARTITIONED BY(a) LOCATION '${dir.toURI}'") +sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2") + +assert(getCatalogTable("spark_25474").stats.isEmpty) +val relation = spark.table("spark_25474").queryExecution.analyzed.children.head +if (fallBackToHDFS) { + assert(relation.stats.sizeInBytes > 0) + assert(relation.stats.sizeInBytes < conf.defaultSizeInBytes) Review comment: We cannot use `getDataSize` here? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
maropu commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation URL: https://github.com/apache/spark/pull/24715#discussion_r314964437 ## File path: sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala ## @@ -650,4 +650,46 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared } } } + + test("Non-partitioned data source table support fallback to HDFS for size estimation") { +withTempDir { dir => + Seq(false, true).foreach { fallBackToHDFS => +withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS") { + withTable("spark_25474") { +sql(s"CREATE TABLE spark_25474 (c1 BIGINT) USING PARQUET LOCATION '${dir.toURI}'") + spark.range(5).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath) + +assert(getCatalogTable("spark_25474").stats.isEmpty) +val relation = spark.table("spark_25474").queryExecution.analyzed.children.head +// fallBackToHDFS = true: The table stats will be recalculated by DetermineTableStats +// fallBackToHDFS = false: The table stats will be recalculated by FileIndex +assert(relation.stats.sizeInBytes === getDataSize(dir)) + } +} + } +} + } + + test("Partitioned data source table support fallback to HDFS for size estimation") { +Seq(false, true).foreach { fallBackToHDFS => + withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS") { +withTempDir { dir => + withTable("spark_25474") { +sql("CREATE TABLE spark_25474(a int, b int) USING parquet " + +s"PARTITIONED BY(a) LOCATION '${dir.toURI}'") +sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2") + +assert(getCatalogTable("spark_25474").stats.isEmpty) +val relation = spark.table("spark_25474").queryExecution.analyzed.children.head +if (fallBackToHDFS) { + assert(relation.stats.sizeInBytes > 0) + assert(relation.stats.sizeInBytes < conf.defaultSizeInBytes) Review comment: We cannnot use `getDataSize` here? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
maropu commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation URL: https://github.com/apache/spark/pull/24715#discussion_r314964256 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ## @@ -619,3 +619,34 @@ object DataSourceStrategy { (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters) } } + + +/** + * Defines default table statistics if table statistics are not available. + */ +class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { Review comment: case class? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
maropu commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation URL: https://github.com/apache/spark/pull/24715#discussion_r314964375 ## File path: sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala ## @@ -650,4 +650,46 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared } } } + + test("Non-partitioned data source table support fallback to HDFS for size estimation") { +withTempDir { dir => + Seq(false, true).foreach { fallBackToHDFS => +withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS") { + withTable("spark_25474") { +sql(s"CREATE TABLE spark_25474 (c1 BIGINT) USING PARQUET LOCATION '${dir.toURI}'") + spark.range(5).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath) + +assert(getCatalogTable("spark_25474").stats.isEmpty) +val relation = spark.table("spark_25474").queryExecution.analyzed.children.head +// fallBackToHDFS = true: The table stats will be recalculated by DetermineTableStats +// fallBackToHDFS = false: The table stats will be recalculated by FileIndex +assert(relation.stats.sizeInBytes === getDataSize(dir)) + } +} + } +} + } + + test("Partitioned data source table support fallback to HDFS for size estimation") { +Seq(false, true).foreach { fallBackToHDFS => + withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS") { Review comment: Just in case, how about explicitly setting `defaultSizeInBytes` here? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
maropu commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation URL: https://github.com/apache/spark/pull/24715#discussion_r314964366 ## File path: sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala ## @@ -650,4 +650,46 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared } } } + + test("Non-partitioned data source table support fallback to HDFS for size estimation") { +withTempDir { dir => + Seq(false, true).foreach { fallBackToHDFS => +withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS") { + withTable("spark_25474") { +sql(s"CREATE TABLE spark_25474 (c1 BIGINT) USING PARQUET LOCATION '${dir.toURI}'") + spark.range(5).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath) + +assert(getCatalogTable("spark_25474").stats.isEmpty) +val relation = spark.table("spark_25474").queryExecution.analyzed.children.head +// fallBackToHDFS = true: The table stats will be recalculated by DetermineTableStats +// fallBackToHDFS = false: The table stats will be recalculated by FileIndex Review comment: If so, the behaviour of the non-partitioned case should follow that of the partitioned case? (If `fallBackToHDFS`=false, we should use `defaultSizeInBytes` here, too. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
maropu commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation URL: https://github.com/apache/spark/pull/24715#discussion_r314587934 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ## @@ -619,3 +619,34 @@ object DataSourceStrategy { (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters) } } + + +/** + * Support for recalculating table statistics if table statistics are not available. Review comment: nit: `Defines default table statistics if `? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
maropu commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation URL: https://github.com/apache/spark/pull/24715#discussion_r314587352 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala ## @@ -345,14 +345,16 @@ object CommandUtils extends Logging { !path.getName.startsWith(stagingDir) && DataSourceUtils.isDataPath(path) } - def getSizeInBytesFallBackToHdfs(session: SparkSession, path: Path, defaultSize: Long): Long = { + def getSizeInBytesFallBackToHdfs(session: SparkSession, table: CatalogTable): Long = { try { val hadoopConf = session.sessionState.newHadoopConf() - path.getFileSystem(hadoopConf).getContentSummary(path).getLength + val tablePath = new Path(table.location) + val fs: FileSystem = tablePath.getFileSystem(hadoopConf) + fs.getContentSummary(tablePath).getLength } catch { case NonFatal(e) => -logWarning(s"Failed to get table size from hdfs. Using the default size, $defaultSize.", e) -defaultSize +logWarning(s"Failed to get table size from HDFS. Using the default data size.", e) Review comment: btw, we don't need to print `defaultSize` anymore? ``` val defaultSize = session.sessionState.conf.defaultSizeInBytes logWarning(s"Failed to get table size from hdfs. Using the default size, $defaultSize.", e) defaultSize ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
maropu commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation URL: https://github.com/apache/spark/pull/24715#discussion_r314587051 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala ## @@ -345,14 +345,16 @@ object CommandUtils extends Logging { !path.getName.startsWith(stagingDir) && DataSourceUtils.isDataPath(path) } - def getSizeInBytesFallBackToHdfs(session: SparkSession, path: Path, defaultSize: Long): Long = { + def getSizeInBytesFallBackToHdfs(session: SparkSession, table: CatalogTable): Long = { try { val hadoopConf = session.sessionState.newHadoopConf() - path.getFileSystem(hadoopConf).getContentSummary(path).getLength + val tablePath = new Path(table.location) + val fs: FileSystem = tablePath.getFileSystem(hadoopConf) + fs.getContentSummary(tablePath).getLength } catch { case NonFatal(e) => -logWarning(s"Failed to get table size from hdfs. Using the default size, $defaultSize.", e) -defaultSize +logWarning(s"Failed to get table size from HDFS. Using the default data size.", e) Review comment: nit: Drops `s` in the head. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation
maropu commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation URL: https://github.com/apache/spark/pull/24715#discussion_r314586339 ## File path: sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala ## @@ -650,4 +650,44 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared } } } + + test("Data source tables support fallback to HDFS for size estimation") { +// Non-partitioned table +withTempDir { dir => + Seq(false, true).foreach { fallBackToHDFSForStats => +withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFSForStats") { + withTable("spark_25474") { +sql(s"CREATE TABLE spark_25474 (c1 BIGINT) USING PARQUET LOCATION '${dir.toURI}'") + spark.range(5).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath) + +assert(getCatalogTable("spark_25474").stats.isEmpty) +val relation = spark.table("spark_25474").queryExecution.analyzed.children.head +assert(relation.stats.sizeInBytes === getDataSize(dir)) + } +} + } +} + +// Partitioned table +Seq(false, true).foreach { fallBackToHDFSForStats => + withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFSForStats") { +withTable("spark_25474") { + withTempDir { dir => +spark.sql("CREATE TABLE spark_25474(a int, b int) USING parquet " + Review comment: nit: don't need `spark.` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org