turboFei commented on issue #25863: [WIP][SPARK-29037][CORE][SQL] For static 
partition overwrite, spark may give duplicate result.
URL: https://github.com/apache/spark/pull/25863#issuecomment-533968171
 
 
   @cloud-fan 
   Sorry, I just updated the PR description. 
   
   In this PR, we set a unique staging dir for InsertIntoHadoopFsRelation 
operation and detect the conflict when there are several 
insetIntoHadoopFsRelation operations execute concurrently.
   
   The detail implement:
   
   For all InsertIntoHadoopFsReltion operation,  we will transfer three 
parameters into `HadoopMapReduceCommitProtocol`.
   
   - isInsertIntoHadoopFsRelation
   - isOverwrite
   - staticPartitionKVs
   
   We will create a new staging dir related with the parameters above.
   
   `.spark-staging-\${jobId}-{overwrite or append}-${staticPartitionKVs.size}`
   
   With this new stagingDir, we can detect the operation type, overwrite or 
append.
   
   We will set a subDir under staging dir as output path.
   
   Its file tree is `.spark-staging-\${jobId}-{overwrite or 
append}-${staticPartitionKVs.size}/appId/sp_pk1=v1/sp_pk2=v2/.../sp_pkn=vn`
   
   The appId is used to record the appId of InsertIntoHadoopFsRelation and is 
useful when we detect a conflict.
   
   And `${staticPartitionKVs.size}` is used to find a longest path tree with 
static prefix, and can prevent be confused when there is a partition key 
started with static prefix.
   
   
   
   As descripted above, we set a `outputPath` for each 
insertIntoHadoopFsRelation operation, 
   
   After `commitJob` we just need invoke mergePaths(fs, outputPath, tablePath)  
and clean up stagingDir.
   
   
   
   About detecting conflict:
   
   - Firstly, we need get current operation's outputPath, such as 
`sp_pk1=v1/sp_pk2=v2`.
   
   - If current operation is an overwrite operation, we need to find the 
longest path with static prefix of all staging dirs(include overwrite and 
append staging dirs ).
   
   - If current operation is an append operation, we just need to find that of 
all overwrite staging dirs.
   
   - Judge whether current output path contains other existing outputs or be 
contained.
   
   - ```scala
     private def pathIsContained(path1: String, path2: String): Boolean = {
       path1 == "" ||path2 == "" || path1 == path2 ||
         (path1.startsWith(path2) && path1.charAt(path2.length) == '/') ||
         (path2.startsWith(path1) && path2.charAt(path1.length) == '/')
     }
     ```
   
   - If  exists `pathIsContained`=true, we will throw an exception.
   
   - We can get the appId of relative staging and its last modification time. 
If it run on yarn, wen can get its running duration and judge whether this 
application is running, if not, we can clean up this staging dir or we can 
notify that user can clean up it manually.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to