Github user mridulm commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40276213
  
    I actually meant something like this:
    (This is from an internal WIP branch to tackle the ByteBuffer to
    Seq[ByteBuffer] change.)
    Ideally I should submit this via a PR, but unfortunately ...
    
    ```
    package org.apache.spark.io
    
    import java.io.OutputStream
    import java.nio.ByteBuffer
    import java.util.{Arrays => JArrays}
    import org.apache.spark.storage.DiskStore
    
    /**
     * A custom implementation of ByteArrayOutputStream which tries to minimize array copies by
     * reusing the underlying array if within bounds.
     *
     * Note, this is unsafe for general use: `toByteBuffer` directly exposes the underlying data
     * array instead of copying it, so the returned buffer shares storage with this stream until
     * the stream grows or is compacted (both replace the backing array).
     *
     * This class performs no synchronization.
     *
     * @param initialSize initial capacity of the backing array, in bytes; must not be negative
     * @throws IllegalArgumentException if `initialSize` is negative
     */
    private[io] class SparkByteArrayOutputStream(initialSize: Int) extends OutputStream {

      if (initialSize < 0) {
        throw new IllegalArgumentException("Negative initial size: " + initialSize)
      }

      /**
       * The buffer where data is stored. Only the first `count` bytes are valid.
       */
      private var buf: Array[Byte] = new Array[Byte](initialSize)

      /**
       * The number of valid bytes in the buffer.
       */
      private var count: Int = 0

      /**
       * Creates a new byte array output stream. The buffer capacity is
       * initially 32 bytes, though its size increases if necessary.
       */
      def this() = this(32)

      /**
       * Makes sure the backing array can hold at least `minCapacity` bytes, growing it if needed.
       *
       * @param minCapacity the required capacity; a negative value means the caller's
       *                    `count + len` computation overflowed Int
       * @throws IllegalArgumentException if `minCapacity` is negative
       */
      private def ensureCapacity(minCapacity: Int): Unit = {
        if (minCapacity < 0) {
          throw new IllegalArgumentException("require capacity " + minCapacity + " is negative")
        }
        if (minCapacity > buf.length) grow(minCapacity)
      }

      /**
       * Increases the capacity to ensure that it can hold at least the
       * number of elements specified by the minimum capacity argument.
       *
       * The new capacity is the larger of twice the current capacity and `minCapacity`,
       * capped at `DiskStore.MAX_BLOCK_SIZE`.
       *
       * @param minCapacity the desired minimum capacity
       * @throws IllegalArgumentException if the computed capacity is negative
       * @throws IllegalStateException if the array cannot be grown enough to hold `minCapacity`
       */
      private def grow(minCapacity: Int): Unit = {
        val oldCapacity: Int = buf.length

        // TODO: This might be too expensive as size grows : work out something better ?
        // Double in Long arithmetic: `oldCapacity << 1` on an Int wraps to a negative value once
        // the array reaches 2^30 bytes, which used to make every further growth fail even when
        // minCapacity was still within the allowed maximum.
        val doubled: Long = oldCapacity.toLong << 1
        val wanted: Long = math.max(doubled, minCapacity.toLong)
        // set max size to DiskStore.MAX_BLOCK_SIZE
        val newCapacity: Int = math.min(wanted, DiskStore.MAX_BLOCK_SIZE.toLong).toInt

        // Cannot happen through overflow any more; only reachable if the configured maximum
        // itself is negative.
        if (newCapacity < 0) {
          throw new IllegalArgumentException("computed capacity = " + newCapacity +
            " is negative. minCapacity = " + minCapacity)
        }

        if (newCapacity <= oldCapacity || newCapacity < minCapacity) {
          throw new IllegalStateException("Cant grow the array anymore. Already at max size ?" +
            " newCapacity = " + newCapacity +
            ", minCapacity = " + minCapacity +
            ", blocksize = " + DiskStore.MAX_BLOCK_SIZE)
        }

        buf = JArrays.copyOf(buf, newCapacity)
      }

      /**
       * Writes the specified byte to this byte array output stream.
       * Only the low-order eight bits of `b` are stored.
       *
       * @param   b   the byte to be written.
       */
      override def write(b: Int): Unit = {
        ensureCapacity(count + 1)
        buf(count) = b.toByte
        count += 1
      }

      /**
       * Writes `len` bytes from the specified byte array
       * starting at offset `off` to this byte array output stream.
       *
       * @param   b     the data.
       * @param   off   the start offset in the data.
       * @param   len   the number of bytes to write.
       * @throws IndexOutOfBoundsException if `off` or `len` is negative, or if
       *                                   `off + len` exceeds `b.length`
       */
      override def write(b: Array[Byte], off: Int, len: Int): Unit = {
        // `(off + len) - b.length > 0` is deliberately written so that it still trips when
        // `off + len` overflows Int.
        if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) - b.length > 0)) {
          throw new IndexOutOfBoundsException
        }
        ensureCapacity(count + len)
        System.arraycopy(b, off, buf, count, len)
        count += len
      }

      /**
       * Resets the `count` field of this byte array output
       * stream to zero, so that all currently accumulated output in the
       * output stream is discarded. The output stream can be used again,
       * reusing the already allocated buffer space.
       */
      def reset(): Unit = {
        count = 0
      }

      /**
       * Trim the underlying array : do this only if it makes sense - that is, if the space saving
       * is worth more than the cost of doing the allocation copy.
       *
       * Note, this is called after all data has been written to the stream to compact the array.
       */
      def compact(): Unit = {
        if (SparkByteArrayOutputStream.needCompact(this)) {
          buf = JArrays.copyOf(buf, count)
        }
      }

      /**
       * Returns the valid contents of this stream as a ByteBuffer.
       *
       * No copy is made: a non-empty result wraps the backing array directly, covering the
       * first `size` bytes. An empty stream yields a freshly allocated zero-length buffer.
       *
       * @return  the current contents of this output stream, as a byte buffer.
       */
      def toByteBuffer: ByteBuffer = {
        if (count == 0) ByteBuffer.allocate(0) else ByteBuffer.wrap(buf, 0, count)
      }

      /**
       * Returns the current size of the buffer.
       *
       * @return  the value of the `count` field, which is the number
       *          of valid bytes in this output stream.
       */
      def size: Int = {
        count
      }

      /**
       * Closing this stream has no effect. The methods in
       * this class can be called after the stream has been closed without
       * generating an `IOException`.
       */
      override def close(): Unit = {
        // set a flag and not allow any more writes ?
      }
    }
    
    
    /**
     * Companion object holding the trim thresholds and the heuristic used by `compact()` to
     * decide whether shrinking a stream's backing array is worthwhile.
     */
    object SparkByteArrayOutputStream {
      // Backing arrays smaller than this many bytes are always considered worth trimming.
      // Read from a system property; 1024 * 1024 when the property is not set.
      private val MINIMUM_BUFFER_SIZE =
        System.getProperty("spark.io.baos.trim.min_buffer_size", (1024 * 1024).toString).toInt
      // Fraction of the capacity that may stay unused without triggering a trim.
      // 0.1 % by default.
      private val WASTAGE_FRACTION =
        System.getProperty("spark.io.baos.trim.wastage", 0.001.toString).toDouble

      /**
       * Decides whether the given stream's backing array should be trimmed to its valid length.
       *
       * @param stream the stream whose capacity and size are inspected
       * @return `false` when there are no unused bytes; `true` when the array is smaller than
       *         the minimum buffer size; otherwise whether the unused bytes reach the allowed
       *         fraction of the capacity
       */
      def needCompact(stream: SparkByteArrayOutputStream): Boolean = {
        val total = stream.buf.length
        val unused = total - stream.size

        if (unused <= 0) {
          // Nothing is wasted, so there is nothing to reclaim.
          false
        } else if (total < MINIMUM_BUFFER_SIZE) {
          // Small arrays are cheap to copy, so always allow the trim.
          true
        } else {
          // Large arrays: only trim once the slack reaches the configured share of the capacity.
          unused >= total * WASTAGE_FRACTION
        }
      }
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to