sunchao commented on code in PR #36067:
URL: https://github.com/apache/spark/pull/36067#discussion_r844250860
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala:
##########
@@ -53,7 +53,11 @@ class PathFilterIgnoreNonData(stagingDir: String) extends
PathFilter with Serial
object CommandUtils extends Logging {
/** Change statistics after changing data by commands. */
- def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit
= {
+ def updateTableStats(
Review Comment:
seems we also need to update a few other commands:
- `AlterTableSetLocationCommand`
- `LoadDataCommand`
- `TruncateTableCommand`
- `InsertIntoHadoopFsRelationCommand`
It'd be better to only update stats for those partitions that have been
touched, instead of updating all partitions.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala:
##########
@@ -53,7 +53,11 @@ class PathFilterIgnoreNonData(stagingDir: String) extends
PathFilter with Serial
object CommandUtils extends Logging {
/** Change statistics after changing data by commands. */
- def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit
= {
+ def updateTableStats(
+ sparkSession: SparkSession,
+ table: CatalogTable,
+ partitionSpec: Map[String, Option[String]] = Map.empty,
+ withAutoPartitionStats: Boolean = true): Unit = {
Review Comment:
Maybe make this default to `false`, so that existing calls of the form
`updateTableStats(session, table)` still maintain the existing behavior.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala:
##########
@@ -53,7 +53,11 @@ class PathFilterIgnoreNonData(stagingDir: String) extends
PathFilter with Serial
object CommandUtils extends Logging {
/** Change statistics after changing data by commands. */
- def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit
= {
+ def updateTableStats(
Review Comment:
seems we also need to update a few other commands:
- `AlterTableSetLocationCommand`
- `LoadDataCommand`
- `TruncateTableCommand`
- `InsertIntoHadoopFsRelationCommand`
It'd be better to update stats only for those partitions that are actually
touched, instead of updating all partitions.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala:
##########
@@ -62,6 +66,14 @@ object CommandUtils extends Logging {
if (isNewStats) {
val newStats = CatalogStatistics(sizeInBytes = newSize)
catalog.alterTableStats(table.identifier, Some(newStats))
+
+ if (withAutoPartitionStats) {
+ if (partitionSpec.nonEmpty) {
+ AnalyzePartitionCommand(table.identifier, partitionSpec,
false).run(sparkSession)
+ } else if (table.partitionColumnNames.nonEmpty) {
+ AnalyzePartitionCommand(table.identifier, partitionSpec,
false).run(sparkSession)
Review Comment:
I think it's better to disable the full scan for now, since we don't do that
for table stats either. We can add the option later if there is a need.
##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala:
##########
@@ -114,7 +114,11 @@ case class InsertIntoHiveTable(
CommandUtils.uncacheTableOrView(sparkSession,
table.identifier.quotedString)
sparkSession.sessionState.catalog.refreshTable(table.identifier)
- CommandUtils.updateTableStats(sparkSession, table)
+ val partitionSpec = partition.map {
+ case (key, Some(null)) => key ->
Some(ExternalCatalogUtils.DEFAULT_PARTITION_NAME)
+ case other => other
+ }
+ CommandUtils.updateTableStats(sparkSession, table, partitionSpec)
Review Comment:
Oh cool
##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala:
##########
@@ -114,7 +114,11 @@ case class InsertIntoHiveTable(
CommandUtils.uncacheTableOrView(sparkSession,
table.identifier.quotedString)
sparkSession.sessionState.catalog.refreshTable(table.identifier)
- CommandUtils.updateTableStats(sparkSession, table)
+ val partitionSpec = partition.map {
+ case (key, Some(null)) => key ->
Some(ExternalCatalogUtils.DEFAULT_PARTITION_NAME)
+ case other => other
+ }
+ CommandUtils.updateTableStats(sparkSession, table, partitionSpec)
Review Comment:
Oh cool
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]