Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7130#discussion_r33594631
  
    --- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
    @@ -603,8 +539,165 @@ private[spark] class ExternalSorter[K, V, C](
       }
     
       /**
    -   * Return an iterator over all the data written to this object, grouped 
by partition and
    -   * aggregated by the requested aggregator. For each partition we then 
have an iterator over its
    +   * spill contents of in-memory iterator to a temporary file on disk.
    +   */
    +  private def spillMemoryToDisk(inMemory: WritablePartitionedIterator): 
SpilledFile = {
    +    // Because these files may be read during shuffle, their compression 
must be controlled by
    +    // spark.shuffle.compress instead of spark.shuffle.spill.compress, so 
we need to use
    +    // createTempShuffleBlock here; see SPARK-3426 for more context.
    +    val (blockId, file) = diskBlockManager.createTempShuffleBlock()
    +
    +    // These variables are reset after each flush
    +    var objectsWritten: Long = 0
    +    var spillMetrics: ShuffleWriteMetrics = null
    +    var writer: BlockObjectWriter = null
    +    def openWriter(): Unit = {
    +      assert (writer == null && spillMetrics == null)
    +      spillMetrics = new ShuffleWriteMetrics
    +      writer = blockManager.getDiskWriter(blockId, file, serInstance, 
fileBufferSize, spillMetrics)
    +    }
    +    openWriter()
    +
    +    // List of batch sizes (bytes) in the order they are written to disk
    +    val batchSizes = new ArrayBuffer[Long]
    +
    +    // How many elements we have in each partition
    +    val elementsPerPartition = new Array[Long](numPartitions)
    +
    +    // Flush the disk writer's contents to disk, and update relevant 
variables.
    +    // The writer is closed at the end of this process, and cannot be 
reused.
    +    def flush(): Unit = {
    +      val w = writer
    +      writer = null
    +      w.commitAndClose()
    +      _diskBytesSpilled += spillMetrics.shuffleBytesWritten
    +      batchSizes.append(spillMetrics.shuffleBytesWritten)
    +      spillMetrics = null
    +      objectsWritten = 0
    +    }
    +
    +    var success = false
    +    try {
    +
    +      while (inMemory.hasNext) {
    +        val partitionId = inMemory.nextPartition()
    +        inMemory.writeNext(writer)
    +        elementsPerPartition(partitionId) += 1
    +        objectsWritten += 1
    +
    +        if (objectsWritten == serializerBatchSize) {
    +          flush()
    +          openWriter()
    +        }
    +      }
    +      if (objectsWritten > 0) {
    +        flush()
    +      } else if (writer != null) {
    +        val w = writer
    +        writer = null
    +        w.revertPartialWritesAndClose()
    +      }
    +      success = true
    +    } finally {
    +      if (!success) {
    +        // This code path only happens if an exception was thrown above 
before we set success;
    +        // close our stuff and let the exception be thrown further
    +        if (writer != null) {
    +          writer.revertPartialWritesAndClose()
    +        }
    +        if (file.exists()) {
    +          file.delete()
    +        }
    +      }
    +    }
    +    SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
    +  }
    +
    +  /**
    +   * Spill in-memory inMemory to a temporary file on disk.
    +   * Return on-disk iterator over a temporary file.
    +   */
    +  private[this] def spillMemoryToDisk(iterator: Iterator[((Int, K), C)]): 
Iterator[((Int, K), C)] = {
    +    val it = new WritablePartitionedIterator {
    +      private[this] var cur = if (iterator.hasNext) iterator.next() else 
null
    +
    +      def writeNext(writer: BlockObjectWriter): Unit = {
    +        writer.write(cur._1._2, cur._2)
    +        cur = if (iterator.hasNext) iterator.next() else null
    +      }
    +
    +      def hasNext(): Boolean = cur != null
    +
    +      def nextPartition(): Int = cur._1._1
    +    }
    +
    +    val spillReader = new SpillReader(spillMemoryToDisk(it))
    --- End diff --
    
    Is there a potential leak here?  It seems like we just get an iterator out 
of the file, so we have no way to delete the file.  We need to clean up 
the file whether the iterator is read fully or only partially.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to