pan3793 commented on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1007354842
Hi @otterc, I have more information on this issue.
I added assertions and debug logging to `RemoteBlockPushResolver` (ESS side):
```java
public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
  ...
    for (AppShufflePartitionInfo partition: shuffleMergePartitions.values()) {
      synchronized (partition) {
        // Debug instrumentation: set only when finalizePartition() succeeds, so the size checks
        // below skip partitions that are not added to the merge statuses.
        boolean finalized = false;
        try {
          // This can throw IOException which will mark this shuffle partition as not merged.
          partition.finalizePartition();
          bitmaps.add(partition.mapTracker);
          reduceIds.add(partition.reduceId);
          sizes.add(partition.getLastChunkOffset());
          finalized = true;
        } catch (IOException ioe) {
          logger.warn("Exception while finalizing shuffle partition {}_{} {} {}", msg.appId,
            msg.appAttemptId, msg.shuffleId, partition.reduceId, ioe);
        } finally {
          partition.closeAllFilesAndDeleteIfNeeded(false);
        }
        if (finalized) {
          // Still inside synchronized (partition): read the partition's positions under the same
          // lock that guards finalizePartition() above.
          long dataLength = partition.dataFile.length();
          long indexLength = partition.indexFile.file.length();
          long metaLength = partition.metaFile.file.length();
          // A plain `assert` is skipped unless the JVM runs with -ea, and an AssertionError here
          // would escape this loop and stop the remaining partitions from being finalized, so
          // compare explicitly and log the mismatch instead.
          if (dataLength != partition.lastChunkOffset
              || indexLength != partition.indexFile.getPos()
              || metaLength != partition.metaFile.getPos()) {
            logger.error("On-disk size mismatch for shuffle partition {}_{} {} {} "
              + "(shuffleMergeId {}): data {} != {}, index {} != {}, meta {} != {}",
              msg.appId, msg.appAttemptId, msg.shuffleId, partition.reduceId,
              msg.shuffleMergeId,
              dataLength, partition.lastChunkOffset,
              indexLength, partition.indexFile.getPos(),
              metaLength, partition.metaFile.getPos());
          }
          // "chunk_size" is the number of chunks: the index holds (chunks + 1) 8-byte offsets.
          logger.info("shuffle partition {}_{} {} {}, chunk_size={}, meta_length={}, data_length={}",
            msg.appId, msg.appAttemptId, msg.shuffleId,
            partition.reduceId,
            partition.indexFile.getPos() / 8 - 1,
            partition.metaFile.getPos(),
            partition.lastChunkOffset);
        }
      }
    }
    mergeStatuses = new MergeStatuses(msg.shuffleId, msg.shuffleMergeId,
      bitmaps.toArray(new RoaringBitmap[0]), Ints.toArray(reduceIds),
      Longs.toArray(sizes));
  }
  ...
}
```
```
2022-01-07 19:40:46,795 INFO shuffle.RemoteBlockPushResolver: shuffle partition application_1640143179334_0148_-1 126 4877, chunk_size=1, meta_length=18, data_length=157
```
I added an assertion and debug logging to `IndexShuffleBlockResolver` (reducer side):
```scala
/**
 * Returns one [[FileSegmentManagedBuffer]] per chunk of the push-merged shuffle block, using the
 * chunk offsets stored in the merged index file.
 *
 * Debug instrumentation: if the index file describes no chunks, the index/data/meta files are
 * copied to /tmp for offline inspection and an AssertionError is thrown.
 */
override def getMergedBlockData(
    blockId: ShuffleMergedBlockId,
    dirs: Option[Array[String]]): Seq[ManagedBuffer] = {
  val indexFile = getMergedBlockIndexFile(conf.getAppId, blockId.shuffleId,
    blockId.shuffleMergeId, blockId.reduceId, dirs)
  val dataFile = getMergedBlockDataFile(conf.getAppId, blockId.shuffleId,
    blockId.shuffleMergeId, blockId.reduceId, dirs)
  val metaFile = getMergedBlockMetaFile(conf.getAppId, blockId.shuffleId,
    blockId.shuffleMergeId, blockId.reduceId, dirs)
  // Load all the indexes in order to identify all chunks in the specified merged shuffle file.
  val size = indexFile.length.toInt
  val offsets = Utils.tryWithResource {
    new DataInputStream(Files.newInputStream(indexFile.toPath))
  } { dis =>
    val buffer = ByteBuffer.allocate(size)
    dis.readFully(buffer.array)
    buffer.asLongBuffer
  }
  // Number of chunks is number of indexes - 1
  val numChunks = size / 8 - 1
  // `<= 0` rather than `== 0`: an index file shorter than 8 bytes gives numChunks == -1 and
  // describes no chunks either.
  if (numChunks <= 0) {
    val indexBackupPath = java.nio.file.Paths.get(s"/tmp/${indexFile.toPath.getFileName}")
    val dataBackupPath = java.nio.file.Paths.get(s"/tmp/${dataFile.toPath.getFileName}")
    val metaBackupPath = java.nio.file.Paths.get(s"/tmp/${metaFile.toPath.getFileName}")
    logError(s"$blockId chunk_size is $numChunks, " +
      s"index_file is $indexFile, backup to $indexBackupPath, " +
      s"data_file is $dataFile, backup to $dataBackupPath, " +
      s"meta_file is $metaFile, backup to $metaBackupPath")
    Seq(indexFile -> indexBackupPath, dataFile -> dataBackupPath, metaFile -> metaBackupPath)
      .foreach { case (file, backupPath) =>
        // REPLACE_EXISTING: a retried task attempt on the same host would otherwise fail here
        // with FileAlreadyExistsException. A failed backup must not mask the assertion below.
        try {
          Files.copy(file.toPath, backupPath, java.nio.file.StandardCopyOption.REPLACE_EXISTING)
        } catch {
          case scala.util.control.NonFatal(e) =>
            logWarning(s"Failed to back up $file to $backupPath", e)
        }
      }
    assert(false, s"$blockId has no chunks: index file $indexFile is $size bytes")
  }
  for (index <- 0 until numChunks) yield {
    new FileSegmentManagedBuffer(transportConf, dataFile,
      offsets.get(index),
      offsets.get(index + 1) - offsets.get(index))
  }
}
```
Then I ran TPC-DS for several rounds and reproduced the exception:
```log
01-07 19:42:08 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler@61: ShuffleMapStage 453 (save at QueryRunner.scala:98) failed in 1.811 s due to Job aborted due to stage failure: Task 122 in stage 453.0 failed 4 times, most recent failure: Lost task 122.3 in stage 453.0 (TID 278831) (beta-spark4 executor 562): java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:208)
    at org.apache.spark.shuffle.IndexShuffleBlockResolver.getMergedBlockData(IndexShuffleBlockResolver.scala:504)
    at org.apache.spark.storage.BlockManager.getLocalMergedBlockData(BlockManager.scala:777)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:945)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:85)
    at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
```
```
root@beta-spark4:/tmp# ls -l shuffleMerged_application_1640143179334_0148_126_0_4877*
-rw-r--r-- 1 root root 16036 Jan 7 19:41 shuffleMerged_application_1640143179334_0148_126_0_4877.data
-rw-r--r-- 1 root root 8 Jan 7 19:41 shuffleMerged_application_1640143179334_0148_126_0_4877.index
-rw-r--r-- 1 root root 0 Jan 7 19:41 shuffleMerged_application_1640143179334_0148_126_0_4877.meta
```
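So the ESS and the reduce task ran on the same machine. The ESS closed the data, index, and meta files and reported them as `chunk_size=1, meta_length=18, data_length=157`, i.e. one chunk, which means a 16-byte index file holding two offsets. This metadata is also returned to the driver and passed to the reduce task. But when the reduce task read the files from disk, they did not match: the index file was 8 bytes (zero chunks), the meta file was 0 bytes, and the data file was 16036 bytes.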
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]