otterc commented on a change in pull request #30433:
URL: https://github.com/apache/spark/pull/30433#discussion_r536347122
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -825,45 +881,79 @@ void resetChunkTracker() {
* @param mapIndex the map index to be added to chunk tracker.
*/
void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException {
- long idxStartPos = -1;
try {
- // update the chunk tracker to meta file before index file
- writeChunkTracker(mapIndex);
- idxStartPos = indexFile.getFilePointer();
logger.trace("{} shuffleId {} reduceId {} updated index current {}
updated {}",
- appShuffleId.appId, appShuffleId.shuffleId, reduceId,
this.lastChunkOffset,
- chunkOffset);
- indexFile.writeLong(chunkOffset);
- } catch (IOException ioe) {
- if (idxStartPos != -1) {
- // reset the position to avoid corrupting index files during
exception.
- logger.warn("{} shuffleId {} reduceId {} reset index to position {}",
- appShuffleId.appId, appShuffleId.shuffleId, reduceId, idxStartPos);
- indexFile.seek(idxStartPos);
+ appShuffleId.appId, appShuffleId.shuffleId, reduceId,
this.lastChunkOffset, chunkOffset);
+ if (indexMetaUpdateFailed) {
+ indexFile.getChannel().position(indexFile.getPos());
}
+ indexFile.getDos().writeLong(chunkOffset);
+ // Chunk bitmap should be written to the meta file after the index
file because if there are
+ // any exceptions during writing the offset to the index file, meta
file should not be
+ // updated. If the update to the index file is successful but the
update to meta file isn't
+ // then the index file position is reset in the catch clause.
+ writeChunkTracker(mapIndex);
+ indexFile.updatePos(8);
+ this.lastChunkOffset = chunkOffset;
+ indexMetaUpdateFailed = false;
+ } catch (IOException ioe) {
+ logger.warn("{} shuffleId {} reduceId {} update to index/meta failed",
appShuffleId.appId,
+ appShuffleId.shuffleId, reduceId);
+ indexMetaUpdateFailed = true;
+ // Any exception here is propagated to the caller and the caller can
decide whether to
+ // abort or not.
throw ioe;
}
- this.lastChunkOffset = chunkOffset;
}
private void writeChunkTracker(int mapIndex) throws IOException {
if (mapIndex == -1) {
return;
}
chunkTracker.add(mapIndex);
- long metaStartPos = metaFile.getFilePointer();
try {
logger.trace("{} shuffleId {} reduceId {} mapIndex {} write chunk to
meta file",
appShuffleId.appId, appShuffleId.shuffleId, reduceId, mapIndex);
- chunkTracker.serialize(metaFile);
+ if (indexMetaUpdateFailed) {
+ metaFile.getChannel().position(metaFile.getPos());
+ }
+ int chunkTrackerSize = chunkTracker.serializedSizeInBytes();
+ chunkTracker.serialize(metaFile.getDos());
+ metaFile.updatePos(chunkTrackerSize);
} catch (IOException ioe) {
- logger.warn("{} shuffleId {} reduceId {} mapIndex {} reset position of
meta file to {}",
- appShuffleId.appId, appShuffleId.shuffleId, reduceId, mapIndex,
metaStartPos);
- metaFile.seek(metaStartPos);
+ logger.warn("{} shuffleId {} reduceId {} mapIndex {} update to meta
file failed",
Review comment:
done
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -825,45 +881,79 @@ void resetChunkTracker() {
* @param mapIndex the map index to be added to chunk tracker.
*/
void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException {
- long idxStartPos = -1;
try {
- // update the chunk tracker to meta file before index file
- writeChunkTracker(mapIndex);
- idxStartPos = indexFile.getFilePointer();
logger.trace("{} shuffleId {} reduceId {} updated index current {}
updated {}",
- appShuffleId.appId, appShuffleId.shuffleId, reduceId,
this.lastChunkOffset,
- chunkOffset);
- indexFile.writeLong(chunkOffset);
- } catch (IOException ioe) {
- if (idxStartPos != -1) {
- // reset the position to avoid corrupting index files during
exception.
- logger.warn("{} shuffleId {} reduceId {} reset index to position {}",
- appShuffleId.appId, appShuffleId.shuffleId, reduceId, idxStartPos);
- indexFile.seek(idxStartPos);
+ appShuffleId.appId, appShuffleId.shuffleId, reduceId,
this.lastChunkOffset, chunkOffset);
+ if (indexMetaUpdateFailed) {
+ indexFile.getChannel().position(indexFile.getPos());
}
+ indexFile.getDos().writeLong(chunkOffset);
+ // Chunk bitmap should be written to the meta file after the index
file because if there are
+ // any exceptions during writing the offset to the index file, meta
file should not be
+ // updated. If the update to the index file is successful but the
update to meta file isn't
+ // then the index file position is reset in the catch clause.
+ writeChunkTracker(mapIndex);
+ indexFile.updatePos(8);
+ this.lastChunkOffset = chunkOffset;
+ indexMetaUpdateFailed = false;
+ } catch (IOException ioe) {
+ logger.warn("{} shuffleId {} reduceId {} update to index/meta failed",
appShuffleId.appId,
+ appShuffleId.shuffleId, reduceId);
+ indexMetaUpdateFailed = true;
+ // Any exception here is propagated to the caller and the caller can
decide whether to
+ // abort or not.
throw ioe;
}
- this.lastChunkOffset = chunkOffset;
}
private void writeChunkTracker(int mapIndex) throws IOException {
if (mapIndex == -1) {
return;
}
chunkTracker.add(mapIndex);
- long metaStartPos = metaFile.getFilePointer();
try {
logger.trace("{} shuffleId {} reduceId {} mapIndex {} write chunk to
meta file",
appShuffleId.appId, appShuffleId.shuffleId, reduceId, mapIndex);
- chunkTracker.serialize(metaFile);
+ if (indexMetaUpdateFailed) {
+ metaFile.getChannel().position(metaFile.getPos());
+ }
+ int chunkTrackerSize = chunkTracker.serializedSizeInBytes();
+ chunkTracker.serialize(metaFile.getDos());
+ metaFile.updatePos(chunkTrackerSize);
Review comment:
done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]