attilapiros commented on code in PR #41628:
URL: https://github.com/apache/spark/pull/41628#discussion_r1269537071
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala:
##########
@@ -97,22 +97,53 @@ case class InsertIntoHadoopFsRelationCommand(
var customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty
var matchingPartitions: Seq[CatalogTablePartition] = Seq.empty
- // When partitions are tracked by the catalog, compute all custom
partition locations that
- // may be relevant to the insertion job.
- if (partitionsTrackedByCatalog) {
- matchingPartitions = sparkSession.sessionState.catalog.listPartitions(
- catalogTable.get.identifier, Some(staticPartitions))
- initialMatchingPartitions = matchingPartitions.map(_.spec)
- customPartitionLocations = getCustomPartitionLocations(
- fs, catalogTable.get, qualifiedOutputPath, matchingPartitions)
- }
-
val jobId = java.util.UUID.randomUUID().toString
val committer = FileCommitProtocol.instantiate(
sparkSession.sessionState.conf.fileCommitProtocolClass,
jobId = jobId,
outputPath = outputPath.toString,
dynamicPartitionOverwrite = dynamicPartitionOverwrite)
+ // For dynamic partition overwrite, FileOutputCommitter's output path is
staging path, files
+ // will be renamed from staging path to final output path during commit job
+ val committerOutputPath = if (dynamicPartitionOverwrite) {
+ FileCommitProtocol.getStagingDir(outputPath.toString, jobId)
+ .makeQualified(fs.getUri, fs.getWorkingDirectory)
+ } else {
+ qualifiedOutputPath
+ }
+ var updatedPartitionPaths: Set[String] = Set.empty
+
+ // When partitions are tracked by the catalog, compute all custom
partition locations that
+ // may be relevant to the insertion job.
+ if (partitionsTrackedByCatalog) {
+ matchingPartitions = if (dynamicPartitionOverwrite) {
+ // Get the matching partitions from the written path instead of pull
all
+ // partitions from metastore, avoiding heavy pressures
+ updatedPartitionPaths = FileFormatWriter.write(
+ sparkSession = sparkSession,
+ plan = child,
+ fileFormat = fileFormat,
+ committer = committer,
+ outputSpec = FileFormatWriter.OutputSpec(
+ committerOutputPath.toString,
+ customPartitionLocations,
Review Comment:
Instead of `customPartitionLocations` you can use `Map.empty` directly,
to make it obvious that custom partition locations are not used here.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala:
##########
@@ -275,6 +309,28 @@ case class InsertIntoHadoopFsRelationCommand(
}.toMap
}
+ /**
+ * Deletes all partition files that match the custom locations and copy
files from the dynamic
+ * writing paths
+ */
+ private def overwriteCustomPartitions(
+ fs: FileSystem,
+ table: CatalogTable,
+ qualifiedOutputPath: Path,
+ committer: FileCommitProtocol,
+ customPartitions: Map[TablePartitionSpec, String]) = {
+ customPartitions.foreach { case (spec, customLoc) =>
+ val defaultLocation = qualifiedOutputPath.suffix(
+ "/" + PartitioningUtils.getPathFragment(spec, table.partitionSchema))
+
+ val catalogLocation = new Path(customLoc).makeQualified(fs.getUri,
fs.getWorkingDirectory)
+ if (fs.exists(catalogLocation) && !committer.deleteWithJob(fs,
catalogLocation, true)) {
+ throw
QueryExecutionErrors.cannotClearPartitionDirectoryError(catalogLocation)
+ }
+ fs.rename(defaultLocation, catalogLocation)
+ }
+ }
+
override protected def withNewChildInternal(
- newChild: LogicalPlan): InsertIntoHadoopFsRelationCommand = copy(query =
newChild)
+ newChild: LogicalPlan): InsertIntoHadoopFsRelationCommand = copy(query =
newChild)
Review Comment:
Nit: revert this line
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala:
##########
@@ -57,7 +57,7 @@ case class InsertIntoHadoopFsRelationCommand(
catalogTable: Option[CatalogTable],
fileIndex: Option[FileIndex],
outputColumnNames: Seq[String])
- extends V1WriteCommand {
+ extends V1WriteCommand {
Review Comment:
Nit: revert this line
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]