squito commented on a change in pull request #25007:
[SPARK-28209][CORE][SHUFFLE] Proposed new shuffle writer API
URL: https://github.com/apache/spark/pull/25007#discussion_r305508027
##########
File path: core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
##########
@@ -142,82 +109,54 @@ class LocalDiskShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndA
intercept[IllegalStateException] {
stream.write(p)
}
- assert(writer.getNumBytesWritten === D_LEN)
+ assert(writer.getNumBytesWritten === data(p).length)
}
- mapOutputWriter.commitAllPartitions()
- val partitionLengths = (0 until NUM_PARTITIONS).map { _ => D_LEN.toDouble}.toArray
- assert(partitionSizesInMergedFile === partitionLengths)
- assert(mergedOutputFile.length() === partitionLengths.sum)
- assert(data === readRecordsFromFile(false))
+ verifyWrittenRecords()
}
test("writing to a channel") {
(0 until NUM_PARTITIONS).foreach { p =>
val writer = mapOutputWriter.getPartitionWriter(p)
- val channel = writer.openTransferrableChannel()
- val byteBuffer = ByteBuffer.allocate(D_LEN * 4)
- val intBuffer = byteBuffer.asIntBuffer()
- intBuffer.put(data(p))
- val numBytes = byteBuffer.remaining()
val outputTempFile = File.createTempFile("channelTemp", "", tempDir)
val outputTempFileStream = new FileOutputStream(outputTempFile)
- Utils.copyStream(
- new ByteBufferInputStream(byteBuffer),
- outputTempFileStream,
- closeStreams = true)
+ outputTempFileStream.write(data(p))
+ outputTempFileStream.close()
val tempFileInput = new FileInputStream(outputTempFile)
- channel.transferFrom(tempFileInput.getChannel, 0L, numBytes)
- // Bytes require * 4
- channel.close()
- tempFileInput.close()
- assert(writer.getNumBytesWritten === D_LEN * 4)
+ val channel = writer.openTransferrableChannel()
+ Utils.tryWithResource(new FileInputStream(outputTempFile)) { tempFileInput =>
+ Utils.tryWithResource(writer.openTransferrableChannel()) { channel =>
+ channel.transferFrom(tempFileInput.getChannel, 0L, data(p).length)
+ }
+ }
+ assert(writer.getNumBytesWritten === data(p).length)
}
- mapOutputWriter.commitAllPartitions()
- val partitionLengths = (0 until NUM_PARTITIONS).map { _ => (D_LEN * 4).toDouble }.toArray
- assert(partitionSizesInMergedFile === partitionLengths)
- assert(mergedOutputFile.length() === partitionLengths.sum)
- assert(data === readRecordsFromFile(true))
+ verifyWrittenRecords()
}
- test("copyStreams with an outputstream") {
+ private def readRecordsFromFile() = {
+ var startOffset = 0L
+ val result = new Array[Array[Byte]](NUM_PARTITIONS)
(0 until NUM_PARTITIONS).foreach { p =>
- val writer = mapOutputWriter.getPartitionWriter(p)
- val stream = writer.openStream()
- val byteBuffer = ByteBuffer.allocate(D_LEN * 4)
- val intBuffer = byteBuffer.asIntBuffer()
- intBuffer.put(data(p))
- val in = new ByteArrayInputStream(byteBuffer.array())
- Utils.copyStream(in, stream, false, false)
- in.close()
- stream.close()
- assert(writer.getNumBytesWritten === D_LEN * 4)
+ val partitionSize = data(p).length
+ if (partitionSize > 0) {
+ val in = new FileInputStream(mergedOutputFile)
+ in.getChannel.position(startOffset)
+ val lin = new LimitedInputStream(in, partitionSize)
Review comment:
These input streams aren't getting closed in a `finally`.
I still think it's easier to just open the file once. Since you're reading
bytes instead of ints now, you could do
```scala
Utils.tryWithResource(new FileInputStream(mergedOutputFile)) { in =>
  (0 until NUM_PARTITIONS).foreach { p =>
    val buffer = new Array[Byte](data(p).length)
    org.apache.commons.io.IOUtils.readFully(in, buffer)
    result(p) = buffer
  }
}
```
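For illustration, the whole `readRecordsFromFile()` helper rewritten along these lines might look roughly like the sketch below. This is only a sketch, not the PR's actual code: it reuses the suite's `mergedOutputFile`, `data`, and `NUM_PARTITIONS` fields that appear in the diff above, and the imports are just what the sketch itself needs.
```scala
import java.io.FileInputStream

import org.apache.commons.io.IOUtils

import org.apache.spark.util.Utils

private def readRecordsFromFile(): Array[Array[Byte]] = {
  val result = new Array[Array[Byte]](NUM_PARTITIONS)
  // Open the merged file once and read each partition's bytes sequentially;
  // tryWithResource closes the stream even if a read or assertion fails.
  Utils.tryWithResource(new FileInputStream(mergedOutputFile)) { in =>
    (0 until NUM_PARTITIONS).foreach { p =>
      val buffer = new Array[Byte](data(p).length)
      IOUtils.readFully(in, buffer)
      result(p) = buffer
    }
  }
  result
}
```
Reading the file sequentially this way would also drop the `startOffset` and `LimitedInputStream` bookkeeping, since each partition's bytes begin where the previous partition's end.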