Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22325#discussion_r218857081
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
    @@ -444,36 +444,34 @@ final class ShuffleBlockFetcherIterator(
                   throwFetchFailedException(blockId, address, e)
               }
     
    -          input = streamWrapper(blockId, in)
    -          // Only copy the stream if it's wrapped by compression or encryption, also the size of
    -          // block is small (the decompressed block is smaller than maxBytesInFlight)
    -          if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 3) {
    -            val originalInput = input
    -            val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate)
    -            try {
    +          try {
    +            input = streamWrapper(blockId, in)
    +            // Only copy the stream if it's wrapped by compression or encryption, also the size of
    +            // block is small (the decompressed block is smaller than maxBytesInFlight)
    +            if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 3) {
    +              val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate)
                   // Decompress the whole block at once to detect any corruption, which could increase
                   // the memory usage tne potential increase the chance of OOM.
                   // TODO: manage the memory used here, and spill it into disk in case of OOM.
                   Utils.copyStream(input, out)
                   out.close()
                   input = out.toChunkedByteBuffer.toInputStream(dispose = true)
    --- End diff --
    
    We create a new `input` here, so the original input stream should be closed to avoid a memory leak.
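    
    Roughly, the pattern being suggested is something like the following minimal sketch (not Spark's actual code: `copyAndReplace` is a hypothetical helper, and `ByteArrayOutputStream` stands in for `ChunkedByteBufferOutputStream`). The point is that once the wrapped stream has been fully copied into an in-memory buffer, the original stream is closed explicitly rather than left for GC:
    
    ```scala
    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
    
    // Hypothetical helper for illustration only: copy the wrapped (decompressing/
    // decrypting) stream into an in-memory buffer, then return a new stream over
    // that buffer, closing the original stream in a finally block.
    def copyAndReplace(wrapped: InputStream): InputStream = {
      val out = new ByteArrayOutputStream()
      try {
        val buf = new Array[Byte](64 * 1024)
        var n = wrapped.read(buf)
        while (n != -1) {
          out.write(buf, 0, n)
          n = wrapped.read(buf)
        }
      } finally {
        // Close the original stream once its contents have been copied,
        // so its underlying resources are released promptly.
        wrapped.close()
      }
      new ByteArrayInputStream(out.toByteArray)
    }
    ```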

