Github user mridulm commented on a diff in the pull request:
https://github.com/apache/spark/pull/21440#discussion_r203273344
--- Diff:
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
@@ -166,6 +170,34 @@ private[spark] class ChunkedByteBuffer(var chunks:
Array[ByteBuffer]) {
}
+object ChunkedByteBuffer {
+ // TODO eliminate this method if we switch BlockManager to getting
InputStreams
+ def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int):
ChunkedByteBuffer = {
+ data match {
+ case f: FileSegmentManagedBuffer =>
+ map(f.getFile, maxChunkSize, f.getOffset, f.getLength)
+ case other =>
+ new ChunkedByteBuffer(other.nioByteBuffer())
+ }
+ }
+
+ def map(file: File, maxChunkSize: Int, offset: Long, length: Long):
ChunkedByteBuffer = {
+ Utils.tryWithResource(new FileInputStream(file).getChannel()) {
channel =>
+ var remaining = length
+ var pos = offset
+ val chunks = new ListBuffer[ByteBuffer]()
+ while (remaining > 0) {
+ val chunkSize = math.min(remaining, maxChunkSize)
+ val chunk = channel.map(FileChannel.MapMode.READ_ONLY, pos,
chunkSize)
--- End diff --
Perfect, thanks for clarifying!
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]