HADOOP-12189. Improve CallQueueManager#swapQueue to make queue elements drop nearly impossible. Contributed by Zhihai Xu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6736a1ab Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6736a1ab Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6736a1ab Branch: refs/heads/HDFS-7240 Commit: 6736a1ab7033523ed5f304fdfed46d7f348665b4 Parents: 813cf89 Author: Andrew Wang <w...@apache.org> Authored: Thu Jul 23 14:42:35 2015 -0700 Committer: Andrew Wang <w...@apache.org> Committed: Thu Jul 23 14:42:35 2015 -0700 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 3 +++ .../org/apache/hadoop/ipc/CallQueueManager.java | 27 +++++++++++++------- .../apache/hadoop/ipc/TestCallQueueManager.java | 6 ++--- 3 files changed, 24 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6736a1ab/hadoop-common-project/hadoop-common/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index f1a3bc9..6c18add 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -719,6 +719,9 @@ Release 2.8.0 - UNRELEASED HADOOP-12161. Add getStoragePolicy API to the FileSystem interface. (Brahma Reddy Battula via Arpit Agarwal) + HADOOP-12189. Improve CallQueueManager#swapQueue to make queue elements + drop nearly impossible. (Zhihai Xu via wang) + OPTIMIZATIONS HADOOP-11785. Reduce the number of listStatus operation in distcp http://git-wip-us.apache.org/repos/asf/hadoop/blob/6736a1ab/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java index 1568bd6..c10f839 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java @@ -32,11 +32,15 @@ import org.apache.hadoop.conf.Configuration; */ public class CallQueueManager<E> { public static final Log LOG = LogFactory.getLog(CallQueueManager.class); + // Number of checkpoints for empty queue. + private static final int CHECKPOINT_NUM = 20; + // Interval to check empty queue. + private static final long CHECKPOINT_INTERVAL_MS = 10; @SuppressWarnings("unchecked") static <E> Class<? extends BlockingQueue<E>> convertQueueClass( - Class<?> queneClass, Class<E> elementClass) { - return (Class<? extends BlockingQueue<E>>)queneClass; + Class<?> queueClass, Class<E> elementClass) { + return (Class<? extends BlockingQueue<E>>)queueClass; } private final boolean clientBackOffEnabled; @@ -159,18 +163,23 @@ public class CallQueueManager<E> { } /** - * Checks if queue is empty by checking at two points in time. + * Checks if queue is empty by checking at CHECKPOINT_NUM points with + * CHECKPOINT_INTERVAL_MS interval. * This doesn't mean the queue might not fill up at some point later, but * it should decrease the probability that we lose a call this way. */ private boolean queueIsReallyEmpty(BlockingQueue<?> q) { - boolean wasEmpty = q.isEmpty(); - try { - Thread.sleep(10); - } catch (InterruptedException ie) { - return false; + for (int i = 0; i < CHECKPOINT_NUM; i++) { + try { + Thread.sleep(CHECKPOINT_INTERVAL_MS); + } catch (InterruptedException ie) { + return false; + } + if (!q.isEmpty()) { + return false; + } } - return q.isEmpty() && wasEmpty; + return true; } private String stringRepr(Object o) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/6736a1ab/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java index 6e1838e..51a9750 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java @@ -165,7 +165,7 @@ public class TestCallQueueManager { HashMap<Runnable, Thread> threads = new HashMap<Runnable, Thread>(); // Create putters and takers - for (int i=0; i < 50; i++) { + for (int i=0; i < 1000; i++) { Putter p = new Putter(manager, -1, -1); Thread pt = new Thread(p); producers.add(p); @@ -174,7 +174,7 @@ public class TestCallQueueManager { pt.start(); } - for (int i=0; i < 20; i++) { + for (int i=0; i < 100; i++) { Taker t = new Taker(manager, -1, -1); Thread tt = new Thread(t); consumers.add(t); @@ -183,7 +183,7 @@ public class TestCallQueueManager { tt.start(); } - Thread.sleep(10); + Thread.sleep(500); for (int i=0; i < 5; i++) { manager.swapQueue(queueClass, 5000, "", null);