AngersZhuuuu commented on a change in pull request #31179:
URL: https://github.com/apache/spark/pull/31179#discussion_r605370937



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
##########
@@ -51,6 +51,119 @@ class PathFilterIgnoreNonData(stagingDir: String) extends PathFilter with Serial
 
 object CommandUtils extends Logging {
 
+  def updateTableAndPartitionStats(
+      sparkSession: SparkSession,
+      table: CatalogTable,
+      overwrite: Boolean,
+      partitionSpec: Map[String, Option[String]],
+      statsTracker: BasicWriteJobStatsTracker): Unit = {
+    val catalog = sparkSession.sessionState.catalog
+    if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
+      val newTable = catalog.getTableMetadata(table.identifier)
+      val isSinglePartition = partitionSpec.nonEmpty && partitionSpec.values.forall(_.nonEmpty)
+      val isPartialPartitions = partitionSpec.nonEmpty &&
+          partitionSpec.values.exists(_.isEmpty) && partitionSpec.values.exists(_.nonEmpty)
+      if (overwrite) {
+        // Only update one partition, statsTracker.partitionsStats is empty
+        if (isSinglePartition) {
+          val spec = partitionSpec.map { case (key, value) =>
+            key -> value.get
+          }
+          val partition = catalog.listPartitions(table.identifier, Some(spec))
+          val newTableStats = CommandUtils.mergeNewStats(
+            newTable.stats, statsTracker.totalNumBytes, Some(statsTracker.totalNumOutput))
+          val newPartitions = partition.flatten { part =>

Review comment:
       > This block seems to be the same as the `overwrite=false` case? https://github.com/apache/spark/pull/31179/files#diff-6309057f8f41f20f8de513ab67d7755aae5fb30d7441fc21000999c9e8e8e0bfR125-R140
   
   Done
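
   For reference, a minimal sketch of how the duplicated single-partition update could be hoisted into one shared private helper in `CommandUtils`; the name `updateSinglePartitionStats` and its exact parameter list are illustrative, not the PR's actual change:

```scala
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog}
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker

// Hypothetical helper shared by the overwrite=true and overwrite=false
// single-partition paths; the body mirrors the block quoted above.
private def updateSinglePartitionStats(
    catalog: SessionCatalog,
    table: CatalogTable,
    spec: Map[String, String],
    statsTracker: BasicWriteJobStatsTracker): Unit = {
  val partitions = catalog.listPartitions(table.identifier, Some(spec))
  val newPartitions = partitions.flatMap { part =>
    // Merge when a row count already exists; otherwise fall back to the
    // plain size comparison, exactly as in the quoted block.
    val newStats = if (part.stats.exists(_.rowCount.isDefined)) {
      mergeNewStats(part.stats, statsTracker.totalNumBytes,
        Some(statsTracker.totalNumOutput))
    } else {
      compareAndGetNewStats(part.stats, statsTracker.totalNumBytes,
        Some(statsTracker.totalNumOutput))
    }
    newStats.map(_ => part.copy(stats = newStats))
  }
  if (newPartitions.nonEmpty) {
    catalog.alterPartitions(table.identifier, newPartitions)
  }
}
```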

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
##########
@@ -51,6 +51,119 @@ class PathFilterIgnoreNonData(stagingDir: String) extends PathFilter with Serial
 
 object CommandUtils extends Logging {
 
+  def updateTableAndPartitionStats(
+      sparkSession: SparkSession,
+      table: CatalogTable,
+      overwrite: Boolean,
+      partitionSpec: Map[String, Option[String]],
+      statsTracker: BasicWriteJobStatsTracker): Unit = {
+    val catalog = sparkSession.sessionState.catalog
+    if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
+      val newTable = catalog.getTableMetadata(table.identifier)
+      val isSinglePartition = partitionSpec.nonEmpty && partitionSpec.values.forall(_.nonEmpty)
+      val isPartialPartitions = partitionSpec.nonEmpty &&

Review comment:
       > Could you move `isPartialPartitions` into L102?
   
   Done
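
   To make the moved flag concrete, a self-contained illustration of what `isPartialPartitions` detects (the `ds`/`hr` column names are made up for the example): a spec that binds some partition columns and leaves others open:

```scala
// A partial spec: "ds" is bound, "hr" is left open.
val partitionSpec: Map[String, Option[String]] =
  Map("ds" -> Some("2021-01-01"), "hr" -> None)

// The same expression as in the patch, evaluated on the sample spec.
val isPartialPartitions = partitionSpec.nonEmpty &&
  partitionSpec.values.exists(_.isEmpty) &&
  partitionSpec.values.exists(_.nonEmpty)

assert(isPartialPartitions)  // some values bound, some not => partial
```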

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
##########
@@ -51,6 +51,119 @@ class PathFilterIgnoreNonData(stagingDir: String) extends PathFilter with Serial
 
 object CommandUtils extends Logging {
 
+  def updateTableAndPartitionStats(
+      sparkSession: SparkSession,
+      table: CatalogTable,
+      overwrite: Boolean,
+      partitionSpec: Map[String, Option[String]],
+      statsTracker: BasicWriteJobStatsTracker): Unit = {
+    val catalog = sparkSession.sessionState.catalog
+    if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
+      val newTable = catalog.getTableMetadata(table.identifier)
+      val isSinglePartition = partitionSpec.nonEmpty && partitionSpec.values.forall(_.nonEmpty)
+      val isPartialPartitions = partitionSpec.nonEmpty &&
+          partitionSpec.values.exists(_.isEmpty) && partitionSpec.values.exists(_.nonEmpty)
+      if (overwrite) {
+        // Only update one partition, statsTracker.partitionsStats is empty
+        if (isSinglePartition) {
+          val spec = partitionSpec.map { case (key, value) =>
+            key -> value.get
+          }
+          val partition = catalog.listPartitions(table.identifier, Some(spec))
+          val newTableStats = CommandUtils.mergeNewStats(
+            newTable.stats, statsTracker.totalNumBytes, Some(statsTracker.totalNumOutput))
+          val newPartitions = partition.flatten { part =>
+            val newStates = if (part.stats.isDefined && part.stats.get.rowCount.isDefined) {
+              CommandUtils.mergeNewStats(
+                part.stats, statsTracker.totalNumBytes, Some(statsTracker.totalNumOutput))
+            } else {
+              CommandUtils.compareAndGetNewStats(
+                part.stats, statsTracker.totalNumBytes, Some(statsTracker.totalNumOutput))
+            }
+            newStates.map(_ => part.copy(stats = newStates))
+          }
+          if (newTableStats.isDefined) {
+            catalog.alterTableStats(table.identifier, newTableStats)
+          }
+          if (newPartitions.nonEmpty) {
+            catalog.alterPartitions(table.identifier, newPartitions)
+          }
+        } else {
+          // update all partitions statistics
+          val partitions = statsTracker.partitionsStats.map { case (part, stats) =>
+            val partition = catalog.getPartition(table.identifier, part)
+            val newStats = Some(CatalogStatistics(
+              sizeInBytes = stats.numBytes, rowCount = Some(stats.numRows)))
+            partition.copy(stats = newStats)
+          }.toSeq
+          if (partitions.nonEmpty) {
+            catalog.alterPartitions(table.identifier, partitions)
+          }
+
+          if (isPartialPartitions) {
+            val newStats = CommandUtils.mergeNewStats(
+              newTable.stats, statsTracker.totalNumBytes, Some(statsTracker.totalNumOutput))
+            if (newStats.isDefined) {
+              catalog.alterTableStats(table.identifier, newStats)
+            }
+          } else {
+            val newStats = CommandUtils.compareAndGetNewStats(
+              newTable.stats, statsTracker.totalNumBytes, Some(statsTracker.totalNumOutput))
+            if (newStats.isDefined) {
+              catalog.alterTableStats(table.identifier, newStats)
+            }
+          }
+        }
+      } else {
+        if (isSinglePartition) {
+          val spec = partitionSpec.map { case (key, value) =>

Review comment:
       > `val spec = partitionSpec.mapValues(_.get)`
   
   Done
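
   For anyone following along, the two forms are equivalent on a fully-bound spec (sample values made up). One caveat worth noting: `mapValues` is lazy and deprecated on `Map` in Scala 2.13, so the eager form, or an explicit `.toMap`, may age better:

```scala
// A fully-bound spec: every partition column has a value.
val partitionSpec: Map[String, Option[String]] =
  Map("ds" -> Some("2021-01-01"), "hr" -> Some("00"))

// Verbose form from the patch.
val specVerbose = partitionSpec.map { case (key, value) => key -> value.get }
// Reviewer's suggested one-liner; .toMap forces the view under Scala 2.13.
val specShort = partitionSpec.mapValues(_.get).toMap

assert(specVerbose == specShort)
```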

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
##########
@@ -51,6 +51,119 @@ class PathFilterIgnoreNonData(stagingDir: String) extends PathFilter with Serial
 
 object CommandUtils extends Logging {
 
+  def updateTableAndPartitionStats(
+      sparkSession: SparkSession,
+      table: CatalogTable,
+      overwrite: Boolean,
+      partitionSpec: Map[String, Option[String]],
+      statsTracker: BasicWriteJobStatsTracker): Unit = {
+    val catalog = sparkSession.sessionState.catalog
+    if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
+      val newTable = catalog.getTableMetadata(table.identifier)
+      val isSinglePartition = partitionSpec.nonEmpty && partitionSpec.values.forall(_.nonEmpty)
+      val isPartialPartitions = partitionSpec.nonEmpty &&
+          partitionSpec.values.exists(_.isEmpty) && partitionSpec.values.exists(_.nonEmpty)
+      if (overwrite) {
+        // Only update one partition, statsTracker.partitionsStats is empty
+        if (isSinglePartition) {
+          val spec = partitionSpec.map { case (key, value) =>

Review comment:
       > `val spec = partitionSpec.mapValues(_.get)`
   
   Done

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
##########
@@ -199,6 +312,17 @@ object CommandUtils extends Logging {
     newStats
   }
 
+  def mergeNewStats(

Review comment:
       > private
   
   Done
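
   The resulting signature, sketched below; the parameter list mirrors the existing `compareAndGetNewStats`, and the body is only a guess at the merge semantics, not the PR's actual code:

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogStatistics

// Hypothetical: accumulate the newly written bytes/rows onto the old stats.
private def mergeNewStats(
    oldStats: Option[CatalogStatistics],
    newTotalSize: BigInt,
    newRowCount: Option[BigInt]): Option[CatalogStatistics] = {
  val oldSize = oldStats.map(_.sizeInBytes).getOrElse(BigInt(0))
  val oldRows = oldStats.flatMap(_.rowCount)
  Some(CatalogStatistics(
    sizeInBytes = oldSize + newTotalSize,
    // Sum whichever row counts are known; None if neither side has one.
    rowCount = (oldRows ++ newRowCount).reduceOption(_ + _)))
}
```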

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
##########
@@ -51,6 +51,119 @@ class PathFilterIgnoreNonData(stagingDir: String) extends PathFilter with Serial
 
 object CommandUtils extends Logging {
 
+  def updateTableAndPartitionStats(
+      sparkSession: SparkSession,
+      table: CatalogTable,
+      overwrite: Boolean,
+      partitionSpec: Map[String, Option[String]],
+      statsTracker: BasicWriteJobStatsTracker): Unit = {
+    val catalog = sparkSession.sessionState.catalog
+    if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
+      val newTable = catalog.getTableMetadata(table.identifier)
+      val isSinglePartition = partitionSpec.nonEmpty && partitionSpec.values.forall(_.nonEmpty)
+      val isPartialPartitions = partitionSpec.nonEmpty &&
+          partitionSpec.values.exists(_.isEmpty) && partitionSpec.values.exists(_.nonEmpty)
+      if (overwrite) {
+        // Only update one partition, statsTracker.partitionsStats is empty
+        if (isSinglePartition) {
+          val spec = partitionSpec.map { case (key, value) =>
+            key -> value.get
+          }
+          val partition = catalog.listPartitions(table.identifier, Some(spec))
+          val newTableStats = CommandUtils.mergeNewStats(
+            newTable.stats, statsTracker.totalNumBytes, Some(statsTracker.totalNumOutput))
+          val newPartitions = partition.flatten { part =>
+            val newStates = if (part.stats.isDefined && part.stats.get.rowCount.isDefined) {
+              CommandUtils.mergeNewStats(

Review comment:
       > ditto
   
   Done

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
##########
@@ -51,6 +51,119 @@ class PathFilterIgnoreNonData(stagingDir: String) extends PathFilter with Serial
 
 object CommandUtils extends Logging {
 
+  def updateTableAndPartitionStats(
+      sparkSession: SparkSession,
+      table: CatalogTable,
+      overwrite: Boolean,
+      partitionSpec: Map[String, Option[String]],
+      statsTracker: BasicWriteJobStatsTracker): Unit = {
+    val catalog = sparkSession.sessionState.catalog
+    if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
+      val newTable = catalog.getTableMetadata(table.identifier)
+      val isSinglePartition = partitionSpec.nonEmpty && partitionSpec.values.forall(_.nonEmpty)
+      val isPartialPartitions = partitionSpec.nonEmpty &&
+          partitionSpec.values.exists(_.isEmpty) && partitionSpec.values.exists(_.nonEmpty)
+      if (overwrite) {
+        // Only update one partition, statsTracker.partitionsStats is empty
+        if (isSinglePartition) {
+          val spec = partitionSpec.map { case (key, value) =>
+            key -> value.get
+          }
+          val partition = catalog.listPartitions(table.identifier, Some(spec))
+          val newTableStats = CommandUtils.mergeNewStats(

Review comment:
       > `CommandUtils.mergeNewStats` -> `mergeNewStats`
   
   Done
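
   A minimal illustration of why the qualifier is redundant inside the enclosing object (`Example` and `helper` are made-up names):

```scala
object Example {
  private def helper(x: Int): Int = x + 1

  def run(x: Int): Int = helper(x)                  // preferred: unqualified
  def runQualified(x: Int): Int = Example.helper(x) // compiles, but noisy
}
```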




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to