Github user lianhuiwang commented on a diff in the pull request:
https://github.com/apache/spark/pull/7130#discussion_r33641851
--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -603,8 +539,165 @@ private[spark] class ExternalSorter[K, V, C](
   }
   /**
-   * Return an iterator over all the data written to this object, grouped by partition and
-   * aggregated by the requested aggregator. For each partition we then have an iterator over its
+   * Spill contents of in-memory iterator to a temporary file on disk.
+   */
+  private def spillMemoryToDisk(inMemory: WritablePartitionedIterator): SpilledFile = {
+    // Because these files may be read during shuffle, their compression must be controlled by
+    // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
+    // createTempShuffleBlock here; see SPARK-3426 for more context.
+    val (blockId, file) = diskBlockManager.createTempShuffleBlock()
+
+    // These variables are reset after each flush
+    var objectsWritten: Long = 0
+    var spillMetrics: ShuffleWriteMetrics = null
+    var writer: BlockObjectWriter = null
+    def openWriter(): Unit = {
+      assert (writer == null && spillMetrics == null)
+      spillMetrics = new ShuffleWriteMetrics
+      writer = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)
+    }
+    openWriter()
+
+    // List of batch sizes (bytes) in the order they are written to disk
+    val batchSizes = new ArrayBuffer[Long]
+
+    // How many elements we have in each partition
+    val elementsPerPartition = new Array[Long](numPartitions)
+
+    // Flush the disk writer's contents to disk, and update relevant variables.
+    // The writer is closed at the end of this process, and cannot be reused.
+    def flush(): Unit = {
+      val w = writer
+      writer = null
+      w.commitAndClose()
+      _diskBytesSpilled += spillMetrics.shuffleBytesWritten
+      batchSizes.append(spillMetrics.shuffleBytesWritten)
+      spillMetrics = null
+      objectsWritten = 0
+    }
+
+    var success = false
+    try {
+
+      while (inMemory.hasNext) {
+        val partitionId = inMemory.nextPartition()
+        inMemory.writeNext(writer)
+        elementsPerPartition(partitionId) += 1
+        objectsWritten += 1
+
+        if (objectsWritten == serializerBatchSize) {
+          flush()
+          openWriter()
+        }
+      }
+      if (objectsWritten > 0) {
+        flush()
+      } else if (writer != null) {
+        val w = writer
+        writer = null
+        w.revertPartialWritesAndClose()
+      }
+      success = true
+    } finally {
+      if (!success) {
+        // This code path only happens if an exception was thrown above before we set success;
+        // close our stuff and let the exception be thrown further
+        if (writer != null) {
+          writer.revertPartialWritesAndClose()
+        }
+        if (file.exists()) {
+          file.delete()
+        }
+      }
+    }
+    SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
+  }
+
+  /**
+   * Spill the in-memory iterator to a temporary file on disk.
+   * Return an on-disk iterator over the temporary file.
+   */
+  private[this] def spillMemoryToDisk(iterator: Iterator[((Int, K), C)]): Iterator[((Int, K), C)] = {
+    val it = new WritablePartitionedIterator {
+      private[this] var cur = if (iterator.hasNext) iterator.next() else null
+
+      def writeNext(writer: BlockObjectWriter): Unit = {
+        writer.write(cur._1._2, cur._2)
+        cur = if (iterator.hasNext) iterator.next() else null
+      }
+
+      def hasNext(): Boolean = cur != null
+
+      def nextPartition(): Int = cur._1._1
+    }
+
+    val spillReader = new SpillReader(spillMemoryToDisk(it))
--- End diff --
Yes, thanks. As before, the spillReader is added to spills, and stop()
(https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala#L693)
deletes the files of those spills. But a problem remains when ExternalSorter is
used for a common sort rather than a shuffle sort, because a common sort does
not call stop(). Maybe the file deletion should also be added to cleanup()
(https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala#L594),
like ExternalAppendOnlyMap does.
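
For illustration, a minimal standalone sketch of that cleanup pattern follows:
record each temporary spill file as it is created, then delete them all in one
cleanup() call, the way ExternalAppendOnlyMap removes its own spill files. The
names here (SpilledFile, spills, recordSpill, cleanup) are simplified stand-ins
for this example only, not the actual patch:

import java.io.File
import scala.collection.mutable.ArrayBuffer

// Illustrative sketch only: track temporary spill files and delete them in
// cleanup(), similar in spirit to ExternalAppendOnlyMap's cleanup of its spills.
object SpillCleanupSketch {
  // Simplified stand-in for the real SpilledFile case class.
  case class SpilledFile(file: File)

  private val spills = new ArrayBuffer[SpilledFile]

  // Called whenever an in-memory collection is spilled to a temporary file.
  def recordSpill(file: File): Unit = spills += SpilledFile(file)

  // Delete every spill file that is still on disk; log failures rather than
  // throwing so that cleanup itself never fails the task.
  def cleanup(): Unit = {
    spills.foreach { s =>
      if (s.file.exists() && !s.file.delete()) {
        System.err.println(s"Error deleting spill file ${s.file.getPath}")
      }
    }
    spills.clear()
  }

  def main(args: Array[String]): Unit = {
    val tmp = File.createTempFile("spill", ".tmp")
    recordSpill(tmp)
    cleanup()
    assert(!tmp.exists(), "spill file should have been deleted")
  }
}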