steveloughran commented on a change in pull request #25863: 
[SPARK-28945][SPARK-29037][CORE][SQL] Fix the issue that spark gives duplicate 
result and support concurrent file source write operations write to different 
partitions in the same table.
URL: https://github.com/apache/spark/pull/25863#discussion_r329578757
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ##########
 @@ -272,3 +351,60 @@ class HadoopMapReduceCommitProtocol(
     }
   }
 }
+
+object  HadoopMapReduceCommitProtocol extends Logging {
+
+  /**
+   * Get a path according to specified partition key-value pairs.
+   */
+  def getEscapedStaticPartitionPath(staticPartitionKVs: Iterable[(String, String)]): String = {
+    staticPartitionKVs.map{kv =>
+      kv._1 + "=" + kv._2
+    }.mkString(File.separator)
+  }
+
+  /**
+   * Delete the staging output path of current InsertIntoHadoopFsRelation operation. This output
+   * path is used to mark a InsertIntoHadoopFsRelation operation and we can detect conflict when
+   * there are several operations write same partition or a non-partitioned table concurrently.
+   *
+   * The output path is a multi level path and is composed of specified partition key value pairs
+   * formatted `.spark-staging-${depth}/p1=v1/p2=v2/.../pn=vn/appId/jobId`. When deleting the
+   * staging output path, delete the last level with recursive firstly. Then try to delete upper
+   * level without recursive, if success, then delete upper level with same way, until delete the
+   * insertStagingDir.
+   */
+   def deleteStagingInsertOutputPath(
+       fs: FileSystem,
+       insertStagingDir: Path,
+       stagingOutputDir: Path,
+       escapedStaticPartitionKVs: Seq[(String, String)]): Unit = {
+     if (insertStagingDir == null || stagingOutputDir ==null || !fs.exists(stagingOutputDir) ||
 
 Review comment:
   Call `fs.getFileStatus` once and check the returned status for the directory state; otherwise you are doing two wrapped `getFileStatus` calls back to back.
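
   A minimal sketch of that single-call check, assuming a hypothetical helper named `isExistingDirectory` (it is not part of this PR). It relies on `FileSystem.getFileStatus` throwing `FileNotFoundException` for a missing path, so one call can answer both "does it exist" and "is it a directory":

```scala
import java.io.FileNotFoundException
import org.apache.hadoop.fs.{FileSystem, Path}

object StagingDirCheck {
  /**
   * Hypothetical helper, for illustration only: returns true only if `path`
   * exists and is a directory, issuing a single getFileStatus call instead
   * of an exists() check followed by a separate directory check.
   */
  def isExistingDirectory(fs: FileSystem, path: Path): Boolean = {
    try {
      fs.getFileStatus(path).isDirectory
    } catch {
      // getFileStatus signals a missing path with FileNotFoundException.
      case _: FileNotFoundException => false
    }
  }
}
```

   The guard in `deleteStagingInsertOutputPath` could then return early when this is false for `stagingOutputDir`. On object stores, where each status probe is a remote request, halving the number of calls is the point.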

