GitHub user suyanNone opened a pull request:

    https://github.com/apache/spark/pull/6586

    [SPARK-8044][CORE] Avoid using DirectBuffer while reading or writing disk-level blocks

    1.  I found that if we use getChannel to put or get data, it will create a DirectBuffer anyway, which is not something we can control.
    
    According to the OpenJDK source code below, the channel copies heap buffers through a ThreadLocal pool of temporary direct buffers, and there is no guaranteed way to have those direct buffers released.
    
    ```
    // sun.nio.ch.FileChannelImpl.java
    public int write(ByteBuffer src) throws IOException {
        ensureOpen();
        if (!writable)
            throw new NonWritableChannelException();
        synchronized (positionLock) {
            int n = 0;
            int ti = -1;
            try {
                begin();
                if (!isOpen())
                    return 0;
                ti = threads.add();
                if (appending)
                    position(size());
                do {
                    n = IOUtil.write(fd, src, -1, nd, positionLock);
                } while ((n == IOStatus.INTERRUPTED) && isOpen());
                return IOStatus.normalize(n);
            } finally {
                threads.remove(ti);
                end(n > 0);
                assert IOStatus.check(n);
            }
        }
    }
    ```
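    To make this concrete, here is a minimal sketch (the class name and file path are made up for illustration) of the kind of call the description above refers to: a heap `ByteBuffer` written through `FileChannel`, which enters the `write()` method quoted above and then falls into the `IOUtil`/`Util` code quoted below.
    
    ```
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    
    public class ChannelWriteDemo {
        public static void main(String[] args) throws IOException {
            // Heap buffer: NOT a DirectBuffer, so IOUtil.write() will copy it
            // into a temporary direct buffer taken from the per-thread pool.
            ByteBuffer heapBuf = ByteBuffer.allocate(64 * 1024);  // zero-filled, position 0
    
            try (FileChannel ch = new FileOutputStream("/tmp/block.tmp").getChannel()) {
                ch.write(heapBuf);  // enters FileChannelImpl.write() shown above
            }
        }
    }
    ```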
    ```
    // IOUtil.java
    static int write(FileDescriptor fd, ByteBuffer src, long position,
                     NativeDispatcher nd, Object lock)
        throws IOException
    {
        if (src instanceof DirectBuffer)
            return writeFromNativeBuffer(fd, src, position, nd, lock);

        // Substitute a native buffer
        int pos = src.position();
        int lim = src.limit();
        assert (pos <= lim);
        int rem = (pos <= lim ? lim - pos : 0);
        ByteBuffer bb = null;
        try {
            bb = Util.getTemporaryDirectBuffer(rem);
            bb.put(src);
            bb.flip();
            // Do not update src until we see how many bytes were written
            src.position(pos);

            int n = writeFromNativeBuffer(fd, bb, position, nd, lock);
            if (n > 0) {
                // now update src
                src.position(pos + n);
            }
            return n;
        } finally {
            Util.releaseTemporaryDirectBuffer(bb);
        }
    }
    ```
    ```
    // Util.java
    private static final int TEMP_BUF_POOL_SIZE = 3;

    // Per-thread soft cache of the last temporary direct buffer
    private static ThreadLocal[] bufferPool;

    static {
        bufferPool = new ThreadLocal[TEMP_BUF_POOL_SIZE];
        for (int i=0; i<TEMP_BUF_POOL_SIZE; i++)
            bufferPool[i] = new ThreadLocal();
    }

    static ByteBuffer getTemporaryDirectBuffer(int size) {
        ByteBuffer buf = null;
        // Grab a buffer if available
        for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
            SoftReference ref = (SoftReference)(bufferPool[i].get());
            if ((ref != null) && ((buf = (ByteBuffer)ref.get()) != null) &&
                (buf.capacity() >= size)) {
                buf.rewind();
                buf.limit(size);
                bufferPool[i].set(null);
                return buf;
            }
        }

        // Make a new one
        return ByteBuffer.allocateDirect(size);
    }

    static void releaseTemporaryDirectBuffer(ByteBuffer buf) {
        if (buf == null)
            return;
        // Put it in an empty slot if such exists
        for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
            SoftReference ref = (SoftReference)(bufferPool[i].get());
            if ((ref == null) || (ref.get() == null)) {
                bufferPool[i].set(new SoftReference(buf));
                return;
            }
        }
        // Otherwise replace a smaller one in the cache if such exists
        for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
            SoftReference ref = (SoftReference)(bufferPool[i].get());
            ByteBuffer inCacheBuf = (ByteBuffer)ref.get();
            if ((inCacheBuf == null) || (buf.capacity() > inCacheBuf.capacity())) {
                bufferPool[i].set(new SoftReference(buf));
                return;
            }
        }
    }
    ```
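    Note that each thread can cache up to TEMP_BUF_POOL_SIZE (3) soft-referenced direct buffers, roughly sized to the largest requests it has seen, and they are only reclaimed when GC decides to clear the SoftReferences. As a rough illustration with made-up numbers: 32 task threads each channel-writing 64 MB blocks could pin up to 3 x 32 x 64 MB = 6 GB of off-heap memory in these per-thread caches.
    
    One way to avoid the substitution entirely (the sketch below is illustrative only, with hypothetical names and sizes, and is not the actual patch) is to move the bytes through a plain stream with a reusable heap array, so the NIO layer never needs a temporary DirectBuffer:
    
    ```
    import java.io.BufferedOutputStream;
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.ByteBuffer;
    
    public class StreamWriteSketch {
        // Hypothetical helper: copy a ByteBuffer to disk via a stream,
        // chunk by chunk, instead of FileChannel.write(), so no temporary
        // direct buffer is allocated by the NIO layer.
        static void writeBlock(ByteBuffer block, File file) throws IOException {
            byte[] chunk = new byte[64 * 1024];              // reusable heap chunk
            try (OutputStream out =
                     new BufferedOutputStream(new FileOutputStream(file))) {
                while (block.hasRemaining()) {
                    int len = Math.min(chunk.length, block.remaining());
                    block.get(chunk, 0, len);                // buffer -> heap array
                    out.write(chunk, 0, len);                // stream write
                }
            }
        }
    }
    ```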

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/suyanNone/spark directBuffer

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/6586.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6586
    
----
commit ed432573d30ef535fca84d1436a034d6a06dfaff
Author: hushan[胡珊] <[email protected]>
Date:   2015-06-02T13:47:24Z

    [SPARK][CORE] Avoid to use directbuffer while reading or writing disk_level block

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
