Repository: hbase Updated Branches: refs/heads/master 91a7bbd58 -> 9a94dc90b
HBASE-16642 Use DelayQueue instead of TimeoutBlockingQueue Signed-off-by: Matteo Bertozzi <matteo.berto...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9a94dc90 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9a94dc90 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9a94dc90 Branch: refs/heads/master Commit: 9a94dc90b4a53ca9d82c662f19c0369d3cd9aecf Parents: 91a7bbd Author: Hiroshi Ikeda <ik...@vic.co.jp> Authored: Thu Oct 13 20:21:46 2016 -0700 Committer: Matteo Bertozzi <matteo.berto...@cloudera.com> Committed: Thu Oct 13 21:41:54 2016 -0700 ---------------------------------------------------------------------- .../hbase/procedure2/ProcedureExecutor.java | 93 +++++--- .../procedure2/util/TimeoutBlockingQueue.java | 234 ------------------- .../util/TestTimeoutBlockingQueue.java | 159 ------------- 3 files changed, 67 insertions(+), 419 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/9a94dc90/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 2e9e3a3..14fe71b 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -29,12 +29,15 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; @@ -48,8 +51,6 @@ import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; import org.apache.hadoop.hbase.procedure2.util.StringUtils; -import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue; -import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -96,17 +97,58 @@ public class ProcedureExecutor<TEnvironment> { } /** - * Used by the TimeoutBlockingQueue to get the timeout interval of the procedure + * Used by the DelayQueue to get the timeout interval of the procedure */ - private static class ProcedureTimeoutRetriever implements TimeoutRetriever<Procedure> { + private static class DelayedContainer implements Delayed { + static final DelayedContainer POISON = new DelayedContainer(); + + /** null if poison */ + final Procedure proc; + final long timeoutTime; + + DelayedContainer(Procedure proc) { + assert proc != null; + this.proc = proc; + this.timeoutTime = proc.getLastUpdate() + proc.getTimeout(); + } + + DelayedContainer() { + this.proc = null; + this.timeoutTime = Long.MIN_VALUE; + } + + @Override + public long getDelay(TimeUnit unit) { + long currentTime = EnvironmentEdgeManager.currentTime(); + if (currentTime >= timeoutTime) { + return 0; + } + return unit.convert(timeoutTime - currentTime, TimeUnit.MICROSECONDS); + } + + /** + * @throws NullPointerException {@inheritDoc} + * @throws ClassCastException {@inheritDoc} + */ + @Override + public int compareTo(Delayed o) { + return Long.compare(timeoutTime, ((DelayedContainer)o).timeoutTime); + } + @Override - public long getTimeout(Procedure proc) { - return proc.getTimeRemaining(); + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (! (obj instanceof DelayedContainer)) { + return false; + } + return Objects.equals(proc, ((DelayedContainer)obj).proc); } @Override - public TimeUnit getTimeUnit(Procedure proc) { - return TimeUnit.MILLISECONDS; + public int hashCode() { + return proc != null ? proc.hashCode() : 0; } } @@ -239,8 +281,8 @@ public class ProcedureExecutor<TEnvironment> { * Timeout Queue that contains Procedures in a WAITING_TIMEOUT state * or periodic procedures. */ - private final TimeoutBlockingQueue<Procedure> waitingTimeout = - new TimeoutBlockingQueue<Procedure>(new ProcedureTimeoutRetriever()); + private final DelayQueue<DelayedContainer> waitingTimeout = + new DelayQueue<DelayedContainer>(); /** * Scheduler/Queue that contains runnable procedures. @@ -544,7 +586,7 @@ public class ProcedureExecutor<TEnvironment> { LOG.info("Stopping the procedure executor"); scheduler.stop(); - waitingTimeout.signalAll(); + waitingTimeout.add(DelayedContainer.POISON); } public void join() { @@ -628,7 +670,7 @@ public class ProcedureExecutor<TEnvironment> { */ public void addChore(final ProcedureInMemoryChore chore) { chore.setState(ProcedureState.RUNNABLE); - waitingTimeout.add(chore); + waitingTimeout.add(new DelayedContainer(chore)); } /** @@ -638,7 +680,7 @@ public class ProcedureExecutor<TEnvironment> { */ public boolean removeChore(final ProcedureInMemoryChore chore) { chore.setState(ProcedureState.FINISHED); - return waitingTimeout.remove(chore); + return waitingTimeout.remove(new DelayedContainer(chore)); } /** @@ -927,15 +969,16 @@ public class ProcedureExecutor<TEnvironment> { private void timeoutLoop() { while (isRunning()) { - Procedure proc = waitingTimeout.poll(); - if (proc == null) continue; - - if (proc.getTimeRemaining() > 100) { - // got an early wake, maybe a stop? - // re-enqueue the task in case was not a stop or just a signal - waitingTimeout.add(proc); + Procedure proc; + try { + proc = waitingTimeout.take().proc; + } catch (InterruptedException e) { + // Just consume the interruption. continue; } + if (proc == null) { // POISON to stop + break; + } // ---------------------------------------------------------------------------- // TODO-MAYBE: Should we provide a notification to the store with the @@ -955,8 +998,8 @@ public class ProcedureExecutor<TEnvironment> { } catch (Throwable e) { LOG.error("Ignoring CompletedProcedureCleaner exception: " + e.getMessage(), e); } - proc.setStartTime(EnvironmentEdgeManager.currentTime()); - if (proc.isRunnable()) waitingTimeout.add(proc); + proc.updateTimestamp(); + if (proc.isRunnable()) waitingTimeout.add(new DelayedContainer(proc)); } continue; } @@ -970,8 +1013,6 @@ public class ProcedureExecutor<TEnvironment> { store.update(proc); scheduler.addFront(proc); continue; - } else if (proc.getState() == ProcedureState.WAITING_TIMEOUT) { - waitingTimeout.add(proc); } } } @@ -1171,7 +1212,7 @@ public class ProcedureExecutor<TEnvironment> { procedure.setState(ProcedureState.WAITING); break; case WAITING_TIMEOUT: - waitingTimeout.add(procedure); + waitingTimeout.add(new DelayedContainer(procedure)); break; default: break; @@ -1179,7 +1220,7 @@ public class ProcedureExecutor<TEnvironment> { } } } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { - waitingTimeout.add(procedure); + waitingTimeout.add(new DelayedContainer(procedure)); } else if (!isSuspended) { // No subtask, so we are done procedure.setState(ProcedureState.FINISHED); http://git-wip-us.apache.org/repos/asf/hbase/blob/9a94dc90/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java deleted file mode 100644 index 2292e63..0000000 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java +++ /dev/null @@ -1,234 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.procedure2.util; - -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; - -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class TimeoutBlockingQueue<E> { - public static interface TimeoutRetriever<T> { - long getTimeout(T object); - TimeUnit getTimeUnit(T object); - } - - private final ReentrantLock lock = new ReentrantLock(); - private final Condition waitCond = lock.newCondition(); - private final TimeoutRetriever<? super E> timeoutRetriever; - - private E[] objects; - private int head = 0; - private int tail = 0; - - public TimeoutBlockingQueue(TimeoutRetriever<? super E> timeoutRetriever) { - this(32, timeoutRetriever); - } - - @SuppressWarnings("unchecked") - public TimeoutBlockingQueue(int capacity, TimeoutRetriever<? super E> timeoutRetriever) { - this.objects = (E[])new Object[capacity]; - this.timeoutRetriever = timeoutRetriever; - } - - public void dump() { - for (int i = 0; i < objects.length; ++i) { - if (i == head) { - System.out.print("[" + objects[i] + "] "); - } else if (i == tail) { - System.out.print("]" + objects[i] + "[ "); - } else { - System.out.print(objects[i] + " "); - } - } - System.out.println(); - } - - public void clear() { - lock.lock(); - try { - if (head != tail) { - for (int i = head; i < tail; ++i) { - objects[i] = null; - } - head = 0; - tail = 0; - waitCond.signal(); - } - } finally { - lock.unlock(); - } - } - - public void add(E e) { - if (e == null) throw new NullPointerException(); - - lock.lock(); - try { - addElement(e); - waitCond.signal(); - } finally { - lock.unlock(); - } - } - - public boolean remove(E e) { - if (e == null) return false; - lock.lock(); - try { - for (int i = 0; i < objects.length; ++i) { - if (e.equals(objects[i])) { - objects[i] = null; - return true; - } - } - return false; - } finally { - lock.unlock(); - } - } - - @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") - public E poll() { - lock.lock(); - try { - if (isEmpty()) { - waitCond.await(); - return null; - } - - E elem = objects[head]; - long nanos = getNanosTimeout(elem); - nanos = waitCond.awaitNanos(nanos); - return nanos > 0 ? null : removeFirst(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return null; - } finally { - lock.unlock(); - } - } - - public int size() { - return tail - head; - } - - public boolean isEmpty() { - return (tail - head) == 0; - } - - public void signalAll() { - lock.lock(); - try { - waitCond.signalAll(); - } finally { - lock.unlock(); - } - } - - private void addElement(E elem) { - int size = (tail - head); - if ((objects.length - size) == 0) { - int capacity = size + ((size < 64) ? (size + 2) : (size >> 1)); - E[] newObjects = (E[])new Object[capacity]; - - if (compareTimeouts(objects[tail - 1], elem) <= 0) { - // Append - System.arraycopy(objects, head, newObjects, 0, tail); - tail -= head; - newObjects[tail++] = elem; - } else if (compareTimeouts(objects[head], elem) > 0) { - // Prepend - System.arraycopy(objects, head, newObjects, 1, tail); - newObjects[0] = elem; - tail -= (head - 1); - } else { - // Insert in the middle - int index = upperBound(head, tail - 1, elem); - int newIndex = (index - head); - System.arraycopy(objects, head, newObjects, 0, newIndex); - newObjects[newIndex] = elem; - System.arraycopy(objects, index, newObjects, newIndex + 1, tail - index); - tail -= (head - 1); - } - head = 0; - objects = newObjects; - } else { - if (tail == objects.length) { - // shift down |-----AAAAAAA| - tail -= head; - System.arraycopy(objects, head, objects, 0, tail); - head = 0; - } - - if (tail == head || compareTimeouts(objects[tail - 1], elem) <= 0) { - // Append - objects[tail++] = elem; - } else if (head > 0 && compareTimeouts(objects[head], elem) > 0) { - // Prepend - objects[--head] = elem; - } else { - // Insert in the middle - int index = upperBound(head, tail - 1, elem); - System.arraycopy(objects, index, objects, index + 1, tail - index); - objects[index] = elem; - tail++; - } - } - } - - private E removeFirst() { - E elem = objects[head]; - objects[head] = null; - head = (head + 1) % objects.length; - if (head == 0) tail = 0; - return elem; - } - - private int upperBound(int start, int end, E key) { - while (start < end) { - int mid = (start + end) >>> 1; - E mitem = objects[mid]; - int cmp = compareTimeouts(mitem, key); - if (cmp > 0) { - end = mid; - } else { - start = mid + 1; - } - } - return start; - } - - private int compareTimeouts(final E a, final E b) { - long t1 = getNanosTimeout(a); - long t2 = getNanosTimeout(b); - return (t1 < t2) ? -1 : (t1 > t2) ? 1 : 0; - } - - private long getNanosTimeout(final E obj) { - if (obj == null) return 0; - TimeUnit unit = timeoutRetriever.getTimeUnit(obj); - long timeout = timeoutRetriever.getTimeout(obj); - return unit.toNanos(timeout); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/9a94dc90/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java deleted file mode 100644 index 1f901b5..0000000 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java +++ /dev/null @@ -1,159 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.procedure2.util; - - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.Arrays; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.hbase.CategoryBasedTimeout; -import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever; -import org.apache.hadoop.hbase.testclassification.MasterTests; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestRule; - -@Category({MasterTests.class, MediumTests.class}) -public class TestTimeoutBlockingQueue { - @Rule public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()). - withLookingForStuckThread(true).build(); - static class TestObject { - private long timeout; - private int seqId; - - public TestObject(int seqId, long timeout) { - this.timeout = timeout; - this.seqId = seqId; - } - - public long getTimeout() { - return timeout; - } - - public String toString() { - return String.format("(%03d, %03d)", seqId, timeout); - } - } - - static class TestObjectTimeoutRetriever implements TimeoutRetriever<TestObject> { - @Override - public long getTimeout(TestObject obj) { - return obj.getTimeout(); - } - - @Override - public TimeUnit getTimeUnit(TestObject obj) { - return TimeUnit.MILLISECONDS; - } - } - - @Test - public void testOrder() { - TimeoutBlockingQueue<TestObject> queue = - new TimeoutBlockingQueue<TestObject>(8, new TestObjectTimeoutRetriever()); - - long[] timeouts = new long[] {500, 200, 700, 300, 600, 600, 200, 800, 500}; - - for (int i = 0; i < timeouts.length; ++i) { - for (int j = 0; j <= i; ++j) { - queue.add(new TestObject(j, timeouts[j])); - queue.dump(); - } - - long prev = 0; - for (int j = 0; j <= i; ++j) { - TestObject obj = queue.poll(); - assertTrue(obj.getTimeout() >= prev); - prev = obj.getTimeout(); - queue.dump(); - } - } - } - - @Test - public void testTimeoutBlockingQueue() { - TimeoutBlockingQueue<TestObject> queue; - - int[][] testArray = new int[][] { - {200, 400, 600}, // append - {200, 400, 100}, // prepend - {200, 400, 300}, // insert - }; - - for (int i = 0; i < testArray.length; ++i) { - int[] sortedArray = Arrays.copyOf(testArray[i], testArray[i].length); - Arrays.sort(sortedArray); - - // test with head == 0 - queue = new TimeoutBlockingQueue<TestObject>(2, new TestObjectTimeoutRetriever()); - for (int j = 0; j < testArray[i].length; ++j) { - queue.add(new TestObject(j, testArray[i][j])); - queue.dump(); - } - - for (int j = 0; !queue.isEmpty(); ++j) { - assertEquals(sortedArray[j], queue.poll().getTimeout()); - } - - queue = new TimeoutBlockingQueue<TestObject>(2, new TestObjectTimeoutRetriever()); - queue.add(new TestObject(0, 50)); - assertEquals(50, queue.poll().getTimeout()); - - // test with head > 0 - for (int j = 0; j < testArray[i].length; ++j) { - queue.add(new TestObject(j, testArray[i][j])); - queue.dump(); - } - - for (int j = 0; !queue.isEmpty(); ++j) { - assertEquals(sortedArray[j], queue.poll().getTimeout()); - } - } - } - - @Test - public void testRemove() { - TimeoutBlockingQueue<TestObject> queue = - new TimeoutBlockingQueue<TestObject>(2, new TestObjectTimeoutRetriever()); - - final int effectiveLen = 5; - TestObject[] objs = new TestObject[6]; - for (int i = 0; i < effectiveLen; ++i) { - objs[i] = new TestObject(0, i * 10); - queue.add(objs[i]); - } - objs[effectiveLen] = new TestObject(0, effectiveLen * 10); - queue.dump(); - - for (int i = 0; i < effectiveLen; i += 2) { - assertTrue(queue.remove(objs[i])); - } - assertTrue(!queue.remove(objs[effectiveLen])); - - for (int i = 0; i < effectiveLen; ++i) { - TestObject x = queue.poll(); - assertEquals((i % 2) == 0 ? null : objs[i], x); - } - } -}