sunchao commented on code in PR #36067:
URL: https://github.com/apache/spark/pull/36067#discussion_r844250860


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala:
##########
@@ -53,7 +53,11 @@ class PathFilterIgnoreNonData(stagingDir: String) extends 
PathFilter with Serial
 object CommandUtils extends Logging {
 
   /** Change statistics after changing data by commands. */
-  def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit 
= {
+  def updateTableStats(

Review Comment:
   It seems we also need to update a few other commands:
   - `AlterTableSetLocationCommand`
   - `LoadDataCommand`
   - `TruncateTableCommand`
   - `InsertIntoHadoopFsRelationCommand`
   
   It'd be better to only update stats for those partitions that have been 
touched, instead of updating all partitions.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala:
##########
@@ -53,7 +53,11 @@ class PathFilterIgnoreNonData(stagingDir: String) extends 
PathFilter with Serial
 object CommandUtils extends Logging {
 
   /** Change statistics after changing data by commands. */
-  def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit 
= {
+  def updateTableStats(
+      sparkSession: SparkSession,
+      table: CatalogTable,
+      partitionSpec: Map[String, Option[String]] = Map.empty,
+      withAutoPartitionStats: Boolean = true): Unit = {

Review Comment:
   Maybe make this default to `false`, so that existing calls such as 
`updateTableStats(session, table)` keep their current behavior.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala:
##########
@@ -53,7 +53,11 @@ class PathFilterIgnoreNonData(stagingDir: String) extends 
PathFilter with Serial
 object CommandUtils extends Logging {
 
   /** Change statistics after changing data by commands. */
-  def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit 
= {
+  def updateTableStats(

Review Comment:
   It seems we also need to update a few other commands:
   - `AlterTableSetLocationCommand`
   - `LoadDataCommand`
   - `TruncateTableCommand`
   - `InsertIntoHadoopFsRelationCommand`
   
   It'd be better to only update stats for those partitions that are being 
touched, instead of updating all partitions.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala:
##########
@@ -62,6 +66,14 @@ object CommandUtils extends Logging {
       if (isNewStats) {
         val newStats = CatalogStatistics(sizeInBytes = newSize)
         catalog.alterTableStats(table.identifier, Some(newStats))
+
+        if (withAutoPartitionStats) {
+          if (partitionSpec.nonEmpty) {
+            AnalyzePartitionCommand(table.identifier, partitionSpec, 
false).run(sparkSession)
+          } else if (table.partitionColumnNames.nonEmpty) {
+            AnalyzePartitionCommand(table.identifier, partitionSpec, 
false).run(sparkSession)

Review Comment:
   I think it's better to disable the full scan for now, since we don't do that 
for table stats either. We can add the option later if there is a need.



##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala:
##########
@@ -114,7 +114,11 @@ case class InsertIntoHiveTable(
     CommandUtils.uncacheTableOrView(sparkSession, 
table.identifier.quotedString)
     sparkSession.sessionState.catalog.refreshTable(table.identifier)
 
-    CommandUtils.updateTableStats(sparkSession, table)
+    val partitionSpec = partition.map {
+      case (key, Some(null)) => key -> 
Some(ExternalCatalogUtils.DEFAULT_PARTITION_NAME)
+      case other => other
+    }
+    CommandUtils.updateTableStats(sparkSession, table, partitionSpec)

Review Comment:
   Oh cool



##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala:
##########
@@ -114,7 +114,11 @@ case class InsertIntoHiveTable(
     CommandUtils.uncacheTableOrView(sparkSession, 
table.identifier.quotedString)
     sparkSession.sessionState.catalog.refreshTable(table.identifier)
 
-    CommandUtils.updateTableStats(sparkSession, table)
+    val partitionSpec = partition.map {
+      case (key, Some(null)) => key -> 
Some(ExternalCatalogUtils.DEFAULT_PARTITION_NAME)
+      case other => other
+    }
+    CommandUtils.updateTableStats(sparkSession, table, partitionSpec)

Review Comment:
   Oh cool



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to