vanzin commented on a change in pull request #25342:
[SPARK-28571][CORE][SHUFFLE] Use the shuffle writer plugin for the
SortShuffleWriter
URL: https://github.com/apache/spark/pull/25342#discussion_r318304015
##########
File path:
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
##########
@@ -718,6 +718,79 @@ private[spark] class ExternalSorter[K, V, C](
lengths
}
+ /**
+ * Write all the data added into this ExternalSorter into a map output
writer that pushes bytes
+ * to some arbitrary backing store. This is called by the SortShuffleWriter.
+ *
+ * @return array of lengths, in bytes, of each partition of the file (used
by map output tracker)
+ */
+ def writePartitionedMapOutput(
+ shuffleId: Int,
+ mapId: Int,
+ mapOutputWriter: ShuffleMapOutputWriter): Unit = {
+ var nextPartitionId = 0
+ if (spills.isEmpty) {
+ // Case where we only have in-memory data
+ val collection = if (aggregator.isDefined) map else buffer
+ val it =
collection.destructiveSortedWritablePartitionedIterator(comparator)
+ while (it.hasNext()) {
+ val partitionId = it.nextPartition()
+ var partitionWriter: ShufflePartitionWriter = null
+ var partitionPairsWriter: ShufflePartitionPairsWriter = null
+ var threwException = true
+ try {
+ partitionWriter = mapOutputWriter.getPartitionWriter(partitionId)
+ val blockId = ShuffleBlockId(shuffleId, mapId, partitionId)
+ partitionPairsWriter = new ShufflePartitionPairsWriter(
+ partitionWriter,
+ serializerManager,
+ serInstance,
+ blockId,
+ context.taskMetrics().shuffleWriteMetrics)
+ while (it.hasNext && it.nextPartition() == partitionId) {
+ it.writeNext(partitionPairsWriter)
+ }
+ threwException = false
+ } finally {
+ if (partitionPairsWriter != null) {
+ Closeables.close(partitionPairsWriter, threwException)
+ }
+ }
+ nextPartitionId = partitionId + 1
+ }
+ } else {
+ // We must perform merge-sort; get an iterator by partition and write
everything directly.
+ for ((id, elements) <- this.partitionedIterator) {
+ val blockId = ShuffleBlockId(shuffleId, mapId, id)
+ var partitionWriter: ShufflePartitionWriter = null
+ var partitionPairsWriter: ShufflePartitionPairsWriter = null
+ var threwException = true
+ try {
+ partitionWriter = mapOutputWriter.getPartitionWriter(id)
+ partitionPairsWriter = new ShufflePartitionPairsWriter(
+ partitionWriter,
+ serializerManager,
+ serInstance,
+ blockId,
+ context.taskMetrics().shuffleWriteMetrics)
+ if (elements.hasNext) {
+ for (elem <- elements) {
+ partitionPairsWriter.write(elem._1, elem._2)
+ }
+ }
+ var threwException = false
Review comment:
`var threwException = false` on this line shadows the `threwException` declared
at the top of this block, so the outer flag is never cleared and the `finally`
block will always close the writer as if an exception had been thrown. Drop the
`var` here so the outer variable is assigned instead. But I wonder if
`Utils.tryWithSafeFinally` isn't a better fit here (and in the "mirror" block
above for the no-spill case) — it would remove the manual flag entirely.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]