gczsjdy commented on a change in pull request #25007:
URL: https://github.com/apache/spark/pull/25007#discussion_r427875960



##########
File path: core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
##########
@@ -99,74 +105,82 @@
 
   BypassMergeSortShuffleWriter(
       BlockManager blockManager,
-      IndexShuffleBlockResolver shuffleBlockResolver,
       BypassMergeSortShuffleHandle<K, V> handle,
       int mapId,
+      long mapTaskAttemptId,
       SparkConf conf,
-      ShuffleWriteMetricsReporter writeMetrics) {
+      ShuffleWriteMetricsReporter writeMetrics,
+      ShuffleExecutorComponents shuffleExecutorComponents) {
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
     this.fileBufferSize = (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
     this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
     this.blockManager = blockManager;
     final ShuffleDependency<K, V, V> dep = handle.dependency();
     this.mapId = mapId;
+    this.mapTaskAttemptId = mapTaskAttemptId;
     this.shuffleId = dep.shuffleId();
     this.partitioner = dep.partitioner();
     this.numPartitions = partitioner.numPartitions();
     this.writeMetrics = writeMetrics;
     this.serializer = dep.serializer();
-    this.shuffleBlockResolver = shuffleBlockResolver;
+    this.shuffleExecutorComponents = shuffleExecutorComponents;
   }
 
   @Override
   public void write(Iterator<Product2<K, V>> records) throws IOException {
     assert (partitionWriters == null);
-    if (!records.hasNext()) {
-      partitionLengths = new long[numPartitions];
-      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
-      mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
-      return;
-    }
-    final SerializerInstance serInstance = serializer.newInstance();
-    final long openStartTime = System.nanoTime();
-    partitionWriters = new DiskBlockObjectWriter[numPartitions];
-    partitionWriterSegments = new FileSegment[numPartitions];
-    for (int i = 0; i < numPartitions; i++) {
-      final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
-        blockManager.diskBlockManager().createTempShuffleBlock();
-      final File file = tempShuffleBlockIdPlusFile._2();
-      final BlockId blockId = tempShuffleBlockIdPlusFile._1();
-      partitionWriters[i] =
-        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
-    }
-    // Creating the file to write to and creating a disk writer both involve interacting with
-    // the disk, and can take a long time in aggregate when we open many files, so should be
-    // included in the shuffle write time.
-    writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
-
-    while (records.hasNext()) {
-      final Product2<K, V> record = records.next();
-      final K key = record._1();
-      partitionWriters[partitioner.getPartition(key)].write(key, record._2());
-    }
+    ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
+        .createMapOutputWriter(shuffleId, mapId, mapTaskAttemptId, numPartitions);
+    try {
+      if (!records.hasNext()) {
+        partitionLengths = new long[numPartitions];
+        mapOutputWriter.commitAllPartitions();
+        mapStatus = MapStatus$.MODULE$.apply(
+            blockManager.shuffleServerId(),
+            partitionLengths);
+        return;
+      }
+      final SerializerInstance serInstance = serializer.newInstance();
+      final long openStartTime = System.nanoTime();
+      partitionWriters = new DiskBlockObjectWriter[numPartitions];
+      partitionWriterSegments = new FileSegment[numPartitions];
+      for (int i = 0; i < numPartitions; i++) {
+        final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
+            blockManager.diskBlockManager().createTempShuffleBlock();
+        final File file = tempShuffleBlockIdPlusFile._2();
+        final BlockId blockId = tempShuffleBlockIdPlusFile._1();
+        partitionWriters[i] =
+            blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);

Review comment:
       @mccheah Sorry to bring up such an old PR lol.
   But why didn't we let the specific plugin take care of this? These temp files are not spill data.
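
   To make the question concrete: as far as I can tell, later in this patch the temp files do get funneled into the plugin through `ShuffleMapOutputWriter#getPartitionWriter`, roughly like the sketch below (the helper and class names are made up, and the plain-stream copy is simplified; the real code also has a transferTo/NIO channel path). So the plugin owns the final bytes, but the buffering above still goes through `blockManager.diskBlockManager()` unconditionally.

   ```java
   import java.io.File;
   import java.io.FileInputStream;
   import java.io.IOException;
   import java.io.OutputStream;

   import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;
   import org.apache.spark.shuffle.api.ShufflePartitionWriter;

   // Simplified sketch, not the verbatim patch: funnel each per-partition
   // temp file into the plugin-provided writer. Error handling and the
   // transferTo path are omitted for brevity.
   final class PluginCopySketch {
     static void copyPartitionsToPlugin(
         File[] partitionTempFiles,
         ShuffleMapOutputWriter mapOutputWriter,
         int numPartitions) throws IOException {
       byte[] buf = new byte[8192];
       for (int i = 0; i < numPartitions; i++) {
         // Ask the plugin for the destination of partition i (the API expects
         // increasing partition order, if I read the contract right).
         ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
         File file = partitionTempFiles[i];
         if (file != null && file.exists()) {
           try (FileInputStream in = new FileInputStream(file);
                OutputStream out = writer.openStream()) {
             int n;
             while ((n = in.read(buf)) != -1) {
               out.write(buf, 0, n);
             }
           }
         }
       }
       // Let the plugin finalize the map output (e.g. write the index file
       // in the local-disk implementation).
       mapOutputWriter.commitAllPartitions();
     }
   }
   ```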



