mridulm commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r627646535
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala
##########
@@ -87,6 +87,29 @@ case class ShufflePushBlockId(shuffleId: Int, mapIndex: Int, reduceId: Int) exte
override def name: String = "shufflePush_" + shuffleId + "_" + mapIndex + "_" + reduceId
}
+@DeveloperApi
+case class ShuffleMergedBlockId(appId: String, shuffleId: Int, reduceId: Int) extends BlockId {
+ override def name: String = "mergedShuffle_" + appId + "_" + shuffleId + "_" + reduceId + ".data"
+}
+
+@DeveloperApi
+case class ShuffleMergedIndexBlockId(
+ appId: String,
+ shuffleId: Int,
+ reduceId: Int) extends BlockId {
+ override def name: String =
+ "mergedShuffle_" + appId + "_" + shuffleId + "_" + reduceId + ".index"
Review comment:
To add to my comment above (I should have provided more context): I was assuming we are relying on directory existence to infer the latest attempt.
But I like @Ngone51's idea here better:
> And if the executor becomes the one who creates the merge dir, we send the ExecutorShuffleInfo with the special shuffleManager, e.g., "sort_merge_manager_attemptX". And ExternalBlockHandler can parse the shuffleManager into two parts
`shuffleManager` used to be relevant when we had multiple shuffle managers, and from an evolution point of view, we could always introduce a new shuffle manager in the future.
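For comparison, the suffix encoding in the quoted idea could be parsed roughly as below. This is a minimal sketch, assuming a hypothetical `_attempt<N>` suffix (as in 'SortShuffleManager_attemptX') and a hypothetical helper `AttemptSuffixParser`; it is not ExternalBlockHandler's actual code.

```scala
// Hypothetical sketch: split an optional "_attempt<N>" suffix off the
// shuffleManager string (not ExternalBlockHandler's actual code).
object AttemptSuffixParser {
  // Regex extractors must match the whole string; (.+) is greedy, so the
  // last "_attempt<digits>" is treated as the suffix.
  private val WithAttempt = """(.+)_attempt(\d+)""".r

  /** "SortShuffleManager_attempt2" -> ("SortShuffleManager", Some(2)). */
  def parse(shuffleManager: String): (String, Option[Int]) =
    shuffleManager match {
      case WithAttempt(manager, attempt) => (manager, Some(attempt.toInt))
      case other                         => (other, None) // legacy name, no attempt
    }
}
```

The drawback this illustrates: every new piece of metadata would need its own hardcoded pattern.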
My proposal is an extension of the idea above: here, the attempt is metadata about the shuffle manager that we want to convey.
Ideally, this should be within `ExecutorShuffleInfo`, but given the compatibility issues, why not add a general way to encode metadata about the shuffle manager in the `shuffleManager` string?
That is, instead of hardcoding 'SortShuffleManager_attemptX' and 'SortShuffleManager' as the supported patterns, we could simply allow 'SortShuffleManager' (existing) and 'SortShuffleManager:json_string'?
This would also make any future evolution possible, with `{"merge_dir": "merge_directory_<attemptid>", "attempt_id":<attempt_id>}` as the initial fields. (@zhouyejoe, can we explicitly pass the directory name instead of inferring it from the attempt id?)
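A minimal sketch of how an executor might build the proposed string, assuming a hypothetical `ShuffleManagerMetadataEncoder` helper and the two fields listed above. The JSON is hand-built only to keep the sketch dependency-free; a real change would more likely use a JSON library.

```scala
// Hypothetical encoder for the proposed "SortShuffleManager:<json>" format.
object ShuffleManagerMetadataEncoder {
  // Minimal JSON string escaping (backslashes and quotes only; control
  // characters are not handled in this sketch).
  private def jsonString(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  /** Appends merge-dir/attempt metadata to the existing manager name. */
  def encode(manager: String, mergeDir: String, attemptId: Int): String = {
    val json = s"""{"merge_dir": ${jsonString(mergeDir)}, "attempt_id": $attemptId}"""
    manager + ":" + json
  }
}
```

For example, `encode("SortShuffleManager", "merge_directory_1", 1)` yields `SortShuffleManager:{"merge_dir": "merge_directory_1", "attempt_id": 1}`; the directory name is passed explicitly rather than derived from the attempt id.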
Of course, if ':' is missing in `shuffleManager`, we treat it as the empty-metadata case.
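On the receiving side, splitting at the first ':' keeps any ':' inside the JSON payload intact, and a name without ':' maps to empty metadata. A sketch with a hypothetical `ShuffleManagerMetadataParser`; parsing the JSON payload itself is left out of scope.

```scala
// Hypothetical parser for "Manager" / "Manager:<json>" strings.
object ShuffleManagerMetadataParser {
  /** Splits at the first ':'; None means the empty-metadata (legacy) case. */
  def split(shuffleManager: String): (String, Option[String]) = {
    val idx = shuffleManager.indexOf(':')
    if (idx < 0) (shuffleManager, None)
    else (shuffleManager.substring(0, idx), Some(shuffleManager.substring(idx + 1)))
  }
}
```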
With this in place, we would still need to change the push block protocol to include the attempt id, but since no one is using that protocol yet, we can make that change.
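To illustrate what carrying the attempt id could look like, here is a hypothetical push block id that appends the attempt id to the existing `shufflePush_<shuffleId>_<mapIndex>_<reduceId>` name. The class `PushBlockIdWithAttempt`, the object `PushBlockIdParser`, and the field order are assumptions for illustration only, not the protocol this PR defines.

```scala
// Hypothetical push block id carrying the application attempt id; not a
// Spark class. Name format: shufflePush_<shuffleId>_<mapIndex>_<reduceId>_<attemptId>
final case class PushBlockIdWithAttempt(
    shuffleId: Int, mapIndex: Int, reduceId: Int, attemptId: Int) {
  def name: String =
    "shufflePush_" + shuffleId + "_" + mapIndex + "_" + reduceId + "_" + attemptId
}

object PushBlockIdParser {
  // Exactly four numeric components; the old three-component name does not match.
  private val Pattern = """shufflePush_(\d+)_(\d+)_(\d+)_(\d+)""".r

  def parse(name: String): Option[PushBlockIdWithAttempt] = name match {
    case Pattern(s, m, r, a) =>
      Some(PushBlockIdWithAttempt(s.toInt, m.toInt, r.toInt, a.toInt))
    case _ => None
  }
}
```

Requiring exactly four numeric components means an old three-part name is rejected rather than misread.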
With this explicit specification of the merge directory, we remove all ambiguity.
Thoughts @Ngone51, @otterc, @zhouyejoe ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.