gczsjdy commented on a change in pull request #25007: [SPARK-28209][CORE][SHUFFLE] Proposed new shuffle writer API
URL: https://github.com/apache/spark/pull/25007#discussion_r306602361
 
 

 ##########
 File path: core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
 ##########
 @@ -99,74 +104,82 @@
 
   BypassMergeSortShuffleWriter(
       BlockManager blockManager,
-      IndexShuffleBlockResolver shuffleBlockResolver,
       BypassMergeSortShuffleHandle<K, V> handle,
       int mapId,
+      long mapTaskAttemptId,
       SparkConf conf,
-      ShuffleWriteMetricsReporter writeMetrics) {
+      ShuffleWriteMetricsReporter writeMetrics,
+      ShuffleWriteSupport shuffleWriteSupport) {
    // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
    this.fileBufferSize = (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
     this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
     this.blockManager = blockManager;
     final ShuffleDependency<K, V, V> dep = handle.dependency();
     this.mapId = mapId;
+    this.mapTaskAttemptId = mapTaskAttemptId;
     this.shuffleId = dep.shuffleId();
     this.partitioner = dep.partitioner();
     this.numPartitions = partitioner.numPartitions();
     this.writeMetrics = writeMetrics;
     this.serializer = dep.serializer();
-    this.shuffleBlockResolver = shuffleBlockResolver;
+    this.shuffleWriteSupport = shuffleWriteSupport;
   }
 
   @Override
   public void write(Iterator<Product2<K, V>> records) throws IOException {
     assert (partitionWriters == null);
-    if (!records.hasNext()) {
-      partitionLengths = new long[numPartitions];
-      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
-      mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
-      return;
-    }
-    final SerializerInstance serInstance = serializer.newInstance();
-    final long openStartTime = System.nanoTime();
-    partitionWriters = new DiskBlockObjectWriter[numPartitions];
-    partitionWriterSegments = new FileSegment[numPartitions];
-    for (int i = 0; i < numPartitions; i++) {
-      final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
-        blockManager.diskBlockManager().createTempShuffleBlock();
-      final File file = tempShuffleBlockIdPlusFile._2();
-      final BlockId blockId = tempShuffleBlockIdPlusFile._1();
-      partitionWriters[i] =
-        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
-    }
-    // Creating the file to write to and creating a disk writer both involve interacting with
-    // the disk, and can take a long time in aggregate when we open many files, so should be
-    // included in the shuffle write time.
-    writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
-
-    while (records.hasNext()) {
-      final Product2<K, V> record = records.next();
-      final K key = record._1();
-      partitionWriters[partitioner.getPartition(key)].write(key, record._2());
-    }
+    ShuffleMapOutputWriter mapOutputWriter = shuffleWriteSupport
+        .createMapOutputWriter(shuffleId, mapId, mapTaskAttemptId, numPartitions, writeMetrics);
+    try {
+      if (!records.hasNext()) {
+        partitionLengths = new long[numPartitions];
+        mapOutputWriter.commitAllPartitions();
+        mapStatus = MapStatus$.MODULE$.apply(
+            blockManager.shuffleServerId(),
+            partitionLengths);
+        return;
+      }
+      final SerializerInstance serInstance = serializer.newInstance();
+      final long openStartTime = System.nanoTime();
+      partitionWriters = new DiskBlockObjectWriter[numPartitions];
+      partitionWriterSegments = new FileSegment[numPartitions];
+      for (int i = 0; i < numPartitions; i++) {
+        final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
+            blockManager.diskBlockManager().createTempShuffleBlock();
+        final File file = tempShuffleBlockIdPlusFile._2();
+        final BlockId blockId = tempShuffleBlockIdPlusFile._1();
+        partitionWriters[i] =
+            blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
+      }
+      // Creating the file to write to and creating a disk writer both involve interacting with
+      // the disk, and can take a long time in aggregate when we open many files, so should be
+      // included in the shuffle write time.
+      writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
 
-    for (int i = 0; i < numPartitions; i++) {
-      try (DiskBlockObjectWriter writer = partitionWriters[i]) {
-        partitionWriterSegments[i] = writer.commitAndGet();
+      while (records.hasNext()) {
+        final Product2<K, V> record = records.next();
+        final K key = record._1();
+        partitionWriters[partitioner.getPartition(key)].write(key, record._2());
       }
-    }
 
-    File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
-    File tmp = Utils.tempFileWith(output);
-    try {
-      partitionLengths = writePartitionedFile(tmp);
-      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
-    } finally {
-      if (tmp.exists() && !tmp.delete()) {
-        logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
+      for (int i = 0; i < numPartitions; i++) {
+        try (DiskBlockObjectWriter writer = partitionWriters[i]) {
+          partitionWriterSegments[i] = writer.commitAndGet();
+        }
+      }
+
+      partitionLengths = writePartitionedData(mapOutputWriter);
+      mapOutputWriter.commitAllPartitions();
 
 Review comment:
   Thanks, it makes sense!
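
   For context, the new control flow above reduces to: ask the pluggable shuffleWriteSupport for a ShuffleMapOutputWriter up front, stream each partition's bytes through it, and publish everything with a single commitAllPartitions() call; the writer no longer talks to IndexShuffleBlockResolver or local temp files directly. Below is a minimal caller-side sketch of that flow. Only createMapOutputWriter(...) and commitAllPartitions() are visible in the diff, so the openPartitionStream helper and abort hook in the sketch are assumptions for illustration, not the PR's actual interface.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.Arrays;

    public class MapOutputWriterSketch {

      // Hypothetical minimal shape of the writer interface; member names
      // other than commitAllPartitions() are illustrative assumptions.
      interface ShuffleMapOutputWriter {
        OutputStream openPartitionStream(int reducePartition) throws IOException;
        void commitAllPartitions() throws IOException;
        void abort(Throwable cause);
      }

      // Mirrors the happy path of write() above: stream each partition's
      // bytes through the writer, record per-partition lengths (used for
      // the MapStatus), then commit all partitions in one call.
      static long[] writeAll(ShuffleMapOutputWriter writer, byte[][] partitions)
          throws IOException {
        long[] partitionLengths = new long[partitions.length];
        try {
          for (int i = 0; i < partitions.length; i++) {
            try (OutputStream out = writer.openPartitionStream(i)) {
              out.write(partitions[i]);
            }
            partitionLengths[i] = partitions[i].length;
          }
          writer.commitAllPartitions();
          return partitionLengths;
        } catch (IOException e) {
          writer.abort(e); // analogous to cleaning up temp output on failure
          throw e;
        }
      }

      public static void main(String[] args) throws IOException {
        // Toy in-memory implementation so the sketch runs end to end.
        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        ShuffleMapOutputWriter writer = new ShuffleMapOutputWriter() {
          @Override public OutputStream openPartitionStream(int p) { return sink; }
          @Override public void commitAllPartitions() {
            System.out.println("committed " + sink.size() + " bytes");
          }
          @Override public void abort(Throwable cause) { sink.reset(); }
        };
        long[] lengths = writeAll(writer, new byte[][] {"a".getBytes(), "bc".getBytes()});
        System.out.println("partition lengths: " + Arrays.toString(lengths));
      }
    }

   Routing all writes through this one abstraction is what would let the same BypassMergeSortShuffleWriter logic target storage other than local disk, which is the aim of the proposed API.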
