Ngone51 commented on a change in pull request #30433:
URL: https://github.com/apache/spark/pull/30433#discussion_r537982682
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -825,43 +884,68 @@ void resetChunkTracker() {
* @param mapIndex the map index to be added to chunk tracker.
*/
void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException {
- long idxStartPos = -1;
try {
- // update the chunk tracker to meta file before index file
- writeChunkTracker(mapIndex);
- idxStartPos = indexFile.getFilePointer();
logger.trace("{} shuffleId {} reduceId {} updated index current {}
updated {}",
- appShuffleId.appId, appShuffleId.shuffleId, reduceId,
this.lastChunkOffset,
- chunkOffset);
- indexFile.writeLong(chunkOffset);
- } catch (IOException ioe) {
- if (idxStartPos != -1) {
- // reset the position to avoid corrupting index files during
exception.
- logger.warn("{} shuffleId {} reduceId {} reset index to position {}",
- appShuffleId.appId, appShuffleId.shuffleId, reduceId, idxStartPos);
- indexFile.seek(idxStartPos);
+ appShuffleId.appId, appShuffleId.shuffleId, reduceId,
this.lastChunkOffset, chunkOffset);
+ if (indexMetaUpdateFailed) {
+ indexFile.getChannel().position(indexFile.getPos());
}
+ indexFile.getDos().writeLong(chunkOffset);
+ // Chunk bitmap should be written to the meta file after the index
file because if there are
+ // any exceptions during writing the offset to the index file, meta
file should not be
+ // updated. If the update to the index file is successful but the
update to meta file isn't
+ // then the index file position is reset in the catch clause.
Review comment:
The comment looks out of date. The index file position is no longer reset in
the catch clause.
##########
File path:
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
##########
@@ -411,6 +415,269 @@ void deleteExecutorDirs(Path[] dirs) {
}
}
+ @Test
+ public void testRecoverIndexFileAfterIOExceptions() throws IOException {
+ useTestFiles(true, false);
+ RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
+ (RemoteBlockPushResolver.PushBlockStreamCallback)
pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+ callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4]));
+ callback1.onComplete(callback1.getID());
+ RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo =
callback1.getPartitionInfo();
+ // Close the index stream so it throws IOException
+ TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile)
partitionInfo.getIndexFile();
+ testIndexFile.close();
+ StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+ callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5]));
+ // This will complete without any IOExceptions because number of
IOExceptions are less than
+ // the threshold but the update to index file will be unsuccessful.
+ callback2.onComplete(callback2.getID());
+ assertEquals("index position", 16, testIndexFile.getPos());
+ // Restore the index stream so it can write successfully again.
+ testIndexFile.restore();
+ StreamCallbackWithID callback3 = pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, 0, 2, 0, 0));
+ callback3.onData(callback3.getID(), ByteBuffer.wrap(new byte[2]));
+ callback3.onComplete(callback3.getID());
+ assertEquals("index position", 24, testIndexFile.getPos());
+ MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
+ new FinalizeShuffleMerge(TEST_APP, 0));
+ validateMergeStatuses(statuses, new int[] {0}, new long[] {11});
+ MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
0);
+ validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 7}, new int[][]
{{0}, {1, 2}});
+ }
+
+ @Test
+ public void testRecoverIndexFileAfterIOExceptionsInFinalize() throws
IOException {
+ useTestFiles(true, false);
+ registerExecutor(TEST_APP, prepareLocalDirs(localDirs));
+ RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
+ (RemoteBlockPushResolver.PushBlockStreamCallback)
pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+ callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4]));
+ callback1.onComplete(callback1.getID());
+ RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo =
callback1.getPartitionInfo();
+ // Close the index stream so it throws IOException
+ TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile)
partitionInfo.getIndexFile();
+ testIndexFile.close();
+ StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+ callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5]));
+ // This will complete without any IOExceptions because number of
IOExceptions are less than
+ // the threshold but the update to index file will be unsuccessful.
+ callback2.onComplete(callback2.getID());
+ assertEquals("index position", 16, testIndexFile.getPos());
+ // The last update to index was unsuccessful however any further updates
will be successful.
+ // Restore the index stream so it can write successfully again.
+ testIndexFile.restore();
+ MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
+ new FinalizeShuffleMerge(TEST_APP, 0));
+ assertEquals("index position", 24, testIndexFile.getPos());
+ validateMergeStatuses(statuses, new int[] {0}, new long[] {9});
+ MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
0);
+ validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 5}, new int[][]
{{0}, {1}});
+ }
+
+ @Test
+ public void testRecoverMetaFileAfterIOExceptions() throws IOException {
+ useTestFiles(false, true);
+ RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
+ (RemoteBlockPushResolver.PushBlockStreamCallback)
pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+ callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4]));
+ callback1.onComplete(callback1.getID());
+ RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo =
callback1.getPartitionInfo();
+ // Close the meta stream so it throws IOException
+ TestMergeShuffleFile testMetaFile = (TestMergeShuffleFile)
partitionInfo.getMetaFile();
+ long metaPosBeforeClose = testMetaFile.getPos();
+ testMetaFile.close();
+ StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+ callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5]));
+ // This will complete without any IOExceptions because number of
IOExceptions are less than
+ // the threshold but the update to index and meta file will be
unsuccessful.
+ callback2.onComplete(callback2.getID());
+ assertEquals("index position", 16, partitionInfo.getIndexFile().getPos());
+ assertEquals("meta position", metaPosBeforeClose, testMetaFile.getPos());
+ // Restore the meta stream so it can write successfully again.
+ testMetaFile.restore();
+ StreamCallbackWithID callback3 = pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, 0, 2, 0, 0));
+ callback3.onData(callback3.getID(), ByteBuffer.wrap(new byte[2]));
+ callback3.onComplete(callback3.getID());
+ assertEquals("index position", 24, partitionInfo.getIndexFile().getPos());
+ assertTrue("meta position", testMetaFile.getPos() > metaPosBeforeClose);
+ MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
+ new FinalizeShuffleMerge(TEST_APP, 0));
+ validateMergeStatuses(statuses, new int[] {0}, new long[] {11});
+ MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
0);
+ validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 7}, new int[][]
{{0}, {1, 2}});
+ }
+
+ @Test
+ public void testRecoverMetaFileAfterIOExceptionsInFinalize() throws
IOException {
+ useTestFiles(false, true);
+ RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
+ (RemoteBlockPushResolver.PushBlockStreamCallback)
pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+ callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4]));
+ callback1.onComplete(callback1.getID());
+ RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo =
callback1.getPartitionInfo();
+ // Close the meta stream so it throws IOException
+ TestMergeShuffleFile testMetaFile = (TestMergeShuffleFile)
partitionInfo.getMetaFile();
+ long metaPosBeforeClose = testMetaFile.getPos();
+ testMetaFile.close();
+ StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+ callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5]));
+ // This will complete without any IOExceptions because number of
IOExceptions are less than
+ // the threshold but the update to index and meta file will be
unsuccessful.
+ callback2.onComplete(callback2.getID());
+ MergeShuffleFile indexFile = partitionInfo.getIndexFile();
+ assertEquals("index position", 16, indexFile.getPos());
+ assertEquals("meta position", metaPosBeforeClose, testMetaFile.getPos());
+ // Restore the meta stream so it can write successfully again.
+ testMetaFile.restore();
+ MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
+ new FinalizeShuffleMerge(TEST_APP, 0));
+ assertEquals("index position", 24, indexFile.getPos());
+ assertTrue("meta position", testMetaFile.getPos() > metaPosBeforeClose);
+ validateMergeStatuses(statuses, new int[] {0}, new long[] {9});
+ MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
0);
+ validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 5}, new int[][]
{{0}, {1}});
+ }
+
+ @Test (expected = RuntimeException.class)
+ public void testIOExceptionsExceededThreshold() throws IOException {
+ RemoteBlockPushResolver.PushBlockStreamCallback callback =
+ (RemoteBlockPushResolver.PushBlockStreamCallback)
pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+ RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo =
callback.getPartitionInfo();
+ callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4]));
+ callback.onComplete(callback.getID());
+ // Close the data stream so it throws continuous IOException
+ partitionInfo.getDataChannel().close();
+ for (int i = 1; i < 5; i++) {
+ RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
+ (RemoteBlockPushResolver.PushBlockStreamCallback)
pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, 0, i, 0, 0));
+ try {
+ callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[2]));
+ } catch (IOException ioe) {
+ // this will throw IOException so the client can retry.
+ callback1.onFailure(callback1.getID(), ioe);
+ }
+ }
+ assertEquals(4, partitionInfo.getNumIOExceptions());
+ // After 4 IOException, the server will respond with IOExceptions exceeded
threshold
+ try {
+ RemoteBlockPushResolver.PushBlockStreamCallback callback2 =
+ (RemoteBlockPushResolver.PushBlockStreamCallback)
pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, 0, 5, 0, 0));
+ callback2.onData(callback.getID(), ByteBuffer.wrap(new byte[1]));
+ } catch (Throwable t) {
+ assertEquals("IOExceptions exceeded the threshold when merging
shufflePush_0_5_0",
+ t.getMessage());
+ throw t;
+ }
Review comment:
What's the expected behavior for those deferred blocks after exceeding
the IO exception threshold? It seems like we don't have any strategy for those
blocks yet.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]