MAPREDUCE-5997. native-task: Use DirectBufferPool from Hadoop Common. 
Contributed by Todd Lipcon.

git-svn-id: 
https://svn.apache.org/repos/asf/hadoop/common/branches/MR-2841@1613034 
13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/77acc70d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/77acc70d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/77acc70d

Branch: refs/heads/HDFS-6584
Commit: 77acc70df5d64055ea809222f3d2f0d66c611196
Parents: 5149a8a
Author: Todd Lipcon <t...@apache.org>
Authored: Thu Jul 24 08:20:25 2014 +0000
Committer: Todd Lipcon <t...@apache.org>
Committed: Thu Jul 24 08:20:25 2014 +0000

----------------------------------------------------------------------
 .../CHANGES.MAPREDUCE-2841.txt                  |   1 +
 .../mapred/nativetask/NativeBatchProcessor.java |   8 +-
 .../nativetask/buffer/DirectBufferPool.java     |  93 ---------
 .../mapred/nativetask/buffer/InputBuffer.java   |  17 +-
 .../nativetask/buffer/TestDirectBufferPool.java | 201 -------------------
 5 files changed, 20 insertions(+), 300 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/77acc70d/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt 
b/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt
index cea5a76..e12f743 100644
--- a/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt
+++ b/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt
@@ -4,3 +4,4 @@ Changes for Hadoop Native Map Output Collector
 MAPREDUCE-5985. native-task: Fix build on macosx. Contributed by Binglin Chang
 MAPREDUCE-5994. Simplify ByteUtils and fix failing test. (todd)
 MAPREDUCE-5996. native-task: Rename system tests into standard directory 
layout (todd)
+MAPREDUCE-5997. native-task: Use DirectBufferPool from Hadoop Common (todd)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77acc70d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java
index fd68ea6..837da0e 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java
@@ -24,12 +24,13 @@ import java.nio.ByteBuffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapred.nativetask.buffer.BufferType;
-import org.apache.hadoop.mapred.nativetask.buffer.DirectBufferPool;
 import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer;
 import org.apache.hadoop.mapred.nativetask.buffer.OutputBuffer;
 import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
 import org.apache.hadoop.mapred.nativetask.util.ConfigUtil;
+import org.apache.hadoop.util.DirectBufferPool;
 
 /**
  * used to create channel, transfer data and command between Java and native
@@ -126,9 +127,8 @@ public class NativeBatchProcessor implements INativeHandler 
{
       NativeRuntime.releaseNativeObject(nativeHandlerAddr);
       nativeHandlerAddr = 0;
     }
-    if (null != in && null != in.getByteBuffer() && 
in.getByteBuffer().isDirect()) {
-      DirectBufferPool.getInstance().returnBuffer(in.getByteBuffer());
-    }
+    IOUtils.cleanup(LOG, in);
+    in = null;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77acc70d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DirectBufferPool.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DirectBufferPool.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DirectBufferPool.java
deleted file mode 100644
index bd3c6bb..0000000
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DirectBufferPool.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred.nativetask.buffer;
-
-import java.io.IOException;
-import java.lang.ref.WeakReference;
-import java.nio.ByteBuffer;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-
-/**
- * as direct buffer memory is not collected by GC, we keep a pool
- * to reuse direct buffers
- */
-public class DirectBufferPool {
-  
-  private static DirectBufferPool directBufferPool = null;
-  private static Log LOG = LogFactory.getLog(DirectBufferPool.class);
-  private ConcurrentMap<Integer, Queue<WeakReference<ByteBuffer>>> bufferMap = 
new ConcurrentHashMap<Integer, Queue<WeakReference<ByteBuffer>>>();
-
-  private DirectBufferPool() {
-  }
-  
-  public static synchronized DirectBufferPool getInstance() {
-    if (null == directBufferPool) {
-      directBufferPool = new DirectBufferPool();
-    }
-    return directBufferPool;
-  }
-  
-  public static void destoryInstance(){
-    directBufferPool = null;
-  }
-  
-  public synchronized ByteBuffer borrowBuffer(int capacity) throws IOException 
{
-    Queue<WeakReference<ByteBuffer>> list = bufferMap.get(capacity);
-    if (null == list) {
-      return ByteBuffer.allocateDirect(capacity);
-    }
-    WeakReference<ByteBuffer> ref;
-    while ((ref = list.poll()) != null) {
-      ByteBuffer buf = ref.get();
-      if (buf != null) {
-        return buf;
-      }
-    }
-    return ByteBuffer.allocateDirect(capacity);
-  }
-  
-  public void returnBuffer(ByteBuffer buffer) throws IOException {
-    if (null == buffer || !buffer.isDirect()) {
-      throw new IOException("the buffer is null or the buffer returned is not 
direct buffer");
-    }
-
-    buffer.clear();
-    int capacity = buffer.capacity();
-    Queue<WeakReference<ByteBuffer>> list = bufferMap.get(capacity);
-    if (null == list) {
-      list = new ConcurrentLinkedQueue<WeakReference<ByteBuffer>>();
-      Queue<WeakReference<ByteBuffer>> prev = bufferMap.putIfAbsent(capacity, 
list);
-      if (prev != null) {
-        list = prev;
-      }
-    }
-    list.add(new WeakReference<ByteBuffer>(buffer));
-  }
-
-  int getBufCountsForCapacity(int capacity) {
-    return bufferMap.get(capacity).size();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77acc70d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java
index eb15164..82193b8 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java
@@ -18,11 +18,16 @@
 
 package org.apache.hadoop.mapred.nativetask.buffer;
 
+import org.apache.hadoop.util.DirectBufferPool;
+
+import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
-public class InputBuffer {
+public class InputBuffer implements Closeable {
+
+  static DirectBufferPool bufferPool = new DirectBufferPool();
 
   private ByteBuffer byteBuffer;
   private final BufferType type;
@@ -36,7 +41,7 @@ public class InputBuffer {
 
       switch (type) {
       case DIRECT_BUFFER:
-        this.byteBuffer = 
DirectBufferPool.getInstance().borrowBuffer(capacity);
+        this.byteBuffer = bufferPool.getBuffer(capacity);
         this.byteBuffer.order(ByteOrder.BIG_ENDIAN);
         break;
       case HEAP_BUFFER:
@@ -118,4 +123,12 @@ public class InputBuffer {
     }
     return byteBuffer.array();
   }
+
+  @Override
+  public void close() {
+    if (byteBuffer != null && byteBuffer.isDirect()) {
+      bufferPool.returnBuffer(byteBuffer);
+      byteBuffer = null;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77acc70d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestDirectBufferPool.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestDirectBufferPool.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestDirectBufferPool.java
deleted file mode 100644
index 09c1ef5..0000000
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestDirectBufferPool.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred.nativetask.buffer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.ArrayList;
-
-import org.junit.Test;
-
-public class TestDirectBufferPool {
-
-  @Test
-  public void testGetInstance() throws Exception {
-    final int num = 100;
-    List<DirectBufferPool> pools = new ArrayList<DirectBufferPool>();
-    Thread[] list = new Thread[num];
-    for (int i = 0; i < num; i++)  {
-      Thread t = getPoolThread(pools);
-      t.start();
-      list[i] = t;
-    }
-    for (int i = 0; i < num; i++) {
-      try {
-        list[i].join(10000);
-      } catch (Exception e) {
-        e.printStackTrace(); 
-      }
-    }
-    DirectBufferPool p1 = pools.get(0);
-    assertNotNull(p1);
-    for (int i = 1; i < pools.size(); i++) {
-      DirectBufferPool p2 = pools.get(i);
-      assertNotNull(p2);
-      assertSame(p1, p2);
-    }
-  }
-
-  private Thread getPoolThread(final List<DirectBufferPool> pools) {
-    Thread t = new Thread() {
-      public void run() {
-        pools.add(DirectBufferPool.getInstance());
-      }
-    };
-    return t;
-  }
-
-
-  @Test
-  public void testBufBorrow() throws IOException {
-    final DirectBufferPool bufferPool = DirectBufferPool.getInstance();
-    ByteBuffer b1 = bufferPool.borrowBuffer(100);
-    assertTrue(b1.isDirect());
-    assertEquals(0, b1.position());
-    assertEquals(100, b1.capacity());
-    bufferPool.returnBuffer(b1);
-    ByteBuffer b2 = bufferPool.borrowBuffer(100);
-    assertTrue(b2.isDirect());
-    assertEquals(0, b2.position());
-    assertEquals(100, b2.capacity());
-    assertSame(b1, b2);
-
-    ByteBuffer b3 =  bufferPool.borrowBuffer(100);
-    assertTrue(b3.isDirect());
-    assertEquals(0, b3.position());
-    assertEquals(100, b3.capacity());
-    assertNotSame(b2, b3);
-    bufferPool.returnBuffer(b2);
-    bufferPool.returnBuffer(b3);
-  }
-
-  @Test
-  public void testBufReset() throws IOException {
-    final DirectBufferPool bufferPool = DirectBufferPool.getInstance();
-    ByteBuffer b1 = bufferPool.borrowBuffer(100);
-    assertTrue(b1.isDirect());
-    assertEquals(0, b1.position());
-    assertEquals(100, b1.capacity());
-    b1.putInt(1);
-    assertEquals(4, b1.position());
-    bufferPool.returnBuffer(b1);
-    ByteBuffer b2 = bufferPool.borrowBuffer(100);
-    assertSame(b1, b2);
-    assertTrue(b2.isDirect());
-    assertEquals(0, b2.position());
-    assertEquals(100, b2.capacity());
-  }
-
-  @Test
-  public void testBufReturn() throws IOException {
-    final DirectBufferPool bufferPool = DirectBufferPool.getInstance();
-    int numOfBufs = 100;
-    int capacity = 100;
-    final ByteBuffer[] bufs = new ByteBuffer[numOfBufs];
-    for (int i = 0; i < numOfBufs; i++) {
-      bufs[i] = bufferPool.borrowBuffer(capacity);
-    }
-
-    assertEquals(0, bufferPool.getBufCountsForCapacity(capacity));
-
-
-    int numOfThreads = numOfBufs;
-    Thread[] list = new Thread[numOfThreads];
-    for (int i = 0; i < numOfThreads; i++) {
-      Thread t = retBufThread(bufferPool, bufs, i);
-      t.start();
-      list[i] = t;
-    }
-    for (int i = 0; i < numOfThreads; i++) {
-      try {
-        list[i].join(10000);
-      } catch (Exception e) {
-       e.printStackTrace();
-      }
-    }
-
-    assertEquals(numOfBufs, bufferPool.getBufCountsForCapacity(capacity));
-  }
-
-  private Thread retBufThread(final DirectBufferPool bufferPool, final 
ByteBuffer[] bufs, final int i) {
-       Thread t = new Thread(new Runnable(){
-        @Override
-        public void run() {
-          try {
-          bufferPool.returnBuffer(bufs[i]);
-          } catch (Exception e) {
-            e.printStackTrace();
-          }
-        }
-      });
-    return t;
-  }
-
-  @Test
-  public void testBufException() {
-    final DirectBufferPool bufferPool = DirectBufferPool.getInstance();
-    boolean thrown = false;
-    try {
-      bufferPool.returnBuffer(null);
-    } catch (IOException e) {
-      thrown = true;
-    }
-    assertEquals(true, thrown);
-
-    thrown = false;
-    ByteBuffer buf = ByteBuffer.allocate(100);
-    try {
-      bufferPool.returnBuffer(buf);
-    } catch (IOException e) {
-      thrown = true;
-    }
-    assertEquals(true, thrown);
-  }
-
-  @Test
-  public void testBufWeakRefClear() throws IOException {
-    final DirectBufferPool bufferPool = DirectBufferPool.getInstance();
-    int numOfBufs = 100;
-    int capacity = 100;
-    ByteBuffer[] list = new ByteBuffer[capacity];
-    for (int i = 0; i < numOfBufs; i++) {
-      list[i] = bufferPool.borrowBuffer(capacity);
-    }
-    for (int i = 0; i < numOfBufs; i++) {
-      bufferPool.returnBuffer(list[i]);
-      list[i] = null;
-    }
-
-    assertEquals(numOfBufs, bufferPool.getBufCountsForCapacity(capacity));
-
-    for (int i = 0; i < 3; i++) {
-      System.gc();
-    }
-
-    ByteBuffer b = bufferPool.borrowBuffer(capacity);
-    assertEquals(0, bufferPool.getBufCountsForCapacity(capacity));
-  }
-}

Reply via email to