Github user rezasafi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22325#discussion_r218872037
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
    @@ -444,36 +444,34 @@ final class ShuffleBlockFetcherIterator(
                   throwFetchFailedException(blockId, address, e)
               }
     
    -          input = streamWrapper(blockId, in)
    -          // Only copy the stream if it's wrapped by compression or encryption, also the size of
    -          // block is small (the decompressed block is smaller than maxBytesInFlight)
    -          if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 3) {
    -            val originalInput = input
    -            val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate)
    -            try {
    +          try {
    +            input = streamWrapper(blockId, in)
    +            // Only copy the stream if it's wrapped by compression or encryption, also the size of
    +            // block is small (the decompressed block is smaller than maxBytesInFlight)
    +            if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 3) {
    +              val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate)
                   // Decompress the whole block at once to detect any corruption, which could increase
                   // the memory usage tne potential increase the chance of OOM.
                   // TODO: manage the memory used here, and spill it into disk in case of OOM.
                   Utils.copyStream(input, out)
                   out.close()
                   input = out.toChunkedByteBuffer.toInputStream(dispose = true)
    --- End diff --
    
    @jiangxb1987 Thanks for the comment. Was that the purpose of the "originalInput" val in the code before my change? That one was closed in the finally block, though, not before a new input was created.
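    For reference, here is a minimal sketch of the pre-change pattern I'm asking about, reconstructed from the diff context above (simplified; the catch block that handles the corrupted-block retry is elided):

        // Pre-change flow (simplified sketch): "originalInput" kept a handle on
        // the wrapped stream so it could still be closed in the finally block
        // after "input" was reassigned to the in-memory copy.
        input = streamWrapper(blockId, in)
        if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 3) {
          val originalInput = input
          val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate)
          try {
            Utils.copyStream(input, out)
            out.close()
            input = out.toChunkedByteBuffer.toInputStream(dispose = true)
          } finally {
            originalInput.close()  // closed here, not before the new input was created
            in.close()
          }
        }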

