[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-23 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r317194291
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
 ##
 @@ -1216,8 +1216,12 @@ object SQLConf {
   .createWithDefault(true)
 
   val ENABLE_FALL_BACK_TO_HDFS_FOR_STATS = 
buildConf("spark.sql.statistics.fallBackToHdfs")
-.doc("If the table statistics are not available from table metadata enable 
fall back to hdfs." +
-  " This is useful in determining if a table is small enough to use auto 
broadcast joins.")
+.doc("This flag is effective only if it is Hive table. When true, it will 
fall back to HDFS " +
+  "if the table statistics are not available from table metadata. This is 
useful in " +
+  "determining if a table is small enough to use auto broadcast joins. " +
+  "For non-partitioned data source table, it will be automatically 
recalculated if table " +
+  "statistics are not available. For partitioned data source table, It is 
" +
+  s"'${DEFAULT_SIZE_IN_BYTES.key}' if table statistics are not available.")
 .booleanConf
 
 Review comment:
   cc @shahidki31


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-22 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r316570564
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
 ##
 @@ -650,4 +652,129 @@ class StatisticsCollectionSuite extends 
StatisticsCollectionTestBase with Shared
   }
 }
   }
+
+  test("Non-partitioned data source table support fallback to HDFS for size 
estimation") {
+withTempDir { dir =>
+  Seq(false, true).foreach { fallBackToHDFS =>
+withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFS") {
+  withTable("spark_25474") {
+sql(s"CREATE TABLE spark_25474 (c1 BIGINT) USING PARQUET LOCATION 
'${dir.toURI}'")
+
spark.range(5).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath)
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+// Table statistics are always recalculated by FileIndex
+assert(relation.stats.sizeInBytes === getDataSize(dir))
+  }
+}
+  }
+}
+  }
+
+  test("Partitioned data source table support fallback to HDFS for size 
estimation") {
+Seq(false, true).foreach { fallBackToHDFS =>
+  withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFS") {
+withTempDir { dir =>
+  withTable("spark_25474") {
+sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+if (fallBackToHDFS) {
+  assert(relation.stats.sizeInBytes ===
+CommandUtils.getSizeInBytesFallBackToHdfs(spark, 
getCatalogTable("spark_25474")))
+} else {
+  assert(relation.stats.sizeInBytes === conf.defaultSizeInBytes)
+}
+  }
+}
+  }
+}
+  }
+
+  test("Partitioned data source table support fallback to HDFS for size 
estimation" +
+"with defaultSizeInBytes") {
+val defaultSizeInBytes = 10 * 1024 * 1024
+Seq(false, true).foreach { fallBackToHDFS =>
+  withSQLConf(
+SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS",
+SQLConf.DEFAULT_SIZE_IN_BYTES.key -> s"$defaultSizeInBytes") {
+withTempDir { dir =>
+  withTable("spark_25474") {
+sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+  s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+if (fallBackToHDFS) {
+  assert(relation.stats.sizeInBytes ===
+CommandUtils.getSizeInBytesFallBackToHdfs(spark, 
getCatalogTable("spark_25474")))
+} else {
+  assert(relation.stats.sizeInBytes === defaultSizeInBytes)
+}
+  }
+}
+  }
+}
+  }
+
+  test("Partitioned data source table stats should be cached") {
+Seq(false, true).foreach { fallBackToHDFS =>
+  withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFS") {
+withTempDir { dir =>
+  withTable("spark_25474") {
+sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+  s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+if (fallBackToHDFS) {
+  val dataSize =
+CommandUtils.getSizeInBytesFallBackToHdfs(spark, 
getCatalogTable("spark_25474"))
+  assert(relation.stats.sizeInBytes === dataSize)
+
+  val qualifiedTableName =
+
QualifiedTableName(spark.sessionState.catalog.getCurrentDatabase, "spark_25474")
+  val logicalRelation = 
spark.sessionState.catalog.getCachedTable(qualifiedTableName)
+.asInstanceOf[LogicalRelation]
+  assert(logicalRelation.catalogTable.get.stats.get.sizeInBytes 
=== dataSize)
+} else {
+  assert(relation.stats.sizeInBytes === conf.defaultSizeInBytes)
+}
+  }
+}
+  }
+}
+  }
+
+  test("External partitioned data source table does not support fallback to 
HDFS " +
+  

[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-22 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r316544252
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
 ##
 @@ -650,4 +652,129 @@ class StatisticsCollectionSuite extends 
StatisticsCollectionTestBase with Shared
   }
 }
   }
+
+  test("Non-partitioned data source table support fallback to HDFS for size 
estimation") {
+withTempDir { dir =>
+  Seq(false, true).foreach { fallBackToHDFS =>
+withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFS") {
+  withTable("spark_25474") {
+sql(s"CREATE TABLE spark_25474 (c1 BIGINT) USING PARQUET LOCATION 
'${dir.toURI}'")
+
spark.range(5).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath)
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+// Table statistics are always recalculated by FileIndex
+assert(relation.stats.sizeInBytes === getDataSize(dir))
+  }
+}
+  }
+}
+  }
+
+  test("Partitioned data source table support fallback to HDFS for size 
estimation") {
+Seq(false, true).foreach { fallBackToHDFS =>
+  withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFS") {
+withTempDir { dir =>
+  withTable("spark_25474") {
+sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+if (fallBackToHDFS) {
+  assert(relation.stats.sizeInBytes ===
+CommandUtils.getSizeInBytesFallBackToHdfs(spark, 
getCatalogTable("spark_25474")))
+} else {
+  assert(relation.stats.sizeInBytes === conf.defaultSizeInBytes)
+}
+  }
+}
+  }
+}
+  }
+
+  test("Partitioned data source table support fallback to HDFS for size 
estimation" +
+"with defaultSizeInBytes") {
+val defaultSizeInBytes = 10 * 1024 * 1024
+Seq(false, true).foreach { fallBackToHDFS =>
+  withSQLConf(
+SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS",
+SQLConf.DEFAULT_SIZE_IN_BYTES.key -> s"$defaultSizeInBytes") {
+withTempDir { dir =>
+  withTable("spark_25474") {
+sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+  s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+if (fallBackToHDFS) {
+  assert(relation.stats.sizeInBytes ===
+CommandUtils.getSizeInBytesFallBackToHdfs(spark, 
getCatalogTable("spark_25474")))
+} else {
+  assert(relation.stats.sizeInBytes === defaultSizeInBytes)
+}
+  }
+}
+  }
+}
+  }
+
+  test("Partitioned data source table stats should be cached") {
+Seq(false, true).foreach { fallBackToHDFS =>
+  withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFS") {
+withTempDir { dir =>
+  withTable("spark_25474") {
+sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+  s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+if (fallBackToHDFS) {
+  val dataSize =
+CommandUtils.getSizeInBytesFallBackToHdfs(spark, 
getCatalogTable("spark_25474"))
+  assert(relation.stats.sizeInBytes === dataSize)
+
+  val qualifiedTableName =
+
QualifiedTableName(spark.sessionState.catalog.getCurrentDatabase, "spark_25474")
+  val logicalRelation = 
spark.sessionState.catalog.getCachedTable(qualifiedTableName)
+.asInstanceOf[LogicalRelation]
+  assert(logicalRelation.catalogTable.get.stats.get.sizeInBytes 
=== dataSize)
+} else {
+  assert(relation.stats.sizeInBytes === conf.defaultSizeInBytes)
+}
+  }
+}
+  }
+}
+  }
+
+  test("External partitioned data source table does not support fallback to 
HDFS " +
+  

[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-22 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r316534257
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
 ##
 @@ -650,4 +652,129 @@ class StatisticsCollectionSuite extends 
StatisticsCollectionTestBase with Shared
   }
 }
   }
+
+  test("Non-partitioned data source table support fallback to HDFS for size 
estimation") {
+withTempDir { dir =>
+  Seq(false, true).foreach { fallBackToHDFS =>
+withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFS") {
+  withTable("spark_25474") {
+sql(s"CREATE TABLE spark_25474 (c1 BIGINT) USING PARQUET LOCATION 
'${dir.toURI}'")
+
spark.range(5).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath)
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+// Table statistics are always recalculated by FileIndex
+assert(relation.stats.sizeInBytes === getDataSize(dir))
+  }
+}
+  }
+}
+  }
+
+  test("Partitioned data source table support fallback to HDFS for size 
estimation") {
+Seq(false, true).foreach { fallBackToHDFS =>
+  withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFS") {
+withTempDir { dir =>
+  withTable("spark_25474") {
+sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+if (fallBackToHDFS) {
+  assert(relation.stats.sizeInBytes ===
+CommandUtils.getSizeInBytesFallBackToHdfs(spark, 
getCatalogTable("spark_25474")))
+} else {
+  assert(relation.stats.sizeInBytes === conf.defaultSizeInBytes)
+}
+  }
+}
+  }
+}
+  }
+
+  test("Partitioned data source table support fallback to HDFS for size 
estimation" +
+"with defaultSizeInBytes") {
+val defaultSizeInBytes = 10 * 1024 * 1024
+Seq(false, true).foreach { fallBackToHDFS =>
+  withSQLConf(
+SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS",
+SQLConf.DEFAULT_SIZE_IN_BYTES.key -> s"$defaultSizeInBytes") {
+withTempDir { dir =>
+  withTable("spark_25474") {
+sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+  s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+if (fallBackToHDFS) {
+  assert(relation.stats.sizeInBytes ===
+CommandUtils.getSizeInBytesFallBackToHdfs(spark, 
getCatalogTable("spark_25474")))
+} else {
+  assert(relation.stats.sizeInBytes === defaultSizeInBytes)
+}
+  }
+}
+  }
+}
+  }
+
+  test("Partitioned data source table stats should be cached") {
+Seq(false, true).foreach { fallBackToHDFS =>
+  withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFS") {
+withTempDir { dir =>
+  withTable("spark_25474") {
+sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+  s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+if (fallBackToHDFS) {
+  val dataSize =
+CommandUtils.getSizeInBytesFallBackToHdfs(spark, 
getCatalogTable("spark_25474"))
+  assert(relation.stats.sizeInBytes === dataSize)
+
+  val qualifiedTableName =
+
QualifiedTableName(spark.sessionState.catalog.getCurrentDatabase, "spark_25474")
+  val logicalRelation = 
spark.sessionState.catalog.getCachedTable(qualifiedTableName)
+.asInstanceOf[LogicalRelation]
+  assert(logicalRelation.catalogTable.get.stats.get.sizeInBytes 
=== dataSize)
+} else {
+  assert(relation.stats.sizeInBytes === conf.defaultSizeInBytes)
+}
+  }
+}
+  }
+}
+  }
+
+  test("External partitioned data source table does not support fallback to 
HDFS " +
+  

[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-22 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r316534207
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
 ##
 @@ -650,4 +652,129 @@ class StatisticsCollectionSuite extends 
StatisticsCollectionTestBase with Shared
   }
 }
   }
+
+  test("Non-partitioned data source table support fallback to HDFS for size 
estimation") {
+withTempDir { dir =>
+  Seq(false, true).foreach { fallBackToHDFS =>
+withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFS") {
+  withTable("spark_25474") {
+sql(s"CREATE TABLE spark_25474 (c1 BIGINT) USING PARQUET LOCATION 
'${dir.toURI}'")
+
spark.range(5).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath)
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+// Table statistics are always recalculated by FileIndex
+assert(relation.stats.sizeInBytes === getDataSize(dir))
+  }
+}
+  }
+}
+  }
+
+  test("Partitioned data source table support fallback to HDFS for size 
estimation") {
+Seq(false, true).foreach { fallBackToHDFS =>
+  withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFS") {
+withTempDir { dir =>
+  withTable("spark_25474") {
+sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+if (fallBackToHDFS) {
+  assert(relation.stats.sizeInBytes ===
+CommandUtils.getSizeInBytesFallBackToHdfs(spark, 
getCatalogTable("spark_25474")))
+} else {
+  assert(relation.stats.sizeInBytes === conf.defaultSizeInBytes)
+}
+  }
+}
+  }
+}
+  }
+
+  test("Partitioned data source table support fallback to HDFS for size 
estimation" +
+"with defaultSizeInBytes") {
+val defaultSizeInBytes = 10 * 1024 * 1024
+Seq(false, true).foreach { fallBackToHDFS =>
+  withSQLConf(
+SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS",
+SQLConf.DEFAULT_SIZE_IN_BYTES.key -> s"$defaultSizeInBytes") {
+withTempDir { dir =>
+  withTable("spark_25474") {
+sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+  s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+if (fallBackToHDFS) {
+  assert(relation.stats.sizeInBytes ===
+CommandUtils.getSizeInBytesFallBackToHdfs(spark, 
getCatalogTable("spark_25474")))
+} else {
+  assert(relation.stats.sizeInBytes === defaultSizeInBytes)
+}
+  }
+}
+  }
+}
+  }
+
+  test("Partitioned data source table stats should be cached") {
+Seq(false, true).foreach { fallBackToHDFS =>
+  withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFS") {
+withTempDir { dir =>
+  withTable("spark_25474") {
+sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+  s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+if (fallBackToHDFS) {
+  val dataSize =
+CommandUtils.getSizeInBytesFallBackToHdfs(spark, 
getCatalogTable("spark_25474"))
+  assert(relation.stats.sizeInBytes === dataSize)
+
+  val qualifiedTableName =
+
QualifiedTableName(spark.sessionState.catalog.getCurrentDatabase, "spark_25474")
+  val logicalRelation = 
spark.sessionState.catalog.getCachedTable(qualifiedTableName)
+.asInstanceOf[LogicalRelation]
+  assert(logicalRelation.catalogTable.get.stats.get.sizeInBytes 
=== dataSize)
+} else {
+  assert(relation.stats.sizeInBytes === conf.defaultSizeInBytes)
+}
+  }
+}
+  }
+}
+  }
+
+  test("External partitioned data source table does not support fallback to 
HDFS " +
+  

[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-21 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r316479433
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
 ##
 @@ -650,4 +652,129 @@ class StatisticsCollectionSuite extends 
StatisticsCollectionTestBase with Shared
   }
 }
   }
+
+  test("Non-partitioned data source table support fallback to HDFS for size 
estimation") {
+withTempDir { dir =>
+  Seq(false, true).foreach { fallBackToHDFS =>
+withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFS") {
+  withTable("spark_25474") {
+sql(s"CREATE TABLE spark_25474 (c1 BIGINT) USING PARQUET LOCATION 
'${dir.toURI}'")
+
spark.range(5).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath)
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+// Table statistics are always recalculated by FileIndex
+assert(relation.stats.sizeInBytes === getDataSize(dir))
+  }
+}
+  }
+}
+  }
+
+  test("Partitioned data source table support fallback to HDFS for size 
estimation") {
+Seq(false, true).foreach { fallBackToHDFS =>
+  withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFS") {
+withTempDir { dir =>
+  withTable("spark_25474") {
+sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+if (fallBackToHDFS) {
+  assert(relation.stats.sizeInBytes ===
+CommandUtils.getSizeInBytesFallBackToHdfs(spark, 
getCatalogTable("spark_25474")))
+} else {
+  assert(relation.stats.sizeInBytes === conf.defaultSizeInBytes)
+}
+  }
+}
+  }
+}
+  }
+
+  test("Partitioned data source table support fallback to HDFS for size 
estimation" +
+"with defaultSizeInBytes") {
+val defaultSizeInBytes = 10 * 1024 * 1024
+Seq(false, true).foreach { fallBackToHDFS =>
+  withSQLConf(
+SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS",
+SQLConf.DEFAULT_SIZE_IN_BYTES.key -> s"$defaultSizeInBytes") {
+withTempDir { dir =>
+  withTable("spark_25474") {
+sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+  s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+if (fallBackToHDFS) {
+  assert(relation.stats.sizeInBytes ===
+CommandUtils.getSizeInBytesFallBackToHdfs(spark, 
getCatalogTable("spark_25474")))
+} else {
+  assert(relation.stats.sizeInBytes === defaultSizeInBytes)
+}
+  }
+}
+  }
+}
+  }
+
+  test("Partitioned data source table stats should be cached") {
+Seq(false, true).foreach { fallBackToHDFS =>
+  withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFS") {
+withTempDir { dir =>
+  withTable("spark_25474") {
+sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+  s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+if (fallBackToHDFS) {
+  val dataSize =
+CommandUtils.getSizeInBytesFallBackToHdfs(spark, 
getCatalogTable("spark_25474"))
+  assert(relation.stats.sizeInBytes === dataSize)
+
+  val qualifiedTableName =
+
QualifiedTableName(spark.sessionState.catalog.getCurrentDatabase, "spark_25474")
+  val logicalRelation = 
spark.sessionState.catalog.getCachedTable(qualifiedTableName)
+.asInstanceOf[LogicalRelation]
+  assert(logicalRelation.catalogTable.get.stats.get.sizeInBytes 
=== dataSize)
+} else {
+  assert(relation.stats.sizeInBytes === conf.defaultSizeInBytes)
+}
+  }
+}
+  }
+}
+  }
+
+  test("External partitioned data source table does not support fallback to 
HDFS " +
+  

[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-21 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r316218631
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
 ##
 @@ -650,4 +652,129 @@ class StatisticsCollectionSuite extends 
StatisticsCollectionTestBase with Shared
   }
 }
   }
+
+  test("Non-partitioned data source table support fallback to HDFS for size 
estimation") {
+withTempDir { dir =>
+  Seq(false, true).foreach { fallBackToHDFS =>
+withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFS") {
+  withTable("spark_25474") {
+sql(s"CREATE TABLE spark_25474 (c1 BIGINT) USING PARQUET LOCATION 
'${dir.toURI}'")
+
spark.range(5).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath)
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+// Table statistics are always recalculated by FileIndex
+assert(relation.stats.sizeInBytes === getDataSize(dir))
+  }
+}
+  }
+}
+  }
+
+  test("Partitioned data source table support fallback to HDFS for size 
estimation") {
+Seq(false, true).foreach { fallBackToHDFS =>
+  withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFS") {
+withTempDir { dir =>
+  withTable("spark_25474") {
+sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+if (fallBackToHDFS) {
+  assert(relation.stats.sizeInBytes ===
+CommandUtils.getSizeInBytesFallBackToHdfs(spark, 
getCatalogTable("spark_25474")))
+} else {
+  assert(relation.stats.sizeInBytes === conf.defaultSizeInBytes)
+}
+  }
+}
+  }
+}
+  }
+
+  test("Partitioned data source table support fallback to HDFS for size 
estimation" +
+"with defaultSizeInBytes") {
+val defaultSizeInBytes = 10 * 1024 * 1024
+Seq(false, true).foreach { fallBackToHDFS =>
+  withSQLConf(
+SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS",
+SQLConf.DEFAULT_SIZE_IN_BYTES.key -> s"$defaultSizeInBytes") {
+withTempDir { dir =>
+  withTable("spark_25474") {
+sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+  s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+if (fallBackToHDFS) {
+  assert(relation.stats.sizeInBytes ===
+CommandUtils.getSizeInBytesFallBackToHdfs(spark, 
getCatalogTable("spark_25474")))
+} else {
+  assert(relation.stats.sizeInBytes === defaultSizeInBytes)
+}
+  }
+}
+  }
+}
+  }
+
+  test("Partitioned data source table stats should be cached") {
+Seq(false, true).foreach { fallBackToHDFS =>
+  withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFS") {
+withTempDir { dir =>
+  withTable("spark_25474") {
+sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+  s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+if (fallBackToHDFS) {
+  val dataSize =
+CommandUtils.getSizeInBytesFallBackToHdfs(spark, 
getCatalogTable("spark_25474"))
+  assert(relation.stats.sizeInBytes === dataSize)
+
+  val qualifiedTableName =
+
QualifiedTableName(spark.sessionState.catalog.getCurrentDatabase, "spark_25474")
+  val logicalRelation = 
spark.sessionState.catalog.getCachedTable(qualifiedTableName)
+.asInstanceOf[LogicalRelation]
+  assert(logicalRelation.catalogTable.get.stats.get.sizeInBytes 
=== dataSize)
+} else {
+  assert(relation.stats.sizeInBytes === conf.defaultSizeInBytes)
+}
+  }
+}
+  }
+}
+  }
+
+  test("External partitioned data source table does not support fallback to 
HDFS " +
+  

[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-21 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r316115363
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
 ##
 @@ -650,4 +652,129 @@ class StatisticsCollectionSuite extends 
StatisticsCollectionTestBase with Shared
   }
 }
   }
+
+  test("Non-partitioned data source table support fallback to HDFS for size 
estimation") {
+withTempDir { dir =>
+  Seq(false, true).foreach { fallBackToHDFS =>
+withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFS") {
+  withTable("spark_25474") {
+sql(s"CREATE TABLE spark_25474 (c1 BIGINT) USING PARQUET LOCATION 
'${dir.toURI}'")
+
spark.range(5).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath)
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+// Table statistics are always recalculated by FileIndex
+assert(relation.stats.sizeInBytes === getDataSize(dir))
+  }
+}
+  }
+}
+  }
+
+  test("Partitioned data source table support fallback to HDFS for size 
estimation") {
+Seq(false, true).foreach { fallBackToHDFS =>
+  withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFS") {
+withTempDir { dir =>
+  withTable("spark_25474") {
+sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+if (fallBackToHDFS) {
+  assert(relation.stats.sizeInBytes ===
+CommandUtils.getSizeInBytesFallBackToHdfs(spark, 
getCatalogTable("spark_25474")))
+} else {
+  assert(relation.stats.sizeInBytes === conf.defaultSizeInBytes)
+}
+  }
+}
+  }
+}
+  }
+
+  test("Partitioned data source table support fallback to HDFS for size 
estimation" +
+"with defaultSizeInBytes") {
+val defaultSizeInBytes = 10 * 1024 * 1024
+Seq(false, true).foreach { fallBackToHDFS =>
+  withSQLConf(
+SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS",
+SQLConf.DEFAULT_SIZE_IN_BYTES.key -> s"$defaultSizeInBytes") {
+withTempDir { dir =>
+  withTable("spark_25474") {
+sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+  s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+if (fallBackToHDFS) {
+  assert(relation.stats.sizeInBytes ===
+CommandUtils.getSizeInBytesFallBackToHdfs(spark, 
getCatalogTable("spark_25474")))
+} else {
+  assert(relation.stats.sizeInBytes === defaultSizeInBytes)
+}
+  }
+}
+  }
+}
+  }
+
+  test("Partitioned data source table stats should be cached") {
+Seq(false, true).foreach { fallBackToHDFS =>
+  withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFS") {
+withTempDir { dir =>
+  withTable("spark_25474") {
+sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+  s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+if (fallBackToHDFS) {
+  val dataSize =
+CommandUtils.getSizeInBytesFallBackToHdfs(spark, 
getCatalogTable("spark_25474"))
+  assert(relation.stats.sizeInBytes === dataSize)
+
+  val qualifiedTableName =
+
QualifiedTableName(spark.sessionState.catalog.getCurrentDatabase, "spark_25474")
+  val logicalRelation = 
spark.sessionState.catalog.getCachedTable(qualifiedTableName)
+.asInstanceOf[LogicalRelation]
+  assert(logicalRelation.catalogTable.get.stats.get.sizeInBytes 
=== dataSize)
+} else {
+  assert(relation.stats.sizeInBytes === conf.defaultSizeInBytes)
+}
+  }
+}
+  }
+}
+  }
+
+  test("External partitioned data source table does not support fallback to 
HDFS " +
 
 

[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-20 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r315993569
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ##
 @@ -220,10 +220,20 @@ case class DataSourceAnalysis(conf: SQLConf) extends 
Rule[LogicalPlan] with Cast
  * data source.
  */
 class FindDataSourceTable(sparkSession: SparkSession) extends 
Rule[LogicalPlan] {
-  private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
-val qualifiedTableName = QualifiedTableName(table.database, 
table.identifier.table)
+  private def maybeWithTableStats(tableMeta: CatalogTable): CatalogTable = {
+if (tableMeta.stats.isEmpty && 
sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+  val sizeInBytes = 
CommandUtils.getSizeInBytesFallBackToHdfs(sparkSession, tableMeta)
+  tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = 
BigInt(sizeInBytes
+} else {
+  tableMeta
+}
+  }
+
+  private def readDataSourceTable(tableMeta: CatalogTable): LogicalPlan = {
+val qualifiedTableName = QualifiedTableName(tableMeta.database, 
tableMeta.identifier.table)
 val catalog = sparkSession.sessionState.catalog
 catalog.getCachedPlan(qualifiedTableName, () => {
+  val table = maybeWithTableStats(tableMeta)
 
 Review comment:
   Verify it always uses cached relation if it cached. Checkout this 
[commit](https://github.com/apache/spark/pull/24715/commits/1fce50859b959cd4190f0da5dabf4addd187fb79):
   ```shell
   git checkout 1fce50859b959cd4190f0da5dabf4addd187fb79
   build/sbt clean package
   export SPARK_PREPEND_CLASSES=true
   ./bin/spark-shell
   ```
   
   ```scala
   import org.apache.spark.sql.catalyst.QualifiedTableName
   import org.apache.spark.sql.execution.datasources.LogicalRelation
   
   spark.sql("CREATE TABLE t3(id int, c2 int) USING parquet PARTITIONED BY(id)")
   spark.sql("INSERT INTO TABLE t3 PARTITION(id=1) SELECT 2")
   
   // default, fallBackToHdfs=false
   spark.sql("EXPLAIN COST SELECT * FROM t3").show(false)
   
spark.sessionState.catalog.getCachedTable(QualifiedTableName(spark.sessionState.catalog.getCurrentDatabase,
 "t3")).asInstanceOf[LogicalRelation].catalogTable.get.stats
   
   // enable fallBackToHdfs
   spark.sql("set spark.sql.statistics.fallBackToHdfs=true")
   spark.sql("EXPLAIN COST SELECT * FROM t3").show(false)
   
spark.sessionState.catalog.getCachedTable(QualifiedTableName(spark.sessionState.catalog.getCurrentDatabase,
 "t3")).asInstanceOf[LogicalRelation].catalogTable.get.stats
   
   // Invalidate cached relations
   spark.sessionState.catalog.invalidateAllCachedTables
   spark.sql("EXPLAIN COST SELECT * FROM t3").show(false)
   ```
   
   ```scala
   scala> import org.apache.spark.sql.catalyst.QualifiedTableName
   import org.apache.spark.sql.catalyst.QualifiedTableName
   
   scala> import org.apache.spark.sql.execution.datasources.LogicalRelation
   import org.apache.spark.sql.execution.datasources.LogicalRelation
   
   scala>
   
   scala> spark.sql("CREATE TABLE t3(id int, c2 int) USING parquet PARTITIONED 
BY(id)")
   res0: org.apache.spark.sql.DataFrame = []
   
   scala> spark.sql("INSERT INTO TABLE t3 PARTITION(id=1) SELECT 2")
   res1: org.apache.spark.sql.DataFrame = []
   
   scala>
   
   scala> // default, fallBackToHdfs=false
   
   scala> spark.sql("EXPLAIN COST SELECT * FROM t3").show(false)
   
++
   |plan



|
   
++
   |== Optimized Logical Plan ==
   Relation[c2#1,id#2] parquet, Statistics(sizeInBytes=8.0 EiB)
   
   == Physical Plan ==
   *(1) ColumnarToRow
   +- FileScan parquet default.t3[c2#1,id#2] Batched: true, DataFilters: [], 
Format: Parquet, Location: 
CatalogFileIndex[file:/root/opensource/SPARK-25474/spark-wareh

[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-20 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r315685206
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ##
 @@ -220,10 +220,20 @@ case class DataSourceAnalysis(conf: SQLConf) extends 
Rule[LogicalPlan] with Cast
  * data source.
  */
 class FindDataSourceTable(sparkSession: SparkSession) extends 
Rule[LogicalPlan] {
-  private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
-val qualifiedTableName = QualifiedTableName(table.database, 
table.identifier.table)
+  private def maybeWithTableStats(tableMeta: CatalogTable): CatalogTable = {
+if (tableMeta.stats.isEmpty && 
sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+  val sizeInBytes = 
CommandUtils.getSizeInBytesFallBackToHdfs(sparkSession, tableMeta)
+  tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = 
BigInt(sizeInBytes
+} else {
+  tableMeta
+}
+  }
+
+  private def readDataSourceTable(tableMeta: CatalogTable): LogicalPlan = {
+val qualifiedTableName = QualifiedTableName(tableMeta.database, 
tableMeta.identifier.table)
 val catalog = sparkSession.sessionState.catalog
 catalog.getCachedPlan(qualifiedTableName, () => {
+  val table = maybeWithTableStats(tableMeta)
 
 Review comment:
   We cache the table statistics to get better performance.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-19 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r31506
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ##
 @@ -619,3 +619,34 @@ object DataSourceStrategy {
 (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, 
handledFilters)
   }
 }
+
+
+/**
+ * Defines default table statistics if table statistics are not available.
+ */
+class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
+
+  private val conf = session.sessionState.conf
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+// For the data source table, we only recalculate the table statistics
 
 Review comment:
   No. This pr only do it when:
   
https://github.com/apache/spark/blob/1fce50859b959cd4190f0da5dabf4addd187fb79/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L635-L637
   
   Otherwise, we will get the default table size from `CatalogFileIndex`, which 
we constructed in `DataSource`:
   
https://github.com/apache/spark/blob/5bb69945e4aaf519cd10a5c5083332f618039af0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L386-L390


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-19 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r315098918
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ##
 @@ -619,3 +619,34 @@ object DataSourceStrategy {
 (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, 
handledFilters)
   }
 }
+
+
+/**
+ * Defines default table statistics if table statistics are not available.
+ */
+class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
+
+  private val conf = session.sessionState.conf
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+// For the data source table, we only recalculate the table statistics
 
 Review comment:
   No, they are different. This pr get stats from 
`catalogTable.flatMap(_.stats.map(_.toPlanStats(output, 
conf.cboEnabled))).get`, but #22502 get stats from `Statistics(sizeInBytes = 
relation.sizeInBytes)`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-19 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r315071177
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ##
 @@ -619,3 +619,34 @@ object DataSourceStrategy {
 (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, 
handledFilters)
   }
 }
+
+
+/**
+ * Defines default table statistics if table statistics are not available.
+ */
+class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
+
+  private val conf = session.sessionState.conf
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+// For the data source table, we only recalculate the table statistics
 
 Review comment:
   We will not use the `sizeInBytes` of the  `BaseRelation` if table stats are 
available:
   
https://github.com/apache/spark/blob/a1c1dd3484a4dcd7c38fe256e69dbaaaf10d1a92/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala#L42-L46


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-19 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r315063093
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ##
 @@ -619,3 +619,34 @@ object DataSourceStrategy {
 (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, 
handledFilters)
   }
 }
+
+
+/**
+ * Defines default table statistics if table statistics are not available.
+ */
+class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
+
+  private val conf = session.sessionState.conf
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+// For the data source table, we only recalculate the table statistics
 
 Review comment:
   Do you mean `BaseRelation` in `LogicalRelation`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-18 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r315058705
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
 ##
 @@ -650,4 +650,46 @@ class StatisticsCollectionSuite extends 
StatisticsCollectionTestBase with Shared
   }
 }
   }
+
+  test("Non-partitioned data source table support fallback to HDFS for size 
estimation") {
+withTempDir { dir =>
+  Seq(false, true).foreach { fallBackToHDFS =>
+withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFS") {
+  withTable("spark_25474") {
+sql(s"CREATE TABLE spark_25474 (c1 BIGINT) USING PARQUET LOCATION 
'${dir.toURI}'")
+
spark.range(5).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath)
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+// fallBackToHDFS = true: The table stats will be recalculated by 
DetermineTableStats
+// fallBackToHDFS = false: The table stats will be recalculated by 
FileIndex
+assert(relation.stats.sizeInBytes === getDataSize(dir))
+  }
+}
+  }
+}
+  }
+
+  test("Partitioned data source table support fallback to HDFS for size 
estimation") {
+Seq(false, true).foreach { fallBackToHDFS =>
+  withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFS") {
 
 Review comment:
   Added a test for this case.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-18 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r315055204
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
 ##
 @@ -650,4 +650,46 @@ class StatisticsCollectionSuite extends 
StatisticsCollectionTestBase with Shared
   }
 }
   }
+
+  test("Non-partitioned data source table support fallback to HDFS for size 
estimation") {
+withTempDir { dir =>
+  Seq(false, true).foreach { fallBackToHDFS =>
+withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFS") {
+  withTable("spark_25474") {
+sql(s"CREATE TABLE spark_25474 (c1 BIGINT) USING PARQUET LOCATION 
'${dir.toURI}'")
+
spark.range(5).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath)
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+// fallBackToHDFS = true: The table stats will be recalculated by 
DetermineTableStats
+// fallBackToHDFS = false: The table stats will be recalculated by 
FileIndex
+assert(relation.stats.sizeInBytes === getDataSize(dir))
+  }
+}
+  }
+}
+  }
+
+  test("Partitioned data source table support fallback to HDFS for size 
estimation") {
+Seq(false, true).foreach { fallBackToHDFS =>
+  withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFS") {
+withTempDir { dir =>
+  withTable("spark_25474") {
+sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
+s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")
+sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2")
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+if (fallBackToHDFS) {
+  assert(relation.stats.sizeInBytes > 0)
+  assert(relation.stats.sizeInBytes < conf.defaultSizeInBytes)
 
 Review comment:
   No. `getDataSize` does not support partitioned table.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-18 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r315046331
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ##
 @@ -619,3 +619,34 @@ object DataSourceStrategy {
 (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, 
handledFilters)
   }
 }
+
+
+/**
+ * Defines default table statistics if table statistics are not available.
+ */
+class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
+
+  private val conf = session.sessionState.conf
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+// For the data source table, we only recalculate the table statistics
 
 Review comment:
   How about we only do it when it will use `CatalogFileIndex`?
   ```scala
   case logical @ LogicalRelation(_, _, Some(table), _)
 if DDLUtils.isDatasourceTable(table) && table.stats.isEmpty &&
   conf.fallBackToHdfsForStatsEnabled && 
conf.manageFilesourcePartitions &&
   table.tracksPartitionsInCatalog && 
table.partitionColumnNames.nonEmpty =>
 val sizeInBytes = CommandUtils.getSizeInBytesFallBackToHdfs(session, 
table)
 val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes 
= BigInt(sizeInBytes
 logical.copy(catalogTable = Some(withStats))
   ```
   
   
https://github.com/apache/spark/blob/5bb69945e4aaf519cd10a5c5083332f618039af0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L382-L391


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-16 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r314771686
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
 ##
 @@ -650,4 +650,46 @@ class StatisticsCollectionSuite extends 
StatisticsCollectionTestBase with Shared
   }
 }
   }
+
+  test("Non-partitioned data source table support fallback to HDFS for size 
estimation") {
+withTempDir { dir =>
+  Seq(false, true).foreach { fallBackToHDFS =>
+withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFS") {
+  withTable("spark_25474") {
+sql(s"CREATE TABLE spark_25474 (c1 BIGINT) USING PARQUET LOCATION 
'${dir.toURI}'")
+
spark.range(5).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath)
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+// fallBackToHDFS = true: The table stats will be recalculated by 
DetermineTableStats
+// fallBackToHDFS = false: The table stats will be recalculated by 
FileIndex
 
 Review comment:
   - non-partitioned tables uses `PartitioningAwareFileIndex`:
   
   
https://github.com/apache/spark/blob/b276788d57b270d455ef6a7c5ed6cf8a74885dde/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala#L103
   
   - partitioned tables uses `CatalogFileIndex`:
   
   
https://github.com/apache/spark/blob/5d672b7f3e07cfd7710df319fc6c7d2b9056a068/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala#L41
   
   
https://github.com/apache/spark/blob/c30b5297bc607ae33cc2fcf624b127942154e559/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L383-L388
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-16 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r314607502
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ##
 @@ -619,3 +619,34 @@ object DataSourceStrategy {
 (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, 
handledFilters)
   }
 }
+
+
+/**
+ * Support for recalculating table statistics if table statistics are not 
available.
 
 Review comment:
   Done


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-15 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r314593180
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ##
 @@ -345,14 +345,16 @@ object CommandUtils extends Logging {
 !path.getName.startsWith(stagingDir) && DataSourceUtils.isDataPath(path)
   }
 
-  def getSizeInBytesFallBackToHdfs(session: SparkSession, path: Path, 
defaultSize: Long): Long = {
+  def getSizeInBytesFallBackToHdfs(session: SparkSession, table: 
CatalogTable): Long = {
 try {
   val hadoopConf = session.sessionState.newHadoopConf()
-  path.getFileSystem(hadoopConf).getContentSummary(path).getLength
+  val tablePath = new Path(table.location)
+  val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
+  fs.getContentSummary(tablePath).getLength
 } catch {
   case NonFatal(e) =>
-logWarning(s"Failed to get table size from hdfs. Using the default 
size, $defaultSize.", e)
-defaultSize
+logWarning(s"Failed to get table size from HDFS. Using the default 
data size.", e)
 
 Review comment:
   I think we don't need it anymore. #22502 add `defaultSize` because 
`defaultSize` may be different from 
`session.sessionState.conf.defaultSizeInBytes`. But now it is always 
`session.sessionState.conf.defaultSizeInBytes`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-15 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r314589735
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ##
 @@ -345,14 +345,16 @@ object CommandUtils extends Logging {
 !path.getName.startsWith(stagingDir) && DataSourceUtils.isDataPath(path)
   }
 
-  def getSizeInBytesFallBackToHdfs(session: SparkSession, path: Path, 
defaultSize: Long): Long = {
+  def getSizeInBytesFallBackToHdfs(session: SparkSession, table: 
CatalogTable): Long = {
 try {
   val hadoopConf = session.sessionState.newHadoopConf()
-  path.getFileSystem(hadoopConf).getContentSummary(path).getLength
+  val tablePath = new Path(table.location)
+  val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
+  fs.getContentSummary(tablePath).getLength
 } catch {
   case NonFatal(e) =>
-logWarning(s"Failed to get table size from hdfs. Using the default 
size, $defaultSize.", e)
-defaultSize
+logWarning(s"Failed to get table size from HDFS. Using the default 
data size.", e)
 
 Review comment:
   I think we should format it if need print `defaultSize`: 
`org.apache.spark.util.Utils.bytesToString`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-15 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r314587227
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
 ##
 @@ -650,4 +650,44 @@ class StatisticsCollectionSuite extends 
StatisticsCollectionTestBase with Shared
   }
 }
   }
+
+  test("Data source tables support fallback to HDFS for size estimation") {
+// Non-partitioned table
+withTempDir { dir =>
+  Seq(false, true).foreach { fallBackToHDFSForStats =>
+withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFSForStats") {
+  withTable("spark_25474") {
+sql(s"CREATE TABLE spark_25474 (c1 BIGINT) USING PARQUET LOCATION 
'${dir.toURI}'")
+
spark.range(5).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath)
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+assert(relation.stats.sizeInBytes === getDataSize(dir))
+  }
+}
+  }
+}
+
+// Partitioned table
+Seq(false, true).foreach { fallBackToHDFSForStats =>
+  withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFSForStats") {
+withTable("spark_25474") {
+  withTempDir { dir =>
+spark.sql("CREATE TABLE spark_25474(a int, b int) USING parquet " +
 
 Review comment:
   Done


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-15 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r314587076
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ##
 @@ -619,3 +620,35 @@ object DataSourceStrategy {
 (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, 
handledFilters)
   }
 }
+
+
+/**
+ * Support for recalculating table statistics if table statistics are not 
available.
+ */
+class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
+
+  private val sessionConf = session.sessionState.conf
 
 Review comment:
   Done


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-15 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r314587062
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ##
 @@ -619,3 +620,35 @@ object DataSourceStrategy {
 (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, 
handledFilters)
   }
 }
+
+
+/**
+ * Support for recalculating table statistics if table statistics are not 
available.
+ */
+class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
+
+  private val sessionConf = session.sessionState.conf
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+// For the data source table, we only recalculate the table statistics 
when it creates
+// the CatalogFileIndex using defaultSizeInBytes. See SPARK-25474 for more 
details.
 
 Review comment:
   Done


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-15 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r314587042
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ##
 @@ -619,3 +620,35 @@ object DataSourceStrategy {
 (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, 
handledFilters)
   }
 }
+
+
+/**
+ * Support for recalculating table statistics if table statistics are not 
available.
+ */
+class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
+
+  private val sessionConf = session.sessionState.conf
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+// For the data source table, we only recalculate the table statistics 
when it creates
+// the CatalogFileIndex using defaultSizeInBytes. See SPARK-25474 for more 
details.
+case logical @ LogicalRelation(_, _, Some(table), _)
+  if sessionConf.fallBackToHdfsForStatsEnabled && table.stats.isEmpty &&
+sessionConf.manageFilesourcePartitions &&
+table.tracksPartitionsInCatalog && table.partitionColumnNames.nonEmpty 
=>
+  val sizeInBytes = CommandUtils.getSizeInBytesFallBackToHdfs(session, 
table)
+  val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = 
BigInt(sizeInBytes
+  logical.copy(catalogTable = Some(withStats))
+
+case relation: HiveTableRelation
 
 Review comment:
   @advancedxy Already work on 
this:https://github.com/apache/spark/pull/25306/commits/c86a27b2a5e286733ad305de1d7e42d1373b3a3b


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-15 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r314587025
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
 ##
 @@ -650,4 +650,44 @@ class StatisticsCollectionSuite extends 
StatisticsCollectionTestBase with Shared
   }
 }
   }
+
+  test("Data source tables support fallback to HDFS for size estimation") {
+// Non-partitioned table
+withTempDir { dir =>
+  Seq(false, true).foreach { fallBackToHDFSForStats =>
+withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFSForStats") {
+  withTable("spark_25474") {
+sql(s"CREATE TABLE spark_25474 (c1 BIGINT) USING PARQUET LOCATION 
'${dir.toURI}'")
+
spark.range(5).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath)
+
+assert(getCatalogTable("spark_25474").stats.isEmpty)
+val relation = 
spark.table("spark_25474").queryExecution.analyzed.children.head
+assert(relation.stats.sizeInBytes === getDataSize(dir))
+  }
+}
+  }
+}
+
+// Partitioned table
+Seq(false, true).foreach { fallBackToHDFSForStats =>
 
 Review comment:
   Done


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-08-15 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r314587006
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
 ##
 @@ -650,4 +650,44 @@ class StatisticsCollectionSuite extends 
StatisticsCollectionTestBase with Shared
   }
 }
   }
+
+  test("Data source tables support fallback to HDFS for size estimation") {
+// Non-partitioned table
+withTempDir { dir =>
+  Seq(false, true).foreach { fallBackToHDFSForStats =>
+withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
s"$fallBackToHDFSForStats") {
 
 Review comment:
   // fallBackToHDFSForStats = true: The table stats will be recalculated by 
DetermineTableStats
   // fallBackToHDFSForStats = false: The table stats will be recalculated by 
[FileIndex](https://github.com/apache/spark/blob/b276788d57b270d455ef6a7c5ed6cf8a74885dde/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala#L103)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-07-01 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r299268577
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ##
 @@ -619,3 +620,43 @@ object DataSourceStrategy {
 (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, 
handledFilters)
   }
 }
+
+
+/**
+ * Support for recalculating table statistics if table statistics are not 
available.
+ */
+class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
+  def getHdfsSize(catalogTable: CatalogTable): Long = {
+try {
+  val hadoopConf = session.sessionState.newHadoopConf()
+  val tablePath = new Path(catalogTable.location)
+  val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
+  fs.getContentSummary(tablePath).getLength
+} catch {
+  case e: IOException =>
+logWarning("Failed to get table size from hdfs.", e)
+session.sessionState.conf.defaultSizeInBytes
+}
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+case logical @ LogicalRelation(relation, _, Some(table), _)
+  if session.sessionState.conf.fallBackToHdfsForStatsEnabled &&
+session.sessionState.conf.manageFilesourcePartitions && 
table.stats.isEmpty &&
+table.tracksPartitionsInCatalog && table.partitionColumnNames.nonEmpty 
=>
+  val withStats =
 
 Review comment:
   Recalculated only when `fallBackToHdfsForStatsEnabled` and 
`useCatalogFileIndex`:
   
https://github.com/apache/spark/blob/c30b5297bc607ae33cc2fcf624b127942154e559/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L379-L388


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-07-01 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r298788939
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 ##
 @@ -512,37 +512,46 @@ class InMemoryColumnarQuerySuite extends QueryTest with 
SharedSQLContext {
 SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "orc",
 SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1SourceReaderList) {
 withTempPath { workDir =>
-  withTable("table1") {
-val workDirPath = workDir.getAbsolutePath
-val data = Seq(100, 200, 300, 400).toDF("count")
-data.write.orc(workDirPath)
-val dfFromFile = spark.read.orc(workDirPath).cache()
-val inMemoryRelation = 
dfFromFile.queryExecution.optimizedPlan.collect {
-  case plan: InMemoryRelation => plan
-}.head
-// InMemoryRelation's stats is file size before the underlying RDD 
is materialized
-assert(inMemoryRelation.computeStats().sizeInBytes === 486)
-
-// InMemoryRelation's stats is updated after materializing RDD
-dfFromFile.collect()
-assert(inMemoryRelation.computeStats().sizeInBytes === 16)
-
-// test of catalog table
-val dfFromTable = spark.catalog.createTable("table1", 
workDirPath).cache()
-val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan.
-  collect { case plan: InMemoryRelation => plan }.head
-
-// Even CBO enabled, InMemoryRelation's stats keeps as the file 
size before table's
-// stats is calculated
-assert(inMemoryRelation2.computeStats().sizeInBytes === 486)
 
 Review comment:
   Hive table also has this issue:
   ```scala
   import org.apache.spark.sql.execution.columnar.InMemoryRelation
   
   val tempDir = "/tmp/spark/spark_25474"
   spark.range(10).write.mode("overwrite").parquet(tempDir)
   spark.sql(s"CREATE TABLE spark_25474 (id BIGINT) STORED AS parquet LOCATION 
'$tempDir'")
   spark.sql("DESC FORMATTED spark_25474").show(false)
   val inMemoryRelation = spark.table("spark_25474").cache()
   val optimizedPlan = inMemoryRelation.queryExecution.optimizedPlan
   val inMemoryRelation = optimizedPlan.collect { case plan: InMemoryRelation 
=> plan }.head
   println(inMemoryRelation.computeStats().sizeInBytes)
   ```
   ```
   scala> println(inMemoryRelation.computeStats().sizeInBytes)
   9223372036854775807
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-06-28 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r298788939
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 ##
 @@ -512,37 +512,46 @@ class InMemoryColumnarQuerySuite extends QueryTest with 
SharedSQLContext {
 SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "orc",
 SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1SourceReaderList) {
 withTempPath { workDir =>
-  withTable("table1") {
-val workDirPath = workDir.getAbsolutePath
-val data = Seq(100, 200, 300, 400).toDF("count")
-data.write.orc(workDirPath)
-val dfFromFile = spark.read.orc(workDirPath).cache()
-val inMemoryRelation = 
dfFromFile.queryExecution.optimizedPlan.collect {
-  case plan: InMemoryRelation => plan
-}.head
-// InMemoryRelation's stats is file size before the underlying RDD 
is materialized
-assert(inMemoryRelation.computeStats().sizeInBytes === 486)
-
-// InMemoryRelation's stats is updated after materializing RDD
-dfFromFile.collect()
-assert(inMemoryRelation.computeStats().sizeInBytes === 16)
-
-// test of catalog table
-val dfFromTable = spark.catalog.createTable("table1", 
workDirPath).cache()
-val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan.
-  collect { case plan: InMemoryRelation => plan }.head
-
-// Even CBO enabled, InMemoryRelation's stats keeps as the file 
size before table's
-// stats is calculated
-assert(inMemoryRelation2.computeStats().sizeInBytes === 486)
 
 Review comment:
   Hive table also has this issue:
   ```scala
   import org.apache.spark.sql.execution.columnar.InMemoryRelation
   
   val tempDir = "/tmp/spark/spark_25474"
   spark.range(10).write.mode("overwrite").parquet(tempDir)
   spark.sql(s"CREATE TABLE spark_25474 (id BIGINT) STORED AS parquet LOCATION 
'$tempDir'")
   spark.sql("DESC FORMATTED spark_25474").show(false)
   val inMemoryRelation = spark.table("spark_25474").cache()
   val optimizedPlan = inMemoryRelation.queryExecution.optimizedPlan
   val inMemoryRelation = optimizedPlan.collect { case plan: InMemoryRelation 
=> plan }.head
   println(inMemoryRelation.computeStats().sizeInBytes)
   ```
   ```
   scala> println(inMemoryRelation.computeStats().sizeInBytes)
   9223372036854775807
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation

2019-06-28 Thread GitBox
wangyum commented on a change in pull request #24715: [SPARK-25474][SQL] Data 
source tables support fallback to HDFS for size estimation
URL: https://github.com/apache/spark/pull/24715#discussion_r298529585
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ##
 @@ -256,6 +256,21 @@ class FindDataSourceTable(sparkSession: SparkSession) 
extends Rule[LogicalPlan]
   }
 }
 
+case class DetermineDataSourceTableStats(session: SparkSession) extends 
Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+case logicalRelation @ LogicalRelation(_, _, catalogTable, _) if 
catalogTable.nonEmpty &&
+catalogTable.forall(DDLUtils.isDatasourceTable) && 
catalogTable.forall(_.stats.isEmpty) =>
+  val sizeInBytes = if 
(session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
 
 Review comment:
   I'm move `DetermineTableStats` from `HiveStrategies` to `DataSourceStrategy` 
to reduce duplicate.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org