turboFei commented on issue #25863: [WIP][SPARK-29037][CORE][SQL] For static partition overwrite, spark may give duplicate result.
URL: https://github.com/apache/spark/pull/25863#issuecomment-533968171

@cloud-fan Sorry, I just updated the PR description.

In this PR, we set a unique staging dir for each InsertIntoHadoopFsRelation operation and detect conflicts when several InsertIntoHadoopFsRelation operations execute concurrently.

Implementation details:

For every InsertIntoHadoopFsRelation operation, we pass three parameters to `HadoopMapReduceCommitProtocol`:
- isInsertIntoHadoopFsRelation
- isOverwrite
- staticPartitionKVs

We create a new staging dir derived from the parameters above:

`.spark-staging-${jobId}-{overwrite or append}-${staticPartitionKVs.size}`

From the name of this staging dir we can tell the operation type, overwrite or append.

We set a sub-directory under the staging dir as the output path. Its file tree is:

`.spark-staging-${jobId}-{overwrite or append}-${staticPartitionKVs.size}/appId/sp_pk1=v1/sp_pk2=v2/.../sp_pkn=vn`

The appId records the application ID of the InsertIntoHadoopFsRelation operation and is useful when we detect a conflict. `${staticPartitionKVs.size}` is used to find the longest path with the static prefix, and it prevents confusion when a partition key itself starts with the static prefix.

As described above, we set an `outputPath` for each InsertIntoHadoopFsRelation operation. After `commitJob` we only need to invoke mergePaths(fs, outputPath, tablePath) and clean up the staging dir.

About detecting conflicts:
- First, we get the current operation's outputPath, such as `sp_pk1=v1/sp_pk2=v2`.
- If the current operation is an overwrite, we find the longest path with the static prefix in every staging dir (both overwrite and append staging dirs).
- If the current operation is an append, we only need to find that path in the overwrite staging dirs.
- Check whether the current output path contains, or is contained by, any existing output path:
  ```scala
  // True when either path is empty (the whole table), the paths are equal,
  // or one path is a parent directory of the other.
  private def pathIsContained(path1: String, path2: String): Boolean = {
    path1 == "" || path2 == "" || path1 == path2 ||
      (path1.startsWith(path2) && path1.charAt(path2.length) == '/') ||
      (path2.startsWith(path1) && path2.charAt(path1.length) == '/')
  }
  ```
- If `pathIsContained` returns true for any existing output path, we throw an exception.
- We can get the appId of the conflicting staging dir and its last modification time. If it runs on YARN, we can get its running duration and judge whether the application is still running. If it is not, we can clean up this staging dir, or notify the user that it can be cleaned up manually.
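
The naming scheme and the conflict rules above can be sketched in a few lines of standalone Scala. This is only an illustration under stated assumptions, not the code in the PR: the object `StagingDirSketch`, the case class `StagedOutput`, and the helpers `stagingDirName`, `outputPath`, and `findConflicts` are hypothetical names, partition values are not escaped, and the existing staged outputs are passed in as a plain list instead of being discovered by listing the file system.

```scala
object StagingDirSketch {
  // Hypothetical record of another job's staged output, as it would be
  // recovered from a staging dir name and the sub-path beneath it.
  final case class StagedOutput(appId: String, isOverwrite: Boolean, staticPath: String)

  // Builds `.spark-staging-${jobId}-{overwrite or append}-${staticPartitionKVs.size}`.
  def stagingDirName(jobId: String, isOverwrite: Boolean,
      staticPartitionKVs: Seq[(String, String)]): String = {
    val mode = if (isOverwrite) "overwrite" else "append"
    s".spark-staging-$jobId-$mode-${staticPartitionKVs.size}"
  }

  // Builds `<stagingDir>/<appId>/k1=v1/k2=v2/...` (assumption: no escaping of values).
  def outputPath(stagingDir: String, appId: String,
      staticPartitionKVs: Seq[(String, String)]): String =
    (Seq(stagingDir, appId) ++ staticPartitionKVs.map { case (k, v) => s"$k=$v" }).mkString("/")

  // Same predicate as in the comment above: empty path, equal paths,
  // or one path being a parent directory of the other.
  def pathIsContained(path1: String, path2: String): Boolean =
    path1 == "" || path2 == "" || path1 == path2 ||
      (path1.startsWith(path2) && path1.charAt(path2.length) == '/') ||
      (path2.startsWith(path1) && path2.charAt(path1.length) == '/')

  // An overwrite is checked against all staged outputs;
  // an append is checked only against staged overwrites.
  def findConflicts(currentPath: String, currentIsOverwrite: Boolean,
      others: Seq[StagedOutput]): Seq[StagedOutput] =
    others.filter { o =>
      (currentIsOverwrite || o.isOverwrite) && pathIsContained(currentPath, o.staticPath)
    }
}
```

For example, an append into `p1=v1/p2=v2` would conflict with a concurrent overwrite of `p1=v1` but not with another append into the same partition, while an overwrite of `p1=v1/p2=v2` would conflict with both. The `'/'` boundary check keeps `p1=v1` from matching `p1=v10`.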
