squito commented on a change in pull request #23453: [SPARK-26089][CORE] Handle 
corruption in large shuffle blocks
URL: https://github.com/apache/spark/pull/23453#discussion_r252026605
 
 

 ##########
 File path: core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
 ##########
 @@ -211,6 +212,51 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     assert(os.toByteArray.toList.equals(bytes.toList))
   }
 
+  test("copyStreamUpTo") {
+    // input array initialization
+    val bytes = Array.ofDim[Byte](1200)
 
 Review comment:
   My concerns here are that (a) at the boundary, we capture the last byte of
   the in-memory section and the first byte of the following stream (without
   duplicating anything), and (b) the memory gets freed as soon as we've read
   past the end (by closing the ChunkedByteBufferInputStream). I guess (b)
   should really be covered by SequenceInputStream, but as it's outside of our
   control, a confirmation here is nice. And (a) has to do with how you set up
   the SequenceInputStream. I would like a test targeted at those concerns,
   something like:
   
   
   ```scala
   import java.io.ByteArrayInputStream

   val nBytes = 10
   // Array.ofDim allocates nBytes elements; Array[Byte](nBytes) would be a one-element array
   val bytes = Array.ofDim[Byte](nBytes)
   (0 until nBytes).foreach { idx => bytes(idx) = idx.toByte }
   (0 until nBytes + 10).foreach { limit =>
     val originalInput = new ByteArrayInputStream(bytes)
     val (fullyCopied, mergedStream) = Utils.copyStreamUpTo(originalInput, limit, true)
     // placeholder: reflection hacks to get a handle on the ChunkedByteBufferInputStream
     val byteBufferInputStream = ???
     assert(fullyCopied == (limit >= nBytes))
     (0 until nBytes).foreach { readIdx =>
       val readByte = mergedStream.read()
       // (a) every byte shows up exactly once, in order, across the boundary
       assert(readByte == readIdx)
       if (readIdx == limit) {
         // (b) make sure we null out the reference to the buffered portion as soon
         // as possible, to free memory
         assert(byteBufferInputStream.chunkedByteBuffer == null)
       }
     }
     assert(mergedStream.read() == -1)
   }
   ```
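
   To confirm the SequenceInputStream part of (b) in isolation, here is a
   minimal standalone sketch that uses only java.io. `CloseTrackingStream` and
   `SequenceBoundaryCheck` are hypothetical names made up for this example and
   are not part of Spark; the tracking stream just stands in for the buffered
   portion, so this only checks what the JDK does, not `copyStreamUpTo` itself.

   ```scala
   import java.io.{ByteArrayInputStream, InputStream, SequenceInputStream}

   // Hypothetical helper (not part of Spark): records whether close() was called.
   class CloseTrackingStream(data: Array[Byte]) extends ByteArrayInputStream(data) {
     @volatile var closed = false
     override def close(): Unit = {
       closed = true
       super.close()
     }
   }

   object SequenceBoundaryCheck {
     // Splits `bytes` at `limit`, concatenates the two halves with a
     // SequenceInputStream, and reads everything back one byte at a time.
     // Returns (values read, index of the read() call during which the first
     // stream was closed, or -1 if it never was).
     def readBack(bytes: Array[Byte], limit: Int): (Seq[Int], Int) = {
       val split = math.min(limit, bytes.length)
       val head = new CloseTrackingStream(bytes.take(split))
       val tail = new ByteArrayInputStream(bytes.drop(split))
       val merged: InputStream = new SequenceInputStream(head, tail)

       val out = Seq.newBuilder[Int]
       var closedAt = -1
       var idx = 0
       var done = false
       while (!done) {
         val b = merged.read()
         // Note the first read() during which the head stream got closed.
         if (head.closed && closedAt < 0) closedAt = idx
         if (b == -1) done = true else { out += b; idx += 1 }
       }
       (out.result(), closedAt)
     }
   }
   ```

   If SequenceInputStream behaves the way I expect, `readBack` should return
   every byte exactly once and in order, and the head stream should be closed
   on the first read that goes past the split point, which is the same place
   the test above expects the buffer reference to be nulled out.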

