HBASE-16813 Procedure v2 - Move ProcedureEvent to hbase-procedure module
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/92ef2344 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/92ef2344 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/92ef2344 Branch: refs/heads/master Commit: 92ef234486537b4325641ce47f6fde26d9432710 Parents: dfb2a80 Author: Matteo Bertozzi <matteo.berto...@cloudera.com> Authored: Wed Oct 12 16:33:25 2016 -0700 Committer: Matteo Bertozzi <matteo.berto...@cloudera.com> Committed: Wed Oct 12 16:33:25 2016 -0700 ---------------------------------------------------------------------- .../procedure2/AbstractProcedureScheduler.java | 311 ++++++++++ .../hadoop/hbase/procedure2/ProcedureEvent.java | 57 ++ .../hbase/procedure2/ProcedureEventQueue.java | 85 +++ .../hbase/procedure2/ProcedureExecutor.java | 63 +- .../hbase/procedure2/ProcedureRunnableSet.java | 81 --- .../hbase/procedure2/ProcedureScheduler.java | 137 +++++ .../procedure2/ProcedureSimpleRunQueue.java | 121 ---- .../procedure2/SimpleProcedureScheduler.java | 71 +++ .../procedure2/ProcedureTestingUtility.java | 38 +- .../hbase/procedure2/TestProcedureEvents.java | 132 +++++ .../TestProcedureSchedulerConcurrency.java | 160 ++++++ .../procedure2/TestProcedureSuspended.java | 6 +- .../hbase/procedure2/TestYieldProcedures.java | 59 +- .../org/apache/hadoop/hbase/master/HMaster.java | 2 +- .../master/procedure/MasterProcedureEnv.java | 7 +- .../procedure/MasterProcedureScheduler.java | 569 +++---------------- .../procedure/TestMasterProcedureEvents.java | 92 +-- .../procedure/TestMasterProcedureScheduler.java | 36 +- ...TestMasterProcedureSchedulerConcurrency.java | 93 +-- 19 files changed, 1145 insertions(+), 975 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java new file mode 100644 index 0000000..c4ae877 --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java @@ -0,0 +1,311 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class AbstractProcedureScheduler implements ProcedureScheduler { + private static final Log LOG = LogFactory.getLog(AbstractProcedureScheduler.class); + + private final ReentrantLock schedLock = new ReentrantLock(); + private final Condition schedWaitCond = schedLock.newCondition(); + private boolean running = false; + + // TODO: metrics + private long pollCalls = 0; + private long nullPollCalls = 0; + + @Override + public void start() { + schedLock(); + try { + running = true; + } finally { + schedUnlock(); + } + } + + @Override + public void stop() { + schedLock(); + try { + running = false; + schedWaitCond.signalAll(); + } finally { + schedUnlock(); + } + } + + @Override + public void signalAll() { + schedLock(); + try { + schedWaitCond.signalAll(); + } finally { + schedUnlock(); + } + } + + // ========================================================================== + // Add related + // ========================================================================== + /** + * Add the procedure to the queue. + * NOTE: this method is called with the sched lock held. + * @param procedure the Procedure to add + * @param addFront true if the item should be added to the front of the queue + */ + protected abstract void enqueue(Procedure procedure, boolean addFront); + + public void addFront(final Procedure procedure) { + push(procedure, true, true); + } + + public void addBack(final Procedure procedure) { + push(procedure, false, true); + } + + protected void push(final Procedure procedure, final boolean addFront, final boolean notify) { + schedLock.lock(); + try { + enqueue(procedure, addFront); + if (notify) { + schedWaitCond.signal(); + } + } finally { + schedLock.unlock(); + } + } + + // ========================================================================== + // Poll related + // ========================================================================== + /** + * Fetch one Procedure from the queue + * NOTE: this method is called with the sched lock held. + * @return the Procedure to execute, or null if nothing is available. + */ + protected abstract Procedure dequeue(); + + @Override + public Procedure poll() { + return poll(-1); + } + + @Override + public Procedure poll(long timeout, TimeUnit unit) { + return poll(unit.toNanos(timeout)); + } + + public Procedure poll(long nanos) { + final boolean waitForever = (nanos < 0); + schedLock(); + try { + while (!queueHasRunnables()) { + if (!running) return null; + if (waitForever) { + schedWaitCond.await(); + } else { + if (nanos <= 0) return null; + nanos = schedWaitCond.awaitNanos(nanos); + } + } + + final Procedure pollResult = dequeue(); + pollCalls++; + nullPollCalls += (pollResult == null) ? 1 : 0; + return pollResult; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + nullPollCalls++; + return null; + } finally { + schedUnlock(); + } + } + + // ========================================================================== + // Utils + // ========================================================================== + /** + * Removes all of the elements from the queue + * NOTE: this method is called with the sched lock held. + */ + protected abstract void clearQueue(); + + /** + * Returns the number of elements in this queue. + * NOTE: this method is called with the sched lock held. + * @return the number of elements in this queue. + */ + protected abstract int queueSize(); + + /** + * Returns true if there are procedures available to process. + * NOTE: this method is called with the sched lock held. + * @return true if there are procedures available to process, otherwise false. + */ + protected abstract boolean queueHasRunnables(); + + @Override + public void clear() { + // NOTE: USED ONLY FOR TESTING + schedLock(); + try { + clearQueue(); + } finally { + schedUnlock(); + } + } + + @Override + public int size() { + schedLock(); + try { + return queueSize(); + } finally { + schedUnlock(); + } + } + + @Override + public boolean hasRunnables() { + schedLock(); + try { + return queueHasRunnables(); + } finally { + schedUnlock(); + } + } + + // ============================================================================ + // TODO: Metrics + // ============================================================================ + public long getPollCalls() { + return pollCalls; + } + + public long getNullPollCalls() { + return nullPollCalls; + } + + // ========================================================================== + // Procedure Events + // ========================================================================== + @Override + public boolean waitEvent(final ProcedureEvent event, final Procedure procedure) { + synchronized (event) { + if (event.isReady()) { + return false; + } + suspendProcedure(event, procedure); + return true; + } + } + + @Override + public void suspendEvent(final ProcedureEvent event) { + final boolean isTraceEnabled = LOG.isTraceEnabled(); + synchronized (event) { + event.setReady(false); + if (isTraceEnabled) { + LOG.trace("Suspend event " + event); + } + } + } + + @Override + public void wakeEvent(final ProcedureEvent event) { + wakeEvents(1, event); + } + + @Override + public void wakeEvents(final int count, final ProcedureEvent... events) { + final boolean isTraceEnabled = LOG.isTraceEnabled(); + schedLock(); + try { + int waitingCount = 0; + for (int i = 0; i < count; ++i) { + final ProcedureEvent event = events[i]; + synchronized (event) { + event.setReady(true); + if (isTraceEnabled) { + LOG.trace("Wake event " + event); + } + waitingCount += popEventWaitingObjects(event); + } + } + wakePollIfNeeded(waitingCount); + } finally { + schedUnlock(); + } + } + + protected int popEventWaitingObjects(final ProcedureEvent event) { + return popEventWaitingProcedures(event); + } + + protected int popEventWaitingProcedures(final ProcedureEventQueue event) { + int count = 0; + while (event.hasWaitingProcedures()) { + wakeProcedure(event.popWaitingProcedure(false)); + count++; + } + return count; + } + + protected void suspendProcedure(final ProcedureEventQueue event, final Procedure procedure) { + procedure.suspend(); + event.suspendProcedure(procedure); + } + + protected void wakeProcedure(final Procedure procedure) { + procedure.resume(); + push(procedure, /* addFront= */ true, /* notify= */false); + } + + // ========================================================================== + // Internal helpers + // ========================================================================== + protected void schedLock() { + schedLock.lock(); + } + + protected void schedUnlock() { + schedLock.unlock(); + } + + protected void wakePollIfNeeded(final int waitingCount) { + if (waitingCount > 1) { + schedWaitCond.signalAll(); + } else if (waitingCount > 0) { + schedWaitCond.signal(); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java new file mode 100644 index 0000000..6335832 --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Basic ProcedureEvent that contains an "object", which can be a + * description or a reference to the resource to wait on, and a + * queue for suspended procedures. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ProcedureEvent<T> extends ProcedureEventQueue { + private final T object; + + private boolean ready = false; + + public ProcedureEvent(final T object) { + this.object = object; + } + + public T getObject() { + return object; + } + + public synchronized boolean isReady() { + return ready; + } + + @InterfaceAudience.Private + protected synchronized void setReady(final boolean isReady) { + this.ready = isReady; + } + + @Override + public String toString() { + return getClass().getSimpleName() + "(" + object + ")"; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEventQueue.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEventQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEventQueue.java new file mode 100644 index 0000000..a109e9e --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEventQueue.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import com.google.common.annotations.VisibleForTesting; + +import java.util.ArrayDeque; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Basic queue to store suspended procedures. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ProcedureEventQueue { + private static final Log LOG = LogFactory.getLog(ProcedureEventQueue.class); + + private ArrayDeque<Procedure> waitingProcedures = null; + + public ProcedureEventQueue() { + } + + @InterfaceAudience.Private + public synchronized void suspendProcedure(final Procedure proc) { + if (waitingProcedures == null) { + waitingProcedures = new ArrayDeque<Procedure>(); + } + waitingProcedures.addLast(proc); + } + + @InterfaceAudience.Private + public synchronized void removeProcedure(final Procedure proc) { + if (waitingProcedures != null) { + waitingProcedures.remove(proc); + } + } + + @InterfaceAudience.Private + public synchronized boolean hasWaitingProcedures() { + return waitingProcedures != null; + } + + @InterfaceAudience.Private + public synchronized Procedure popWaitingProcedure(final boolean popFront) { + // it will be nice to use IterableList on a procedure and avoid allocations... + Procedure proc = popFront ? waitingProcedures.removeFirst() : waitingProcedures.removeLast(); + if (waitingProcedures.isEmpty()) { + waitingProcedures = null; + } + return proc; + } + + @VisibleForTesting + public synchronized void clear() { + waitingProcedures = null; + } + + @VisibleForTesting + public synchronized int size() { + if (waitingProcedures != null) { + return waitingProcedures.size(); + } + return 0; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 2eeef9e..2e9e3a3 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -243,9 +243,9 @@ public class ProcedureExecutor<TEnvironment> { new TimeoutBlockingQueue<Procedure>(new ProcedureTimeoutRetriever()); /** - * Queue that contains runnable procedures. + * Scheduler/Queue that contains runnable procedures. */ - private final ProcedureRunnableSet runnables; + private final ProcedureScheduler scheduler; // TODO private final ReentrantLock submitLock = new ReentrantLock(); @@ -267,13 +267,13 @@ public class ProcedureExecutor<TEnvironment> { public ProcedureExecutor(final Configuration conf, final TEnvironment environment, final ProcedureStore store) { - this(conf, environment, store, new ProcedureSimpleRunQueue()); + this(conf, environment, store, new SimpleProcedureScheduler()); } public ProcedureExecutor(final Configuration conf, final TEnvironment environment, - final ProcedureStore store, final ProcedureRunnableSet runqueue) { + final ProcedureStore store, final ProcedureScheduler scheduler) { this.environment = environment; - this.runnables = runqueue; + this.scheduler = scheduler; this.store = store; this.conf = conf; this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, true); @@ -284,7 +284,7 @@ public class ProcedureExecutor<TEnvironment> { Preconditions.checkArgument(rollbackStack.isEmpty()); Preconditions.checkArgument(procedures.isEmpty()); Preconditions.checkArgument(waitingTimeout.isEmpty()); - Preconditions.checkArgument(runnables.size() == 0); + Preconditions.checkArgument(scheduler.size() == 0); store.load(new ProcedureStore.ProcedureLoader() { @Override @@ -378,7 +378,7 @@ public class ProcedureExecutor<TEnvironment> { Long rootProcId = getRootProcedureId(proc); if (rootProcId == null) { // The 'proc' was ready to run but the root procedure was rolledback? - runnables.addBack(proc); + scheduler.addBack(proc); continue; } @@ -410,8 +410,8 @@ public class ProcedureExecutor<TEnvironment> { break; case FINISHED: if (proc.hasException()) { - // add the proc to the runnables to perform the rollback - runnables.addBack(proc); + // add the proc to the scheduler to perform the rollback + scheduler.addBack(proc); } break; case ROLLEDBACK: @@ -446,7 +446,7 @@ public class ProcedureExecutor<TEnvironment> { throw new IOException("found " + corruptedCount + " procedures on replay"); } - // 4. Push the runnables + // 4. Push the scheduler if (!runnableList.isEmpty()) { // TODO: See ProcedureWALFormatReader#hasFastStartSupport // some procedure may be started way before this stuff. @@ -457,10 +457,10 @@ public class ProcedureExecutor<TEnvironment> { sendProcedureLoadedNotification(proc.getProcId()); } if (proc.wasExecuted()) { - runnables.addFront(proc); + scheduler.addFront(proc); } else { // if it was not in execution, it can wait. - runnables.addBack(proc); + scheduler.addBack(proc); } } } @@ -514,6 +514,9 @@ public class ProcedureExecutor<TEnvironment> { LOG.info(String.format("recover procedure store (%s) lease: %s", store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st))); + // start the procedure scheduler + scheduler.start(); + // TODO: Split in two steps. // TODO: Handle corrupted procedures (currently just a warn) // The first one will make sure that we have the latest id, @@ -540,7 +543,7 @@ public class ProcedureExecutor<TEnvironment> { } LOG.info("Stopping the procedure executor"); - runnables.signalAll(); + scheduler.stop(); waitingTimeout.signalAll(); } @@ -564,7 +567,7 @@ public class ProcedureExecutor<TEnvironment> { procedures.clear(); nonceKeysToProcIdsMap.clear(); waitingTimeout.clear(); - runnables.clear(); + scheduler.clear(); lastProcId.set(-1); } @@ -698,7 +701,7 @@ public class ProcedureExecutor<TEnvironment> { assert !procedures.containsKey(currentProcId); procedures.put(currentProcId, proc); sendProcedureAddedNotification(currentProcId); - runnables.addBack(proc); + scheduler.addBack(proc); return currentProcId; } @@ -810,18 +813,18 @@ public class ProcedureExecutor<TEnvironment> { return procedures.get(procId); } - protected ProcedureRunnableSet getRunnableSet() { - return runnables; + protected ProcedureScheduler getScheduler() { + return scheduler; } /** * Execution loop (N threads) * while the executor is in a running state, - * fetch a procedure from the runnables queue and start the execution. + * fetch a procedure from the scheduler queue and start the execution. */ private void execLoop() { while (isRunning()) { - Procedure proc = runnables.poll(); + Procedure proc = scheduler.poll(); if (proc == null) continue; try { @@ -855,7 +858,7 @@ public class ProcedureExecutor<TEnvironment> { // we have the 'rollback-lock' we can start rollingback if (!executeRollback(rootProcId, procStack)) { procStack.unsetRollback(); - runnables.yield(proc); + scheduler.yield(proc); } } else { // if we can't rollback means that some child is still running. @@ -863,7 +866,7 @@ public class ProcedureExecutor<TEnvironment> { // If the procedure was never executed, remove and mark it as rolledback. if (!proc.wasExecuted()) { if (!executeRollback(proc)) { - runnables.yield(proc); + scheduler.yield(proc); } } } @@ -876,7 +879,7 @@ public class ProcedureExecutor<TEnvironment> { execProcedure(procStack, proc); releaseLock(proc, false); } else { - runnables.yield(proc); + scheduler.yield(proc); } procStack.release(proc); @@ -965,7 +968,7 @@ public class ProcedureExecutor<TEnvironment> { RootProcedureState procStack = rollbackStack.get(rootProcId); procStack.abort(); store.update(proc); - runnables.addFront(proc); + scheduler.addFront(proc); continue; } else if (proc.getState() == ProcedureState.WAITING_TIMEOUT) { waitingTimeout.add(proc); @@ -1124,11 +1127,11 @@ public class ProcedureExecutor<TEnvironment> { if (LOG.isTraceEnabled()) { LOG.trace("Yield procedure: " + procedure + ": " + e.getMessage()); } - runnables.yield(procedure); + scheduler.yield(procedure); return; } catch (InterruptedException e) { handleInterruptedException(procedure, e); - runnables.yield(procedure); + scheduler.yield(procedure); return; } catch (Throwable e) { // Catch NullPointerExceptions or similar errors... @@ -1205,7 +1208,7 @@ public class ProcedureExecutor<TEnvironment> { // if the procedure is kind enough to pass the slot to someone else, yield if (procedure.getState() == ProcedureState.RUNNABLE && procedure.isYieldAfterExecutionStep(getEnvironment())) { - runnables.yield(procedure); + scheduler.yield(procedure); return; } @@ -1218,7 +1221,7 @@ public class ProcedureExecutor<TEnvironment> { Procedure subproc = subprocs[i]; assert !procedures.containsKey(subproc.getProcId()); procedures.put(subproc.getProcId(), subproc); - runnables.addFront(subproc); + scheduler.addFront(subproc); } } @@ -1236,7 +1239,7 @@ public class ProcedureExecutor<TEnvironment> { if (parent.childrenCountDown() && parent.getState() == ProcedureState.WAITING) { parent.setState(ProcedureState.RUNNABLE); store.update(parent); - runnables.addFront(parent); + scheduler.addFront(parent); if (LOG.isTraceEnabled()) { LOG.trace(parent + " all the children finished their work, resume."); } @@ -1374,10 +1377,10 @@ public class ProcedureExecutor<TEnvironment> { // call the runnableSet completion cleanup handler try { - runnables.completionCleanup(proc); + scheduler.completionCleanup(proc); } catch (Throwable e) { // Catch NullPointerExceptions or similar errors... - LOG.error("CODE-BUG: uncatched runtime exception for runnableSet: " + runnables, e); + LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: " + proc, e); } // Notify the listeners http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java deleted file mode 100644 index 64c41ee..0000000 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.procedure2; - -import com.google.common.annotations.VisibleForTesting; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; - -/** - * Keep track of the runnable procedures - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public interface ProcedureRunnableSet { - /** - * Inserts the specified element at the front of this queue. - * @param proc the Procedure to add - */ - void addFront(Procedure proc); - - /** - * Inserts the specified element at the end of this queue. - * @param proc the Procedure to add - */ - void addBack(Procedure proc); - - /** - * The procedure can't run at the moment. - * add it back to the queue, giving priority to someone else. - * @param proc the Procedure to add back to the list - */ - void yield(Procedure proc); - - /** - * The procedure in execution completed. - * This can be implemented to perform cleanups. - * @param proc the Procedure that completed the execution. - */ - void completionCleanup(Procedure proc); - - /** - * Fetch one Procedure from the queue - * @return the Procedure to execute, or null if nothing present. - */ - Procedure poll(); - - /** - * In case the class is blocking on poll() waiting for items to be added, - * this method should awake poll() and poll() should return. - */ - void signalAll(); - - /** - * Returns the number of elements in this collection. - * @return the number of elements in this collection. - */ - @VisibleForTesting - int size(); - - /** - * Removes all of the elements from this collection. - */ - void clear(); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java new file mode 100644 index 0000000..1793158 --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import com.google.common.annotations.VisibleForTesting; + +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Keep track of the runnable procedures + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface ProcedureScheduler { + /** + * Start the scheduler + */ + void start(); + + /** + * Stop the scheduler + */ + void stop(); + + /** + * In case the class is blocking on poll() waiting for items to be added, + * this method should awake poll() and poll() should return. + */ + void signalAll(); + + /** + * Inserts the specified element at the front of this queue. + * @param proc the Procedure to add + */ + void addFront(Procedure proc); + + /** + * Inserts the specified element at the end of this queue. + * @param proc the Procedure to add + */ + void addBack(Procedure proc); + + /** + * The procedure can't run at the moment. + * add it back to the queue, giving priority to someone else. + * @param proc the Procedure to add back to the list + */ + void yield(Procedure proc); + + /** + * The procedure in execution completed. + * This can be implemented to perform cleanups. + * @param proc the Procedure that completed the execution. + */ + void completionCleanup(Procedure proc); + + /** + * @return true if there are procedures available to process, otherwise false. + */ + boolean hasRunnables(); + + /** + * Fetch one Procedure from the queue + * @return the Procedure to execute, or null if nothing present. + */ + Procedure poll(); + + /** + * Fetch one Procedure from the queue + * @param timeout how long to wait before giving up, in units of unit + * @param unit a TimeUnit determining how to interpret the timeout parameter + * @return the Procedure to execute, or null if nothing present. + */ + Procedure poll(long timeout, TimeUnit unit); + + /** + * Mark the event has not ready. + * procedures calling waitEvent() will be suspended. + * @param event the event to mark as suspended/not ready + */ + void suspendEvent(ProcedureEvent event); + + /** + * Wake every procedure waiting for the specified event + * (By design each event has only one "wake" caller) + * @param event the event to wait + */ + void wakeEvent(ProcedureEvent event); + + /** + * Wake every procedure waiting for the specified events. + * (By design each event has only one "wake" caller) + * @param count the number of events in the array to wake + * @param events the list of events to wake + */ + void wakeEvents(int count, ProcedureEvent... events); + + /** + * Suspend the procedure if the event is not ready yet. + * @param event the event to wait on + * @param procedure the procedure waiting on the event + * @return true if the procedure has to wait for the event to be ready, false otherwise. + */ + boolean waitEvent(ProcedureEvent event, Procedure procedure); + + /** + * Returns the number of elements in this queue. + * @return the number of elements in this queue. + */ + @VisibleForTesting + int size(); + + /** + * Removes all of the elements from the queue + */ + @VisibleForTesting + void clear(); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java deleted file mode 100644 index d23680d..0000000 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.procedure2; - -import java.util.ArrayDeque; -import java.util.Deque; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; - -/** - * Simple runqueue for the procedures - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class ProcedureSimpleRunQueue implements ProcedureRunnableSet { - private final Deque<Procedure> runnables = new ArrayDeque<Procedure>(); - private final ReentrantLock lock = new ReentrantLock(); - private final Condition waitCond = lock.newCondition(); - - @Override - public void addFront(final Procedure proc) { - lock.lock(); - try { - runnables.addFirst(proc); - waitCond.signal(); - } finally { - lock.unlock(); - } - } - - @Override - public void addBack(final Procedure proc) { - lock.lock(); - try { - runnables.addLast(proc); - waitCond.signal(); - } finally { - lock.unlock(); - } - } - - @Override - public void yield(final Procedure proc) { - addBack(proc); - } - - @Override - @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") - public Procedure poll() { - lock.lock(); - try { - if (runnables.isEmpty()) { - waitCond.await(); - if (!runnables.isEmpty()) { - return runnables.pop(); - } - } else { - return runnables.pop(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return null; - } finally { - lock.unlock(); - } - return null; - } - - @Override - public void signalAll() { - lock.lock(); - try { - waitCond.signalAll(); - } finally { - lock.unlock(); - } - } - - @Override - public void clear() { - lock.lock(); - try { - runnables.clear(); - } finally { - lock.unlock(); - } - } - - @Override - public int size() { - lock.lock(); - try { - return runnables.size(); - } finally { - lock.unlock(); - } - } - - @Override - public void completionCleanup(Procedure proc) { - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java new file mode 100644 index 0000000..ffc8273 --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import java.util.ArrayDeque; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Simple scheduler for procedures + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class SimpleProcedureScheduler extends AbstractProcedureScheduler { + private final ArrayDeque<Procedure> runnables = new ArrayDeque<Procedure>(); + + @Override + protected void enqueue(final Procedure procedure, final boolean addFront) { + if (addFront) { + runnables.addFirst(procedure); + } else { + runnables.addLast(procedure); + } + } + + @Override + protected Procedure dequeue() { + return runnables.poll(); + } + + @Override + protected void clearQueue() { + runnables.clear(); + } + + @Override + public void yield(final Procedure proc) { + addBack(proc); + } + + @Override + public boolean queueHasRunnables() { + return runnables.size() > 0; + } + + @Override + public int queueSize() { + return runnables.size(); + } + + @Override + public void completionCleanup(Procedure proc) { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index 0b85ff8..f2c7e6b 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -181,7 +181,7 @@ public class ProcedureTestingUtility { public static <TEnv> void waitNoProcedureRunning(ProcedureExecutor<TEnv> procExecutor) { int stableRuns = 0; while (stableRuns < 10) { - if (procExecutor.getActiveExecutorCount() > 0 || procExecutor.getRunnableSet().size() > 0) { + if (procExecutor.getActiveExecutorCount() > 0 || procExecutor.getScheduler().size() > 0) { stableRuns = 0; Threads.sleepWithoutInterrupt(100); } else { @@ -236,7 +236,32 @@ public class ProcedureTestingUtility { return cause == null ? procInfo.getException() : cause; } - public static class TestProcedure extends Procedure<Void> { + public static class NoopProcedure<TEnv> extends Procedure<TEnv> { + public NoopProcedure() {} + + @Override + protected Procedure[] execute(TEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + return null; + } + + @Override + protected void rollback(TEnv env) throws IOException, InterruptedException { + } + + @Override + protected boolean abort(TEnv env) { return false; } + + @Override + protected void serializeStateData(final OutputStream stream) throws IOException { + } + + @Override + protected void deserializeStateData(final InputStream stream) throws IOException { + } + } + + public static class TestProcedure extends NoopProcedure<Void> { private byte[] data = null; public TestProcedure() {} @@ -270,15 +295,6 @@ public class ProcedureTestingUtility { } @Override - protected Procedure[] execute(Void env) { return null; } - - @Override - protected void rollback(Void env) { } - - @Override - protected boolean abort(Void env) { return false; } - - @Override protected void serializeStateData(final OutputStream stream) throws IOException { StreamUtils.writeRawVInt32(stream, data != null ? data.length : 0); if (data != null) stream.write(data); http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java new file mode 100644 index 0000000..c431646 --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; +import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@Category({MasterTests.class, SmallTests.class}) +public class TestProcedureEvents { + private static final Log LOG = LogFactory.getLog(TestProcedureEvents.class); + + private TestProcEnv procEnv; + private NoopProcedureStore procStore; + private ProcedureExecutor<TestProcEnv> procExecutor; + + private HBaseCommonTestingUtility htu; + + @Before + public void setUp() throws IOException { + htu = new HBaseCommonTestingUtility(); + + procEnv = new TestProcEnv(); + procStore = new NoopProcedureStore(); + procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); + procStore.start(1); + procExecutor.start(1, true); + } + + @After + public void tearDown() throws IOException { + procExecutor.stop(); + procStore.stop(false); + procExecutor.join(); + } + + @Test(timeout=30000) + public void testTimeoutEventProcedure() throws Exception { + final int NTIMEOUTS = 5; + + TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(1000, NTIMEOUTS); + procExecutor.submitProcedure(proc); + + ProcedureTestingUtility.waitProcedure(procExecutor, proc.getProcId()); + ProcedureTestingUtility.assertIsAbortException(procExecutor.getResult(proc.getProcId())); + assertEquals(NTIMEOUTS + 1, proc.getTimeoutsCount()); + } + + public static class TestTimeoutEventProcedure extends NoopProcedure<TestProcEnv> { + private final ProcedureEvent event = new ProcedureEvent("timeout-event"); + + private final AtomicInteger ntimeouts = new AtomicInteger(0); + private int maxTimeouts = 1; + + public TestTimeoutEventProcedure() {} + + public TestTimeoutEventProcedure(final int timeoutMsec, final int maxTimeouts) { + this.maxTimeouts = maxTimeouts; + setTimeout(timeoutMsec); + } + + public int getTimeoutsCount() { + return ntimeouts.get(); + } + + @Override + protected Procedure[] execute(final TestProcEnv env) throws ProcedureSuspendedException { + LOG.info("EXECUTE " + this + " ntimeouts=" + ntimeouts); + if (ntimeouts.get() > maxTimeouts) { + setAbortFailure("test", "give up after " + ntimeouts.get()); + return null; + } + + env.getProcedureScheduler().suspendEvent(event); + if (env.getProcedureScheduler().waitEvent(event, this)) { + setState(ProcedureState.WAITING_TIMEOUT); + throw new ProcedureSuspendedException(); + } + + return null; + } + + @Override + protected boolean setTimeoutFailure(final TestProcEnv env) { + int n = ntimeouts.incrementAndGet(); + LOG.info("HANDLE TIMEOUT " + this + " ntimeouts=" + n); + setState(ProcedureState.RUNNABLE); + env.getProcedureScheduler().wakeEvent(event); + return false; + } + } + + private class TestProcEnv { + public ProcedureScheduler getProcedureScheduler() { + return procExecutor.getScheduler(); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSchedulerConcurrency.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSchedulerConcurrency.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSchedulerConcurrency.java new file mode 100644 index 0000000..b8cd8ff --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSchedulerConcurrency.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ConcurrentSkipListSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.util.Threads; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@Category({MasterTests.class, MediumTests.class}) +public class TestProcedureSchedulerConcurrency { + private static final Log LOG = LogFactory.getLog(TestProcedureEvents.class); + + private SimpleProcedureScheduler procSched; + + @Before + public void setUp() throws IOException { + procSched = new SimpleProcedureScheduler(); + procSched.start(); + } + + @After + public void tearDown() throws IOException { + procSched.stop(); + } + + @Test(timeout=60000) + public void testConcurrentWaitWake() throws Exception { + testConcurrentWaitWake(false); + } + + @Test(timeout=60000) + public void testConcurrentWaitWakeBatch() throws Exception { + testConcurrentWaitWake(true); + } + + private void testConcurrentWaitWake(final boolean useWakeBatch) throws Exception { + final int WAIT_THRESHOLD = 2500; + final int NPROCS = 20; + final int NRUNS = 500; + + final ProcedureScheduler sched = procSched; + for (long i = 0; i < NPROCS; ++i) { + sched.addBack(new TestProcedureWithEvent(i)); + } + + final Thread[] threads = new Thread[4]; + final AtomicInteger waitCount = new AtomicInteger(0); + final AtomicInteger wakeCount = new AtomicInteger(0); + + final ConcurrentSkipListSet<TestProcedureWithEvent> waitQueue = + new ConcurrentSkipListSet<TestProcedureWithEvent>(); + threads[0] = new Thread() { + @Override + public void run() { + long lastUpdate = 0; + while (true) { + final int oldWakeCount = wakeCount.get(); + if (useWakeBatch) { + ProcedureEvent[] ev = new ProcedureEvent[waitQueue.size()]; + for (int i = 0; i < ev.length; ++i) { + ev[i] = waitQueue.pollFirst().getEvent(); + LOG.debug("WAKE BATCH " + ev[i] + " total=" + wakeCount.get()); + } + sched.wakeEvents(ev.length, ev); + wakeCount.addAndGet(ev.length); + } else { + int size = waitQueue.size(); + while (size-- > 0) { + ProcedureEvent ev = waitQueue.pollFirst().getEvent(); + sched.wakeEvent(ev); + LOG.debug("WAKE " + ev + " total=" + wakeCount.get()); + wakeCount.incrementAndGet(); + } + } + if (wakeCount.get() != oldWakeCount) { + lastUpdate = System.currentTimeMillis(); + } else if (wakeCount.get() >= NRUNS && + (System.currentTimeMillis() - lastUpdate) > WAIT_THRESHOLD) { + break; + } + Threads.sleepWithoutInterrupt(25); + } + } + }; + + for (int i = 1; i < threads.length; ++i) { + threads[i] = new Thread() { + @Override + public void run() { + while (true) { + TestProcedureWithEvent proc = (TestProcedureWithEvent)sched.poll(); + if (proc == null) continue; + + sched.suspendEvent(proc.getEvent()); + waitQueue.add(proc); + sched.waitEvent(proc.getEvent(), proc); + LOG.debug("WAIT " + proc.getEvent()); + if (waitCount.incrementAndGet() >= NRUNS) { + break; + } + } + } + }; + } + + for (int i = 0; i < threads.length; ++i) { + threads[i].start(); + } + for (int i = 0; i < threads.length; ++i) { + threads[i].join(); + } + + sched.clear(); + } + + public static class TestProcedureWithEvent extends NoopProcedure<Void> { + private final ProcedureEvent event; + + public TestProcedureWithEvent(long procId) { + setProcId(procId); + event = new ProcedureEvent("test-event procId=" + procId); + } + + public ProcedureEvent getEvent() { + return event; + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java index eb72939..9a108a8 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java @@ -92,7 +92,7 @@ public class TestProcedureSuspended { // release p3 p3keyB.setThrowSuspend(false); - procExecutor.getRunnableSet().addFront(p3keyB); + procExecutor.getScheduler().addFront(p3keyB); waitAndAssertTimestamp(p1keyA, 1, 1); waitAndAssertTimestamp(p2keyA, 0, -1); waitAndAssertTimestamp(p3keyB, 2, 3); @@ -104,7 +104,7 @@ public class TestProcedureSuspended { // rollback p2 and wait until is fully completed p1keyA.setTriggerRollback(true); - procExecutor.getRunnableSet().addFront(p1keyA); + procExecutor.getScheduler().addFront(p1keyA); ProcedureTestingUtility.waitProcedure(procExecutor, p1keyA); // p2 should start and suspend @@ -115,7 +115,7 @@ public class TestProcedureSuspended { // wait until p2 is fully completed p2keyA.setThrowSuspend(false); - procExecutor.getRunnableSet().addFront(p2keyA); + procExecutor.getScheduler().addFront(p2keyA); ProcedureTestingUtility.waitProcedure(procExecutor, p2keyA); waitAndAssertTimestamp(p1keyA, 4, 60000); waitAndAssertTimestamp(p2keyA, 2, 8); http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java index 6e66f76..165179d 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java @@ -24,6 +24,7 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -50,7 +51,7 @@ public class TestYieldProcedures { private static final Procedure NULL_PROC = null; private ProcedureExecutor<TestProcEnv> procExecutor; - private TestRunQueue procRunnables; + private TestScheduler procRunnables; private ProcedureStore procStore; private HBaseCommonTestingUtility htu; @@ -67,7 +68,7 @@ public class TestYieldProcedures { logDir = new Path(testDir, "proc-logs"); procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); - procRunnables = new TestRunQueue(); + procRunnables = new TestScheduler(); procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), procStore, procRunnables); procStore.start(PROCEDURE_EXECUTOR_SLOTS); @@ -343,41 +344,47 @@ public class TestYieldProcedures { } } - private static class TestRunQueue extends ProcedureSimpleRunQueue { + private static class TestScheduler extends SimpleProcedureScheduler { private int completionCalls; private int addFrontCalls; private int addBackCalls; private int yieldCalls; private int pollCalls; - public TestRunQueue() {} + public TestScheduler() {} public void addFront(final Procedure proc) { - addFrontCalls++; - super.addFront(proc); - } + addFrontCalls++; + super.addFront(proc); + } - @Override - public void addBack(final Procedure proc) { - addBackCalls++; - super.addBack(proc); - } + @Override + public void addBack(final Procedure proc) { + addBackCalls++; + super.addBack(proc); + } - @Override - public void yield(final Procedure proc) { - yieldCalls++; - super.yield(proc); - } + @Override + public void yield(final Procedure proc) { + yieldCalls++; + super.yield(proc); + } - @Override - public Procedure poll() { - pollCalls++; - return super.poll(); - } + @Override + public Procedure poll() { + pollCalls++; + return super.poll(); + } - @Override - public void completionCleanup(Procedure proc) { - completionCalls++; - } + @Override + public Procedure poll(long timeout, TimeUnit unit) { + pollCalls++; + return super.poll(timeout, unit); + } + + @Override + public void completionCleanup(Procedure proc) { + completionCalls++; + } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index b23ce43..af5d03d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -108,7 +108,6 @@ import org.apache.hadoop.hbase.master.procedure.DispatchMergingRegionsProcedure; import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure; import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent; import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure; import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; @@ -120,6 +119,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost; import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.quotas.MasterQuotaManager; http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index e90813c..183b41d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -31,8 +31,8 @@ import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.security.User; @@ -121,10 +121,15 @@ public class MasterProcedureEnv { return master.getMasterCoprocessorHost(); } + @Deprecated public MasterProcedureScheduler getProcedureQueue() { return procSched; } + public MasterProcedureScheduler getProcedureScheduler() { + return procSched; + } + public boolean isRunning() { return master.getMasterProcedureExecutor().isRunning(); }