yikf commented on a change in pull request #31664:
URL: https://github.com/apache/spark/pull/31664#discussion_r584438668
##########
File path:
core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala
##########
@@ -131,4 +139,29 @@ class SortShuffleManagerSuite extends SparkFunSuite with
Matchers {
)))
}
+ test("Data could not be cleaned up when unregisterShuffle") {
Review comment:
OK, thank you. I'll add the test later. But I don't understand why the
protocol should be differentiated on the write side. For example, in
`ShuffleMapTask#runTask`:
`ShuffleMapTask#runTask`
```scala
// While we use the old shuffle fetch protocol, we use partitionId as
// mapId in the ShuffleBlockId construction.
val mapId = if (SparkEnv.get.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
  partitionId
} else context.taskAttemptId()
```
On the read side we need the protocol to distinguish messages. But on the
write side, the executor registers with the ExternalShuffleService via
`RegisterExecutor`, which passes the local dirs to the shuffle service. So the
shuffle service can locate the intermediate shuffle files by directory, which
seems unrelated to `mapId`.
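To make the question concrete, here is a minimal standalone sketch (not
Spark's actual classes; the `useOldFetchProtocol` flag and the simplified
`ShuffleBlockId` below are illustrative stand-ins) showing that the protocol
choice only changes which value is embedded in the block name, while the
shuffle service still resolves files under the executor's registered local
dirs either way:

```scala
// Illustrative sketch only: a simplified ShuffleBlockId mirroring
// Spark's "shuffle_<shuffleId>_<mapId>_<reduceId>" naming scheme.
case class ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int) {
  def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}"
}

object MapIdSketch {
  def main(args: Array[String]): Unit = {
    val partitionId = 3            // old protocol: mapId == partitionId
    val taskAttemptId = 12345L     // new protocol: mapId == taskAttemptId
    val useOldFetchProtocol = true // stand-in for SHUFFLE_USE_OLD_FETCH_PROTOCOL

    val mapId = if (useOldFetchProtocol) partitionId.toLong else taskAttemptId
    // Either way, only the block name differs; the shuffle service finds the
    // file via the localDirs passed in RegisterExecutor, independent of mapId.
    println(ShuffleBlockId(shuffleId = 0, mapId = mapId, reduceId = 1).name)
  }
}
```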
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]