steveloughran commented on a change in pull request #25863:
[SPARK-28945][SPARK-29037][CORE][SQL] Fix the issue that spark gives duplicate
result and support concurrent file source write operations write to different
partitions in the same table.
URL: https://github.com/apache/spark/pull/25863#discussion_r329578757
##########
File path:
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##########
@@ -272,3 +351,60 @@ class HadoopMapReduceCommitProtocol(
}
}
}
+
+object HadoopMapReduceCommitProtocol extends Logging {
+
+ /**
+ * Get a path according to specified partition key-value pairs.
+ */
+ def getEscapedStaticPartitionPath(staticPartitionKVs: Iterable[(String, String)]): String = {
+ staticPartitionKVs.map{kv =>
+ kv._1 + "=" + kv._2
+ }.mkString(File.separator)
+ }
+
+ /**
+ * Delete the staging output path of current InsertIntoHadoopFsRelation operation. This output
+ * path is used to mark a InsertIntoHadoopFsRelation operation and we can detect conflict when
+ * there are several operations write same partition or a non-partitioned table concurrently.
+ *
+ * The output path is a multi level path and is composed of specified partition key value pairs
+ * formatted `.spark-staging-${depth}/p1=v1/p2=v2/.../pn=vn/appId/jobId`. When deleting the
+ * staging output path, delete the last level with recursive firstly. Then try to delete upper
+ * level without recursive, if success, then delete upper level with same way, until delete the
+ * insertStagingDir.
+ */
+ def deleteStagingInsertOutputPath(
+ fs: FileSystem,
+ insertStagingDir: Path,
+ stagingOutputDir: Path,
+ escapedStaticPartitionKVs: Seq[(String, String)]): Unit = {
+ if (insertStagingDir == null || stagingOutputDir ==null || !fs.exists(stagingOutputDir) ||
Review comment:
Call fs.getFileStatus once and check the returned status for the directory
state; otherwise you are doing two wrapped getFileStatus calls back to back.
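
A minimal sketch of the suggestion above, not the code from the pull request. It assumes the elided remainder of the condition is a directory check on stagingOutputDir. The object name StagingDirCheck and the helper isExistingDirectory are hypothetical; FileSystem.getFileStatus, FileStatus.isDirectory and the FileNotFoundException thrown for a missing path are the standard Hadoop API.

```scala
import java.io.FileNotFoundException

import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper, for illustration only.
object StagingDirCheck {

  /**
   * Returns true only if `path` exists and is a directory.
   * Issues a single getFileStatus call instead of an exists() check
   * followed by a separate directory check, each of which would
   * look up the file status on its own.
   */
  def isExistingDirectory(fs: FileSystem, path: Path): Boolean = {
    try {
      fs.getFileStatus(path).isDirectory
    } catch {
      // getFileStatus signals a missing path with FileNotFoundException.
      case _: FileNotFoundException => false
    }
  }
}
```

With such a helper, the guard could read `insertStagingDir == null || stagingOutputDir == null || !StagingDirCheck.isExistingDirectory(fs, stagingOutputDir)`, which saves one round trip per call. That matters most on object stores, where each status probe is a remote request.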