GitHub user suyanNone opened a pull request:
https://github.com/apache/spark/pull/6586
[SPARK-8044][CORE] Avoid to use directbuffer while reading or writing disk level block
1. I found that if we use getChannel to put or get data with a heap ByteBuffer, the JDK allocates a DirectBuffer anyway, and that allocation is not under our control.
According to the OpenJDK source code (quoted below), the channel copies the data through a thread-local pool of temporary direct buffers, and there is no guaranteed way to make sure those direct buffers are released.
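To make the pattern concrete, here is a minimal, purely illustrative example (not the actual DiskStore code) of the kind of call this refers to: writing a heap ByteBuffer through a FileChannel obtained from getChannel(). The JDK path it goes through is quoted below.
```
// Illustration only -- not the Spark DiskStore code.
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class ChannelWriteExample {
    public static void main(String[] args) throws IOException {
        byte[] blockBytes = new byte[8 * 1024 * 1024]; // block data on the heap
        try (FileOutputStream out = new FileOutputStream("block.data");
             FileChannel channel = out.getChannel()) {
            // src is a heap ByteBuffer, so FileChannelImpl/IOUtil (quoted below)
            // copy it into a hidden thread-local temporary direct buffer
            // before the native write.
            channel.write(ByteBuffer.wrap(blockBytes));
        }
    }
}
```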
```
sun.nio.ch.FileChannelImpl.java

public int write(ByteBuffer src) throws IOException {
    ensureOpen();
    if (!writable)
        throw new NonWritableChannelException();
    synchronized (positionLock) {
        int n = 0;
        int ti = -1;
        try {
            begin();
            if (!isOpen())
                return 0;
            ti = threads.add();
            if (appending)
                position(size());
            do {
                n = IOUtil.write(fd, src, -1, nd, positionLock);
            } while ((n == IOStatus.INTERRUPTED) && isOpen());
            return IOStatus.normalize(n);
        } finally {
            threads.remove(ti);
            end(n > 0);
            assert IOStatus.check(n);
        }
    }
}
```
```
IOUtil.java

static int write(FileDescriptor fd, ByteBuffer src, long position,
                 NativeDispatcher nd, Object lock)
    throws IOException
{
    if (src instanceof DirectBuffer)
        return writeFromNativeBuffer(fd, src, position, nd, lock);

    // Substitute a native buffer
    int pos = src.position();
    int lim = src.limit();
    assert (pos <= lim);
    int rem = (pos <= lim ? lim - pos : 0);
    ByteBuffer bb = null;
    try {
        bb = Util.getTemporaryDirectBuffer(rem);
        bb.put(src);
        bb.flip();
        // Do not update src until we see how many bytes were written
        src.position(pos);

        int n = writeFromNativeBuffer(fd, bb, position, nd, lock);
        if (n > 0) {
            // now update src
            src.position(pos + n);
        }
        return n;
    } finally {
        Util.releaseTemporaryDirectBuffer(bb);
    }
}
```
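IOUtil.write hands a heap buffer to Util.getTemporaryDirectBuffer, which is quoted next. Before that, a small standalone program (again only an illustration, not part of this change) makes the hidden allocation visible through the JVM's "direct" buffer pool MXBean:
```
// Illustration only: observe the direct allocation caused by a channel
// write of a heap buffer, via the JVM's "direct" buffer pool statistics.
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class DirectPoolExample {
    public static void main(String[] args) throws IOException {
        printDirectPool("before");
        File f = File.createTempFile("block", ".data");
        f.deleteOnExit();
        try (RandomAccessFile raf = new RandomAccessFile(f, "rw");
             FileChannel channel = raf.getChannel()) {
            // 64 MB heap buffer: the JDK copies it into a temporary direct buffer.
            channel.write(ByteBuffer.wrap(new byte[64 * 1024 * 1024]));
        }
        printDirectPool("after");
    }

    private static void printDirectPool(String label) {
        for (BufferPoolMXBean pool :
                 ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                System.out.printf("%s: direct buffer count=%d, used=%d bytes%n",
                                  label, pool.getCount(), pool.getMemoryUsed());
            }
        }
    }
}
```
The temporary direct buffer is kept in the writing thread's soft-reference slot afterwards, so it is only reclaimed when the GC clears the soft reference or the thread exits.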
```
Util.java

private static final int TEMP_BUF_POOL_SIZE = 3;

// Per-thread soft cache of the last temporary direct buffer
private static ThreadLocal[] bufferPool;

static {
    bufferPool = new ThreadLocal[TEMP_BUF_POOL_SIZE];
    for (int i=0; i<TEMP_BUF_POOL_SIZE; i++)
        bufferPool[i] = new ThreadLocal();
}

static ByteBuffer getTemporaryDirectBuffer(int size) {
    ByteBuffer buf = null;
    // Grab a buffer if available
    for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
        SoftReference ref = (SoftReference)(bufferPool[i].get());
        if ((ref != null) && ((buf = (ByteBuffer)ref.get()) != null) &&
            (buf.capacity() >= size)) {
            buf.rewind();
            buf.limit(size);
            bufferPool[i].set(null);
            return buf;
        }
    }

    // Make a new one
    return ByteBuffer.allocateDirect(size);
}

static void releaseTemporaryDirectBuffer(ByteBuffer buf) {
    if (buf == null)
        return;
    // Put it in an empty slot if such exists
    for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
        SoftReference ref = (SoftReference)(bufferPool[i].get());
        if ((ref == null) || (ref.get() == null)) {
            bufferPool[i].set(new SoftReference(buf));
            return;
        }
    }
    // Otherwise replace a smaller one in the cache if such exists
    for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
        SoftReference ref = (SoftReference)(bufferPool[i].get());
        ByteBuffer inCacheBuf = (ByteBuffer)ref.get();
        if ((inCacheBuf == null) || (buf.capacity() > inCacheBuf.capacity())) {
            bufferPool[i].set(new SoftReference(buf));
            return;
        }
    }
}
```
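As a sketch of the general idea behind this change (the actual diff may differ), the block bytes can be written through the stream API instead of the channel, so the copy stays on the heap and no temporary direct buffer is allocated behind our back:
```
// Sketch only -- not the actual patch. Writing through the OutputStream
// keeps the data on the heap; no hidden temporary direct buffer is used.
import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

public class StreamWriteSketch {
    /** Write the remaining bytes of a ByteBuffer without going through a channel. */
    static void writeFully(OutputStream out, ByteBuffer buffer) throws IOException {
        if (buffer.hasArray()) {
            // Heap buffer: hand the backing array to the stream directly.
            out.write(buffer.array(),
                      buffer.arrayOffset() + buffer.position(),
                      buffer.remaining());
            buffer.position(buffer.limit());
        } else {
            // Direct or read-only buffer: copy out in bounded heap-sized chunks.
            byte[] chunk = new byte[64 * 1024];
            while (buffer.hasRemaining()) {
                int len = Math.min(chunk.length, buffer.remaining());
                buffer.get(chunk, 0, len);
                out.write(chunk, 0, len);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        ByteBuffer block = ByteBuffer.wrap(new byte[8 * 1024 * 1024]);
        try (OutputStream out =
                 new BufferedOutputStream(new FileOutputStream("block.data"))) {
            writeFully(out, block);
        }
    }
}
```
The read path can stay on the heap in the same way by reading from a FileInputStream into a byte[] instead of reading through getChannel().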
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/suyanNone/spark directBuffer
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/6586.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6586
----
commit ed432573d30ef535fca84d1436a034d6a06dfaff
Author: hushan[胡珊] <[email protected]>
Date: 2015-06-02T13:47:24Z
[SPARK][CORE] Avoid to use directbuffer while reading or writing disk_level block
----