attilapiros commented on code in PR #41628:
URL: https://github.com/apache/spark/pull/41628#discussion_r1269537071


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala:
##########
@@ -97,22 +97,53 @@ case class InsertIntoHadoopFsRelationCommand(
     var customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty
     var matchingPartitions: Seq[CatalogTablePartition] = Seq.empty
 
-    // When partitions are tracked by the catalog, compute all custom 
partition locations that
-    // may be relevant to the insertion job.
-    if (partitionsTrackedByCatalog) {
-      matchingPartitions = sparkSession.sessionState.catalog.listPartitions(
-        catalogTable.get.identifier, Some(staticPartitions))
-      initialMatchingPartitions = matchingPartitions.map(_.spec)
-      customPartitionLocations = getCustomPartitionLocations(
-        fs, catalogTable.get, qualifiedOutputPath, matchingPartitions)
-    }
-
     val jobId = java.util.UUID.randomUUID().toString
     val committer = FileCommitProtocol.instantiate(
       sparkSession.sessionState.conf.fileCommitProtocolClass,
       jobId = jobId,
       outputPath = outputPath.toString,
       dynamicPartitionOverwrite = dynamicPartitionOverwrite)
+    // For dynamic partition overwrite, FileOutputCommitter's output path is 
staging path, files
+    // will be renamed from staging path to final output path during commit job
+    val committerOutputPath = if (dynamicPartitionOverwrite) {
+      FileCommitProtocol.getStagingDir(outputPath.toString, jobId)
+        .makeQualified(fs.getUri, fs.getWorkingDirectory)
+    } else {
+      qualifiedOutputPath
+    }
+    var updatedPartitionPaths: Set[String] = Set.empty
+
+    // When partitions are tracked by the catalog, compute all custom 
partition locations that
+    // may be relevant to the insertion job.
+    if (partitionsTrackedByCatalog) {
+      matchingPartitions = if (dynamicPartitionOverwrite) {
+        // Get the matching partitions from the written path instead of
+        // pulling all partitions from the metastore, avoiding heavy pressure
+        updatedPartitionPaths = FileFormatWriter.write(
+          sparkSession = sparkSession,
+          plan = child,
+          fileFormat = fileFormat,
+          committer = committer,
+          outputSpec = FileFormatWriter.OutputSpec(
+            committerOutputPath.toString,
+            customPartitionLocations,

Review Comment:
   Instead of `customPartitionLocations` you can use `Map.empty` directly
to make it obvious that custom partitions are not used here.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala:
##########
@@ -275,6 +309,28 @@ case class InsertIntoHadoopFsRelationCommand(
     }.toMap
   }
 
+  /**
+   * Deletes all partition files that match the custom locations and copies
+   * files from the dynamic writing paths.
+   */
+  private def overwriteCustomPartitions(
+      fs: FileSystem,
+      table: CatalogTable,
+      qualifiedOutputPath: Path,
+      committer: FileCommitProtocol,
+      customPartitions: Map[TablePartitionSpec, String]) = {
+    customPartitions.foreach { case (spec, customLoc) =>
+      val defaultLocation = qualifiedOutputPath.suffix(
+        "/" + PartitioningUtils.getPathFragment(spec, table.partitionSchema))
+
+      val catalogLocation = new Path(customLoc).makeQualified(fs.getUri, 
fs.getWorkingDirectory)
+      if (fs.exists(catalogLocation) && !committer.deleteWithJob(fs, 
catalogLocation, true)) {
+        throw 
QueryExecutionErrors.cannotClearPartitionDirectoryError(catalogLocation)
+      }
+      fs.rename(defaultLocation, catalogLocation)
+    }
+  }
+
   override protected def withNewChildInternal(
-    newChild: LogicalPlan): InsertIntoHadoopFsRelationCommand = copy(query = 
newChild)
+      newChild: LogicalPlan): InsertIntoHadoopFsRelationCommand = copy(query = 
newChild)

Review Comment:
   Nit: revert this line



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala:
##########
@@ -57,7 +57,7 @@ case class InsertIntoHadoopFsRelationCommand(
     catalogTable: Option[CatalogTable],
     fileIndex: Option[FileIndex],
     outputColumnNames: Seq[String])
-  extends V1WriteCommand {
+    extends V1WriteCommand {

Review Comment:
   Nit: revert this line



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to