otterc commented on a change in pull request #30433:
URL: https://github.com/apache/spark/pull/30433#discussion_r529837473
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -827,13 +833,16 @@ void resetChunkTracker() {
void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException {
long idxStartPos = -1;
try {
- // update the chunk tracker to meta file before index file
- writeChunkTracker(mapIndex);
idxStartPos = indexFile.getFilePointer();
logger.trace("{} shuffleId {} reduceId {} updated index current {} updated {}",
appShuffleId.appId, appShuffleId.shuffleId, reduceId,
this.lastChunkOffset,
chunkOffset);
- indexFile.writeLong(chunkOffset);
+ indexFile.write(Longs.toByteArray(chunkOffset));
+ // Chunk bitmap should be written to the meta file after the index file because if there are
+ // any exceptions during writing the offset to the index file, meta file should not be
+ // updated. If the update to the index file is successful but the update to meta file isn't
+ // then the index file position is reset in the catch clause.
+ writeChunkTracker(mapIndex);
Review comment:
> Besides, I'm thinking it would be good if we could dynamically change
> the merger location when such IO exception happens.
Which of these do you mean?
1. The executor should start pushing to a different server for the shuffle
that is currently in progress.
2. For a future shuffle, the driver should take these IOExceptions into
account when deciding the merger locations.
(1) is currently difficult. The driver decides the specific merger locations
before a shuffle stage starts, and all the executors work on the assumption
that these are the only shuffle servers they push data to. This list of
shuffle servers must be consistent across the executors because, if it's not,
they will push the shuffle data belonging to the same reducer to different
shuffle servers. This is the code I am talking about:
[code](https://github.com/linkedin/spark/blob/002cb69e8ddf49bbe114744b84c46e0fd452d852/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala#L360).
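To illustrate why the list has to match on every executor, here is a minimal
sketch. It assumes a simple modulo assignment of reducers to mergers, which is
only for illustration; the linked `ShuffleBlockPusher` code may use a different
scheme. `MergerAssignment`, `mergerFor`, and the host names are hypothetical.

```java
import java.util.List;

class MergerAssignment {
    // Hypothetical helper: picks the merger for a reducer from a fixed list.
    // Assumption: modulo assignment, not necessarily what Spark does.
    static String mergerFor(int reduceId, List<String> mergerLocs) {
        return mergerLocs.get(reduceId % mergerLocs.size());
    }

    public static void main(String[] args) {
        // List chosen by the driver before the shuffle stage starts.
        List<String> driverList = List.of("host-a", "host-b", "host-c");
        // List seen by an executor that dropped a merger on its own.
        List<String> divergedList = List.of("host-a", "host-c");

        int reduceId = 1;
        // The two executors now pick different servers for the same reducer,
        // so the blocks for that reducer are split across two mergers.
        System.out.println(mergerFor(reduceId, driverList));
        System.out.println(mergerFor(reduceId, divergedList));
    }
}
```

With any assignment that depends on the list, an executor that changes its
list locally ends up splitting a reducer's data across servers.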
(2) could be a future improvement: the merge metrics could be included in what
is reported back to the driver when the driver triggers the finalize. The
driver could then use these metrics when deciding which shuffle servers to use
for the next shuffle.
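
A rough sketch of what (2) might look like on the driver side. Nothing here
exists in Spark today; `MergerSelector`, `recordFinalizeMetrics`, and
`selectMergers` are hypothetical names, and the only metric assumed is a count
of IOExceptions per shuffle server reported at finalize.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MergerSelector {
    // Hypothetical state: IOExceptions reported per shuffle server so far.
    private final Map<String, Integer> ioFailures = new HashMap<>();

    // Called when a shuffle server answers the driver's finalize request.
    void recordFinalizeMetrics(String host, int ioExceptionCount) {
        ioFailures.merge(host, ioExceptionCount, Integer::sum);
    }

    // Prefers servers with the fewest reported IOExceptions. The sort is
    // stable, so servers with equal counts keep their candidate order.
    List<String> selectMergers(List<String> candidates, int numMergers) {
        List<String> sorted = new ArrayList<>(candidates);
        sorted.sort(Comparator.comparingInt(
            (String host) -> ioFailures.getOrDefault(host, 0)));
        int count = Math.min(numMergers, sorted.size());
        return new ArrayList<>(sorted.subList(0, count));
    }

    public static void main(String[] args) {
        MergerSelector selector = new MergerSelector();
        selector.recordFinalizeMetrics("host-b", 3);
        System.out.println(
            selector.selectMergers(List.of("host-a", "host-b", "host-c"), 2));
    }
}
```

The selection still happens once, before the next shuffle stage, so every
executor continues to see the same list for that shuffle.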