Github user mridulm commented on the pull request:
https://github.com/apache/spark/pull/397#issuecomment-40276213
I actually meant something like this:
(This is from an internal WIP branch to tackle the ByteBuffer to Seq[ByteBuffer] change.)
Ideally I should submit this via a PR, but unfortunately ...
```
package org.apache.spark.io

import java.io.OutputStream
import java.nio.ByteBuffer
import java.util.{Arrays => JArrays}

import org.apache.spark.storage.DiskStore

/**
 * A custom implementation of ByteArrayOutputStream which tries to minimize array copies by
 * reusing the underlying array if within bounds.
 *
 * Note, this is unsafe for general use: it directly exposes the data array for use.
 */
private[io] class SparkByteArrayOutputStream(initialSize: Int) extends OutputStream {

  if (initialSize < 0) {
    throw new IllegalArgumentException("Negative initial size: " + initialSize)
  }

  /**
   * The buffer where data is stored.
   */
  private var buf: Array[Byte] = new Array[Byte](initialSize)

  /**
   * The number of valid bytes in the buffer.
   */
  private var count: Int = 0

  /**
   * Creates a new byte array output stream. The buffer capacity is
   * initially 32 bytes, though its size increases if necessary.
   */
  def this() = this(32)

  private def ensureCapacity(minCapacity: Int) {
    if (minCapacity < 0) {
      throw new IllegalArgumentException("required capacity " + minCapacity + " is negative")
    }
    if (minCapacity > buf.length) grow(minCapacity)
  }

  /**
   * Increases the capacity to ensure that it can hold at least the
   * number of elements specified by the minimum capacity argument.
   *
   * @param minCapacity the desired minimum capacity
   */
  private def grow(minCapacity: Int) {
    val oldCapacity: Int = buf.length
    // TODO: Doubling might be too expensive as size grows: work out something better?
    var newCapacity: Int = oldCapacity << 1
    if (newCapacity - minCapacity < 0) newCapacity = minCapacity
    // Cap the size at DiskStore.MAX_BLOCK_SIZE.
    if (newCapacity > DiskStore.MAX_BLOCK_SIZE) newCapacity = DiskStore.MAX_BLOCK_SIZE
    if (newCapacity < 0) {
      throw new IllegalArgumentException("computed capacity = " + newCapacity +
        " is negative. minCapacity = " + minCapacity)
    }
    if (newCapacity <= buf.length || newCapacity < minCapacity) {
      throw new IllegalStateException("Can't grow the array any more. Already at max size?" +
        " newCapacity = " + newCapacity +
        ", minCapacity = " + minCapacity +
        ", blocksize = " + DiskStore.MAX_BLOCK_SIZE)
    }
    buf = JArrays.copyOf(buf, newCapacity)
  }

  /**
   * Writes the specified byte to this byte array output stream.
   *
   * @param b the byte to be written.
   */
  def write(b: Int) {
    ensureCapacity(count + 1)
    buf(count) = b.toByte
    count += 1
  }

  /**
   * Writes <code>len</code> bytes from the specified byte array
   * starting at offset <code>off</code> to this byte array output stream.
   *
   * @param b the data.
   * @param off the start offset in the data.
   * @param len the number of bytes to write.
   */
  override def write(b: Array[Byte], off: Int, len: Int) {
    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) - b.length > 0)) {
      throw new IndexOutOfBoundsException
    }
    ensureCapacity(count + len)
    System.arraycopy(b, off, buf, count, len)
    count += len
  }

  /**
   * Resets the <code>count</code> field of this byte array output
   * stream to zero, so that all currently accumulated output in the
   * output stream is discarded. The output stream can be used again,
   * reusing the already allocated buffer space.
   *
   * @see java.io.ByteArrayInputStream#count
   */
  def reset() {
    count = 0
  }

  /**
   * Trim the underlying array: do this only if it makes sense - that is, if the space saving
   * is worth more than the cost of doing the allocation and copy.
   *
   * Note, this is called after all data has been written to the stream to compact the array.
   */
  def compact() {
    if (SparkByteArrayOutputStream.needCompact(this)) {
      buf = JArrays.copyOf(buf, count)
    }
  }

  /**
   * Returns a ByteBuffer wrapping the valid contents of the buffer, without copying.
   *
   * @return the current contents of this output stream, as a ByteBuffer.
   * @see java.io.ByteArrayOutputStream#size()
   */
  def toByteBuffer: ByteBuffer = {
    if (0 == count) return ByteBuffer.allocate(0)
    ByteBuffer.wrap(buf, 0, count)
  }

  /**
   * Returns the current size of the buffer.
   *
   * @return the value of the <code>count</code> field, which is the number
   *         of valid bytes in this output stream.
   * @see java.io.ByteArrayOutputStream#count
   */
  def size: Int = {
    count
  }

  /**
   * Closing a <tt>ByteArrayOutputStream</tt> has no effect. The methods in
   * this class can be called after the stream has been closed without
   * generating an <tt>IOException</tt>.
   */
  override def close() {
    // TODO: set a flag and disallow any further writes?
  }
}

object SparkByteArrayOutputStream {

  private val MINIMUM_BUFFER_SIZE =
    System.getProperty("spark.io.baos.trim.min_buffer_size", (1024 * 1024).toString).toInt

  // 0.1 % by default.
  private val WASTAGE_FRACTION =
    System.getProperty("spark.io.baos.trim.wastage", 0.001.toString).toDouble

  def needCompact(stream: SparkByteArrayOutputStream): Boolean = {
    val capacity = stream.buf.length
    val used = stream.size
    val wastage = capacity - used

    // If no wastage, nothing to compact.
    if (wastage <= 0) return false

    // If capacity is really low, always allow compaction: the cost of compaction will be low.
    if (capacity < MINIMUM_BUFFER_SIZE) return true

    // If the wastage is small enough, don't compact.
    // Currently, "small enough" means less than WASTAGE_FRACTION of the capacity.
    val allowedWastage = capacity * WASTAGE_FRACTION
    wastage >= allowedWastage
  }
}
```
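For reference, here is a minimal usage sketch of how the stream could be driven. The driver object, its name, and the write pattern are illustrative only (not part of the WIP branch), and it assumes the class above compiles against that branch (e.g. that DiskStore.MAX_BLOCK_SIZE exists there); it sits in org.apache.spark.io because the class is private[io].
```
package org.apache.spark.io

import java.nio.ByteBuffer

// Hypothetical driver, not part of the patch.
object SparkByteArrayOutputStreamExample {
  def main(args: Array[String]): Unit = {
    // Tuning knobs read by needCompact(); the values below are the defaults.
    System.setProperty("spark.io.baos.trim.min_buffer_size", (1024 * 1024).toString)
    System.setProperty("spark.io.baos.trim.wastage", 0.001.toString)

    val out = new SparkByteArrayOutputStream(64)
    // Write 100 bytes: grow() doubles the backing array past the initial 64 bytes.
    out.write(Array.fill[Byte](100)(1.toByte), 0, 100)

    // Trim the backing array only if needCompact() decides the saving is worth the copy.
    out.compact()

    // toByteBuffer wraps the backing array directly; no defensive copy is made.
    val bb: ByteBuffer = out.toByteBuffer
    println(s"size = ${out.size}, buffer remaining = ${bb.remaining()}")

    // Discard the contents but keep the allocated buffer for reuse.
    out.reset()
  }
}
```
The compaction heuristic trades a one-time copy for reclaiming the unused tail left over by the doubling growth: small buffers are always trimmed, while large ones are trimmed only when the wasted space exceeds the spark.io.baos.trim.wastage fraction of the capacity.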