mccheah commented on a change in pull request #25007:
[SPARK-28209][CORE][SHUFFLE] Proposed new shuffle writer API
URL: https://github.com/apache/spark/pull/25007#discussion_r306562340
##########
File path: core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
##########
@@ -99,74 +104,82 @@
  BypassMergeSortShuffleWriter(
      BlockManager blockManager,
-      IndexShuffleBlockResolver shuffleBlockResolver,
      BypassMergeSortShuffleHandle<K, V> handle,
      int mapId,
+      long mapTaskAttemptId,
      SparkConf conf,
-      ShuffleWriteMetricsReporter writeMetrics) {
+      ShuffleWriteMetricsReporter writeMetrics,
+      ShuffleWriteSupport shuffleWriteSupport) {
    // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
    this.fileBufferSize = (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
    this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
    this.blockManager = blockManager;
    final ShuffleDependency<K, V, V> dep = handle.dependency();
    this.mapId = mapId;
+    this.mapTaskAttemptId = mapTaskAttemptId;
    this.shuffleId = dep.shuffleId();
    this.partitioner = dep.partitioner();
    this.numPartitions = partitioner.numPartitions();
    this.writeMetrics = writeMetrics;
    this.serializer = dep.serializer();
-    this.shuffleBlockResolver = shuffleBlockResolver;
+    this.shuffleWriteSupport = shuffleWriteSupport;
  }

  @Override
  public void write(Iterator<Product2<K, V>> records) throws IOException {
    assert (partitionWriters == null);
-    if (!records.hasNext()) {
-      partitionLengths = new long[numPartitions];
-      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
-      mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
-      return;
-    }
-    final SerializerInstance serInstance = serializer.newInstance();
-    final long openStartTime = System.nanoTime();
-    partitionWriters = new DiskBlockObjectWriter[numPartitions];
-    partitionWriterSegments = new FileSegment[numPartitions];
-    for (int i = 0; i < numPartitions; i++) {
-      final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
-        blockManager.diskBlockManager().createTempShuffleBlock();
-      final File file = tempShuffleBlockIdPlusFile._2();
-      final BlockId blockId = tempShuffleBlockIdPlusFile._1();
-      partitionWriters[i] =
-        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
-    }
-    // Creating the file to write to and creating a disk writer both involve interacting with
-    // the disk, and can take a long time in aggregate when we open many files, so should be
-    // included in the shuffle write time.
-    writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
-
-    while (records.hasNext()) {
-      final Product2<K, V> record = records.next();
-      final K key = record._1();
-      partitionWriters[partitioner.getPartition(key)].write(key, record._2());
-    }
+    ShuffleMapOutputWriter mapOutputWriter = shuffleWriteSupport
+      .createMapOutputWriter(shuffleId, mapId, mapTaskAttemptId, numPartitions, writeMetrics);
+    try {
+      if (!records.hasNext()) {
+        partitionLengths = new long[numPartitions];
+        mapOutputWriter.commitAllPartitions();
+        mapStatus = MapStatus$.MODULE$.apply(
+          blockManager.shuffleServerId(),
+          partitionLengths);
+        return;
+      }
+      final SerializerInstance serInstance = serializer.newInstance();
+      final long openStartTime = System.nanoTime();
+      partitionWriters = new DiskBlockObjectWriter[numPartitions];
+      partitionWriterSegments = new FileSegment[numPartitions];
+      for (int i = 0; i < numPartitions; i++) {
+        final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
+          blockManager.diskBlockManager().createTempShuffleBlock();
+        final File file = tempShuffleBlockIdPlusFile._2();
+        final BlockId blockId = tempShuffleBlockIdPlusFile._1();
+        partitionWriters[i] =
+          blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
+      }
+      // Creating the file to write to and creating a disk writer both involve interacting with
+      // the disk, and can take a long time in aggregate when we open many files, so should be
+      // included in the shuffle write time.
+      writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
-    for (int i = 0; i < numPartitions; i++) {
-      try (DiskBlockObjectWriter writer = partitionWriters[i]) {
-        partitionWriterSegments[i] = writer.commitAndGet();
+      while (records.hasNext()) {
+        final Product2<K, V> record = records.next();
+        final K key = record._1();
+        partitionWriters[partitioner.getPartition(key)].write(key, record._2());
      }
-    }
-    File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
-    File tmp = Utils.tempFileWith(output);
-    try {
-      partitionLengths = writePartitionedFile(tmp);
-      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
-    } finally {
-      if (tmp.exists() && !tmp.delete()) {
-        logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
+      for (int i = 0; i < numPartitions; i++) {
+        try (DiskBlockObjectWriter writer = partitionWriters[i]) {
+          partitionWriterSegments[i] = writer.commitAndGet();
+        }
+      }
+
+      partitionLengths = writePartitionedData(mapOutputWriter);
+      mapOutputWriter.commitAllPartitions();
Review comment:
Ah, but the problem is that we commit the partitions immediately when there are no records (line 136), so we have to open the writer first thing. The scope of the try...catch is therefore correct: if anything fails after the writer has been opened, the writer must be aborted.
The writer doesn't have to write any records to have intermediate state that needs cleaning up. For example, it could have opened an output stream to the destination location, or created directories.
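To make that concrete, here's a rough sketch of the scoping I mean. This is hypothetical: `MapOutputWriterSketch` is a stand-in for the real interface, and the `abort(Throwable)` signature is assumed for illustration, not necessarily the exact API in this PR.

```java
import java.io.IOException;
import java.util.Iterator;

// Hypothetical stand-in for the proposed writer interface; the abort(Throwable)
// signature is an assumption for illustration.
interface MapOutputWriterSketch {
  void commitAllPartitions() throws IOException;

  void abort(Throwable error) throws IOException;
}

final class AbortScopeSketch {

  // The writer is created before the try block because the empty-input path
  // must call commitAllPartitions() right away. Everything after creation is
  // guarded: any failure aborts the writer, which may already hold intermediate
  // state (an open stream, created directories) even with zero records written.
  static void write(MapOutputWriterSketch mapOutputWriter, Iterator<String> records)
      throws IOException {
    try {
      if (!records.hasNext()) {
        // Empty map output still commits immediately, hence the early open.
        mapOutputWriter.commitAllPartitions();
        return;
      }
      while (records.hasNext()) {
        records.next(); // ... route each record to its partition writer ...
      }
      mapOutputWriter.commitAllPartitions();
    } catch (Exception e) {
      try {
        // Clean up partial output even if no records were ever written.
        mapOutputWriter.abort(e);
      } catch (Exception suppressed) {
        e.addSuppressed(suppressed);
      }
      throw e;
    }
  }
}
```

Whatever cleanup hook the final API exposes, the shape stays the same: create the writer, guard everything after creation, commit on success, abort on any failure.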