Repository: hbase
Updated Branches:
  refs/heads/master 91a7bbd58 -> 9a94dc90b


HBASE-16642 Use DelayQueue instead of TimeoutBlockingQueue

Signed-off-by: Matteo Bertozzi <matteo.berto...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9a94dc90
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9a94dc90
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9a94dc90

Branch: refs/heads/master
Commit: 9a94dc90b4a53ca9d82c662f19c0369d3cd9aecf
Parents: 91a7bbd
Author: Hiroshi Ikeda <ik...@vic.co.jp>
Authored: Thu Oct 13 20:21:46 2016 -0700
Committer: Matteo Bertozzi <matteo.berto...@cloudera.com>
Committed: Thu Oct 13 21:41:54 2016 -0700

----------------------------------------------------------------------
 .../hbase/procedure2/ProcedureExecutor.java     |  93 +++++---
 .../procedure2/util/TimeoutBlockingQueue.java   | 234 -------------------
 .../util/TestTimeoutBlockingQueue.java          | 159 -------------
 3 files changed, 67 insertions(+), 419 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/9a94dc90/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 2e9e3a3..14fe71b 100644
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -29,12 +29,15 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
@@ -48,8 +51,6 @@ import 
org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
 import 
org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
-import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue;
-import 
org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -96,17 +97,58 @@ public class ProcedureExecutor<TEnvironment> {
   }
 
   /**
-   * Used by the TimeoutBlockingQueue to get the timeout interval of the 
procedure
+   * Wraps a Procedure for the DelayQueue, ordered by its absolute timeout time
    */
-  private static class ProcedureTimeoutRetriever implements 
TimeoutRetriever<Procedure> {
+  private static class DelayedContainer implements Delayed {
+    static final DelayedContainer POISON = new DelayedContainer();
+
+    /** null if poison */
+    final Procedure proc;
+    final long timeoutTime;
+
+    DelayedContainer(Procedure proc) {
+      assert proc != null;
+      this.proc = proc;
+      this.timeoutTime = proc.getLastUpdate() + proc.getTimeout();
+    }
+
+    DelayedContainer() {
+      this.proc = null;
+      this.timeoutTime = Long.MIN_VALUE;
+    }
+
+    /**
+     * Returns the time remaining until this entry's timeout, expressed in
+     * {@code unit}. Returns 0 once the timeout time has been reached, which
+     * is always the case for the poison entry (timeoutTime == Long.MIN_VALUE).
+     */
+    @Override
+    public long getDelay(TimeUnit unit) {
+      long currentTime = EnvironmentEdgeManager.currentTime();
+      if (currentTime >= timeoutTime) {
+        return 0;
+      }
+      // timeoutTime and currentTime are millisecond timestamps, so the
+      // remaining interval must be converted from MILLISECONDS; converting
+      // from MICROSECONDS would shrink every delay by a factor of 1000.
+      return unit.convert(timeoutTime - currentTime, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * @throws NullPointerException {@inheritDoc}
+     * @throws ClassCastException {@inheritDoc}
+     */
+    @Override
+    public int compareTo(Delayed o) {
+      return Long.compare(timeoutTime, ((DelayedContainer)o).timeoutTime);
+    }
+
     @Override
-    public long getTimeout(Procedure proc) {
-      return proc.getTimeRemaining();
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+      if (! (obj instanceof DelayedContainer)) {
+        return false;
+      }
+      return Objects.equals(proc, ((DelayedContainer)obj).proc);
     }
 
     @Override
-    public TimeUnit getTimeUnit(Procedure proc) {
-      return TimeUnit.MILLISECONDS;
+    public int hashCode() {
+      return proc != null ? proc.hashCode() : 0;
     }
   }
 
@@ -239,8 +281,8 @@ public class ProcedureExecutor<TEnvironment> {
    * Timeout Queue that contains Procedures in a WAITING_TIMEOUT state
    * or periodic procedures.
    */
-  private final TimeoutBlockingQueue<Procedure> waitingTimeout =
-    new TimeoutBlockingQueue<Procedure>(new ProcedureTimeoutRetriever());
+  private final DelayQueue<DelayedContainer> waitingTimeout =
+    new DelayQueue<DelayedContainer>();
 
   /**
    * Scheduler/Queue that contains runnable procedures.
@@ -544,7 +586,7 @@ public class ProcedureExecutor<TEnvironment> {
 
     LOG.info("Stopping the procedure executor");
     scheduler.stop();
-    waitingTimeout.signalAll();
+    waitingTimeout.add(DelayedContainer.POISON);
   }
 
   public void join() {
@@ -628,7 +670,7 @@ public class ProcedureExecutor<TEnvironment> {
    */
   public void addChore(final ProcedureInMemoryChore chore) {
     chore.setState(ProcedureState.RUNNABLE);
-    waitingTimeout.add(chore);
+    waitingTimeout.add(new DelayedContainer(chore));
   }
 
   /**
@@ -638,7 +680,7 @@ public class ProcedureExecutor<TEnvironment> {
    */
   public boolean removeChore(final ProcedureInMemoryChore chore) {
     chore.setState(ProcedureState.FINISHED);
-    return waitingTimeout.remove(chore);
+    return waitingTimeout.remove(new DelayedContainer(chore));
   }
 
   /**
@@ -927,15 +969,16 @@ public class ProcedureExecutor<TEnvironment> {
 
   private void timeoutLoop() {
     while (isRunning()) {
-      Procedure proc = waitingTimeout.poll();
-      if (proc == null) continue;
-
-      if (proc.getTimeRemaining() > 100) {
-        // got an early wake, maybe a stop?
-        // re-enqueue the task in case was not a stop or just a signal
-        waitingTimeout.add(proc);
+      Procedure proc;
+      try {
+        proc = waitingTimeout.take().proc;
+      } catch (InterruptedException e) {
+        // Just consume the interruption.
         continue;
       }
+      if (proc == null) { // POISON to stop
+        break;
+      }
 
       // 
----------------------------------------------------------------------------
       // TODO-MAYBE: Should we provide a notification to the store with the
@@ -955,8 +998,8 @@ public class ProcedureExecutor<TEnvironment> {
           } catch (Throwable e) {
             LOG.error("Ignoring CompletedProcedureCleaner exception: " + 
e.getMessage(), e);
           }
-          proc.setStartTime(EnvironmentEdgeManager.currentTime());
-          if (proc.isRunnable()) waitingTimeout.add(proc);
+          proc.updateTimestamp();
+          if (proc.isRunnable()) waitingTimeout.add(new 
DelayedContainer(proc));
         }
         continue;
       }
@@ -970,8 +1013,6 @@ public class ProcedureExecutor<TEnvironment> {
         store.update(proc);
         scheduler.addFront(proc);
         continue;
-      } else if (proc.getState() == ProcedureState.WAITING_TIMEOUT) {
-        waitingTimeout.add(proc);
       }
     }
   }
@@ -1171,7 +1212,7 @@ public class ProcedureExecutor<TEnvironment> {
                   procedure.setState(ProcedureState.WAITING);
                   break;
                 case WAITING_TIMEOUT:
-                  waitingTimeout.add(procedure);
+                  waitingTimeout.add(new DelayedContainer(procedure));
                   break;
                 default:
                   break;
@@ -1179,7 +1220,7 @@ public class ProcedureExecutor<TEnvironment> {
             }
           }
         } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
-          waitingTimeout.add(procedure);
+          waitingTimeout.add(new DelayedContainer(procedure));
         } else if (!isSuspended) {
           // No subtask, so we are done
           procedure.setState(ProcedureState.FINISHED);

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a94dc90/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java
deleted file mode 100644
index 2292e63..0000000
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.procedure2.util;
-
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class TimeoutBlockingQueue<E> {
-  public static interface TimeoutRetriever<T> {
-    long getTimeout(T object);
-    TimeUnit getTimeUnit(T object);
-  }
-
-  private final ReentrantLock lock = new ReentrantLock();
-  private final Condition waitCond = lock.newCondition();
-  private final TimeoutRetriever<? super E> timeoutRetriever;
-
-  private E[] objects;
-  private int head = 0;
-  private int tail = 0;
-
-  public TimeoutBlockingQueue(TimeoutRetriever<? super E> timeoutRetriever) {
-    this(32, timeoutRetriever);
-  }
-
-  @SuppressWarnings("unchecked")
-  public TimeoutBlockingQueue(int capacity, TimeoutRetriever<? super E> 
timeoutRetriever) {
-    this.objects = (E[])new Object[capacity];
-    this.timeoutRetriever = timeoutRetriever;
-  }
-
-  public void dump() {
-    for (int i = 0; i < objects.length; ++i) {
-      if (i == head) {
-        System.out.print("[" + objects[i] + "] ");
-      } else if (i == tail) {
-        System.out.print("]" + objects[i] + "[ ");
-      } else {
-        System.out.print(objects[i] + " ");
-      }
-    }
-    System.out.println();
-  }
-
-  public void clear() {
-    lock.lock();
-    try {
-      if (head != tail) {
-        for (int i = head; i < tail; ++i) {
-          objects[i] = null;
-        }
-        head = 0;
-        tail = 0;
-        waitCond.signal();
-      }
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  public void add(E e) {
-    if (e == null) throw new NullPointerException();
-
-    lock.lock();
-    try {
-      addElement(e);
-      waitCond.signal();
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  public boolean remove(E e) {
-    if (e == null) return false;
-    lock.lock();
-    try {
-      for (int i = 0; i < objects.length; ++i) {
-        if (e.equals(objects[i])) {
-          objects[i] = null;
-          return true;
-        }
-      }
-      return false;
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
-  public E poll() {
-    lock.lock();
-    try {
-      if (isEmpty()) {
-        waitCond.await();
-        return null;
-      }
-
-      E elem = objects[head];
-      long nanos = getNanosTimeout(elem);
-      nanos = waitCond.awaitNanos(nanos);
-      return nanos > 0 ? null : removeFirst();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      return null;
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  public int size() {
-    return tail - head;
-  }
-
-  public boolean isEmpty() {
-    return (tail - head) == 0;
-  }
-
-  public void signalAll() {
-    lock.lock();
-    try {
-      waitCond.signalAll();
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  private void addElement(E elem) {
-    int size = (tail - head);
-    if ((objects.length - size) == 0) {
-      int capacity = size + ((size < 64) ? (size + 2) : (size >> 1));
-      E[] newObjects = (E[])new Object[capacity];
-
-      if (compareTimeouts(objects[tail - 1], elem) <= 0) {
-        // Append
-        System.arraycopy(objects, head, newObjects, 0, tail);
-        tail -= head;
-        newObjects[tail++] = elem;
-      } else if (compareTimeouts(objects[head], elem) > 0) {
-        // Prepend
-        System.arraycopy(objects, head, newObjects, 1, tail);
-        newObjects[0] = elem;
-        tail -= (head - 1);
-      } else {
-        // Insert in the middle
-        int index = upperBound(head, tail - 1, elem);
-        int newIndex = (index - head);
-        System.arraycopy(objects, head, newObjects, 0, newIndex);
-        newObjects[newIndex] = elem;
-        System.arraycopy(objects, index, newObjects, newIndex + 1, tail - 
index);
-        tail -= (head - 1);
-      }
-      head = 0;
-      objects = newObjects;
-    } else {
-      if (tail == objects.length) {
-        // shift down |-----AAAAAAA|
-        tail -= head;
-        System.arraycopy(objects, head, objects, 0, tail);
-        head = 0;
-      }
-
-      if (tail == head || compareTimeouts(objects[tail - 1], elem) <= 0) {
-        // Append
-        objects[tail++] = elem;
-      } else if (head > 0 && compareTimeouts(objects[head], elem) > 0) {
-        // Prepend
-        objects[--head] = elem;
-      } else {
-        // Insert in the middle
-        int index = upperBound(head, tail - 1, elem);
-        System.arraycopy(objects, index, objects, index + 1, tail - index);
-        objects[index] = elem;
-        tail++;
-      }
-    }
-  }
-
-  private E removeFirst() {
-    E elem = objects[head];
-    objects[head] = null;
-    head = (head + 1) % objects.length;
-    if (head == 0) tail = 0;
-    return elem;
-  }
-
-  private int upperBound(int start, int end, E key) {
-    while (start < end) {
-      int mid = (start + end) >>> 1;
-      E mitem = objects[mid];
-      int cmp = compareTimeouts(mitem, key);
-      if (cmp > 0) {
-        end = mid;
-      } else {
-        start = mid + 1;
-      }
-    }
-    return start;
-  }
-
-  private int compareTimeouts(final E a, final E b) {
-    long t1 = getNanosTimeout(a);
-    long t2 = getNanosTimeout(b);
-    return (t1 < t2) ? -1 : (t1 > t2) ? 1 : 0;
-  }
-
-  private long getNanosTimeout(final E obj) {
-    if (obj == null) return 0;
-    TimeUnit unit = timeoutRetriever.getTimeUnit(obj);
-    long timeout = timeoutRetriever.getTimeout(obj);
-    return unit.toNanos(timeout);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a94dc90/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java
 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java
deleted file mode 100644
index 1f901b5..0000000
--- 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.procedure2.util;
-
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.hbase.CategoryBasedTimeout;
-import 
org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever;
-import org.apache.hadoop.hbase.testclassification.MasterTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestRule;
-
-@Category({MasterTests.class, MediumTests.class})
-public class TestTimeoutBlockingQueue {
-  @Rule public final TestRule timeout = 
CategoryBasedTimeout.builder().withTimeout(this.getClass()).
-      withLookingForStuckThread(true).build();
-  static class TestObject {
-    private long timeout;
-    private int seqId;
-
-    public TestObject(int seqId, long timeout) {
-      this.timeout = timeout;
-      this.seqId = seqId;
-    }
-
-    public long getTimeout() {
-      return timeout;
-    }
-
-    public String toString() {
-      return String.format("(%03d, %03d)", seqId, timeout);
-    }
-  }
-
-  static class TestObjectTimeoutRetriever implements 
TimeoutRetriever<TestObject> {
-    @Override
-    public long getTimeout(TestObject obj) {
-      return obj.getTimeout();
-    }
-
-    @Override
-    public TimeUnit getTimeUnit(TestObject obj) {
-      return TimeUnit.MILLISECONDS;
-    }
-  }
-
-  @Test
-  public void testOrder() {
-    TimeoutBlockingQueue<TestObject> queue =
-      new TimeoutBlockingQueue<TestObject>(8, new 
TestObjectTimeoutRetriever());
-
-    long[] timeouts = new long[] {500, 200, 700, 300, 600, 600, 200, 800, 500};
-
-    for (int i = 0; i < timeouts.length; ++i) {
-      for (int j = 0; j <= i; ++j) {
-        queue.add(new TestObject(j, timeouts[j]));
-        queue.dump();
-      }
-
-      long prev = 0;
-      for (int j = 0; j <= i; ++j) {
-        TestObject obj = queue.poll();
-        assertTrue(obj.getTimeout() >= prev);
-        prev = obj.getTimeout();
-        queue.dump();
-      }
-    }
-  }
-
-  @Test
-  public void testTimeoutBlockingQueue() {
-    TimeoutBlockingQueue<TestObject> queue;
-
-    int[][] testArray = new int[][] {
-      {200, 400, 600},  // append
-      {200, 400, 100},  // prepend
-      {200, 400, 300},  // insert
-    };
-
-    for (int i = 0; i < testArray.length; ++i) {
-      int[] sortedArray = Arrays.copyOf(testArray[i], testArray[i].length);
-      Arrays.sort(sortedArray);
-
-      // test with head == 0
-      queue = new TimeoutBlockingQueue<TestObject>(2, new 
TestObjectTimeoutRetriever());
-      for (int j = 0; j < testArray[i].length; ++j) {
-        queue.add(new TestObject(j, testArray[i][j]));
-        queue.dump();
-      }
-
-      for (int j = 0; !queue.isEmpty(); ++j) {
-        assertEquals(sortedArray[j], queue.poll().getTimeout());
-      }
-
-      queue = new TimeoutBlockingQueue<TestObject>(2, new 
TestObjectTimeoutRetriever());
-      queue.add(new TestObject(0, 50));
-      assertEquals(50, queue.poll().getTimeout());
-
-      // test with head > 0
-      for (int j = 0; j < testArray[i].length; ++j) {
-        queue.add(new TestObject(j, testArray[i][j]));
-        queue.dump();
-      }
-
-      for (int j = 0; !queue.isEmpty(); ++j) {
-        assertEquals(sortedArray[j], queue.poll().getTimeout());
-      }
-    }
-  }
-
-  @Test
-  public void testRemove() {
-    TimeoutBlockingQueue<TestObject> queue =
-      new TimeoutBlockingQueue<TestObject>(2, new 
TestObjectTimeoutRetriever());
-
-    final int effectiveLen = 5;
-    TestObject[] objs = new TestObject[6];
-    for (int i = 0; i < effectiveLen; ++i) {
-      objs[i] = new TestObject(0, i * 10);
-      queue.add(objs[i]);
-    }
-    objs[effectiveLen] = new TestObject(0, effectiveLen * 10);
-    queue.dump();
-
-    for (int i = 0; i < effectiveLen; i += 2) {
-      assertTrue(queue.remove(objs[i]));
-    }
-    assertTrue(!queue.remove(objs[effectiveLen]));
-
-    for (int i = 0; i < effectiveLen; ++i) {
-      TestObject x = queue.poll();
-      assertEquals((i % 2) == 0 ? null : objs[i], x);
-    }
-  }
-}

Reply via email to