HBASE-16813 Procedure v2 - Move ProcedureEvent to hbase-procedure module

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/92ef2344
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/92ef2344
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/92ef2344

Branch: refs/heads/master
Commit: 92ef234486537b4325641ce47f6fde26d9432710
Parents: dfb2a80
Author: Matteo Bertozzi <matteo.berto...@cloudera.com>
Authored: Wed Oct 12 16:33:25 2016 -0700
Committer: Matteo Bertozzi <matteo.berto...@cloudera.com>
Committed: Wed Oct 12 16:33:25 2016 -0700

----------------------------------------------------------------------
 .../procedure2/AbstractProcedureScheduler.java  | 311 ++++++++++
 .../hadoop/hbase/procedure2/ProcedureEvent.java |  57 ++
 .../hbase/procedure2/ProcedureEventQueue.java   |  85 +++
 .../hbase/procedure2/ProcedureExecutor.java     |  63 +-
 .../hbase/procedure2/ProcedureRunnableSet.java  |  81 ---
 .../hbase/procedure2/ProcedureScheduler.java    | 137 +++++
 .../procedure2/ProcedureSimpleRunQueue.java     | 121 ----
 .../procedure2/SimpleProcedureScheduler.java    |  71 +++
 .../procedure2/ProcedureTestingUtility.java     |  38 +-
 .../hbase/procedure2/TestProcedureEvents.java   | 132 +++++
 .../TestProcedureSchedulerConcurrency.java      | 160 ++++++
 .../procedure2/TestProcedureSuspended.java      |   6 +-
 .../hbase/procedure2/TestYieldProcedures.java   |  59 +-
 .../org/apache/hadoop/hbase/master/HMaster.java |   2 +-
 .../master/procedure/MasterProcedureEnv.java    |   7 +-
 .../procedure/MasterProcedureScheduler.java     | 569 +++----------------
 .../procedure/TestMasterProcedureEvents.java    |  92 +--
 .../procedure/TestMasterProcedureScheduler.java |  36 +-
 ...TestMasterProcedureSchedulerConcurrency.java |  93 +--
 19 files changed, 1145 insertions(+), 975 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
new file mode 100644
index 0000000..c4ae877
--- /dev/null
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Base implementation of {@link ProcedureScheduler} that provides the locking,
+ * the blocking poll() and the procedure-event suspend/wake logic on top of a
+ * queue supplied by the subclass through enqueue(), dequeue(), clearQueue(),
+ * queueSize() and queueHasRunnables().
+ * All of those queue hooks are invoked with the sched lock held.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
+  private static final Log LOG = LogFactory.getLog(AbstractProcedureScheduler.class);
+
+  // Held while touching 'running', while updating the poll counters and
+  // around every call to the subclass queue hooks.
+  private final ReentrantLock schedLock = new ReentrantLock();
+  // Signalled when a thread blocked in poll() should re-check the queue.
+  private final Condition schedWaitCond = schedLock.newCondition();
+  private boolean running = false;
+
+  // TODO: metrics
+  private long pollCalls = 0;
+  private long nullPollCalls = 0;
+
+  /**
+   * Mark the scheduler as running.
+   * While running, poll() waits for a procedure instead of returning null
+   * when nothing is runnable.
+   */
+  @Override
+  public void start() {
+    schedLock();
+    try {
+      running = true;
+    } finally {
+      schedUnlock();
+    }
+  }
+
+  /**
+   * Mark the scheduler as not running and wake up every thread blocked in
+   * poll(); those threads return null if nothing is runnable.
+   */
+  @Override
+  public void stop() {
+    schedLock();
+    try {
+      running = false;
+      schedWaitCond.signalAll();
+    } finally {
+      schedUnlock();
+    }
+  }
+
+  /**
+   * Wake up every thread blocked in poll() so that it re-checks the queue
+   * and the running state.
+   */
+  @Override
+  public void signalAll() {
+    schedLock();
+    try {
+      schedWaitCond.signalAll();
+    } finally {
+      schedUnlock();
+    }
+  }
+
+  // ==========================================================================
+  //  Add related
+  // ==========================================================================
+  /**
+   * Add the procedure to the queue.
+   * NOTE: this method is called with the sched lock held.
+   * @param procedure the Procedure to add
+   * @param addFront true if the item should be added to the front of the queue
+   */
+  protected abstract void enqueue(Procedure procedure, boolean addFront);
+
+  /**
+   * Insert the procedure at the front of the queue and signal one poll() waiter.
+   * @param procedure the Procedure to add
+   */
+  @Override
+  public void addFront(final Procedure procedure) {
+    push(procedure, true, true);
+  }
+
+  /**
+   * Insert the procedure at the end of the queue and signal one poll() waiter.
+   * @param procedure the Procedure to add
+   */
+  @Override
+  public void addBack(final Procedure procedure) {
+    push(procedure, false, true);
+  }
+
+  /**
+   * Enqueue the procedure while holding the sched lock.
+   * @param procedure the Procedure to add
+   * @param addFront true if the item should be added to the front of the queue
+   * @param notify true to signal one thread waiting in poll(); callers that
+   *   enqueue several procedures in a row pass false and signal once at the end
+   */
+  protected void push(final Procedure procedure, final boolean addFront, final boolean notify) {
+    // Go through the schedLock()/schedUnlock() hooks, like every other method
+    // of this class, so that a subclass overriding them is never bypassed.
+    schedLock();
+    try {
+      enqueue(procedure, addFront);
+      if (notify) {
+        schedWaitCond.signal();
+      }
+    } finally {
+      schedUnlock();
+    }
+  }
+
+  // ==========================================================================
+  //  Poll related
+  // ==========================================================================
+  /**
+   * Fetch one Procedure from the queue
+   * NOTE: this method is called with the sched lock held.
+   * @return the Procedure to execute, or null if nothing is available.
+   */
+  protected abstract Procedure dequeue();
+
+  /**
+   * Fetch one Procedure, waiting without a time limit until one is runnable
+   * or the scheduler is not running.
+   * @return the Procedure to execute, or null if nothing is available.
+   */
+  @Override
+  public Procedure poll() {
+    return poll(-1);
+  }
+
+  /**
+   * Fetch one Procedure, waiting up to the specified time.
+   * The timeout is converted to nanoseconds and handed to {@link #poll(long)},
+   * so a negative timeout means "no time limit".
+   * @param timeout how long to wait, in units of 'unit'
+   * @param unit the unit of the timeout argument
+   * @return the Procedure to execute, or null if nothing is available.
+   */
+  @Override
+  public Procedure poll(long timeout, TimeUnit unit) {
+    return poll(unit.toNanos(timeout));
+  }
+
+  /**
+   * Fetch one Procedure, waiting up to the specified number of nanoseconds.
+   * @param nanos max time to wait in nanoseconds; a negative value waits with
+   *   no time limit, zero does not wait at all
+   * @return the Procedure to execute, or null if nothing is runnable and the
+   *   scheduler is not running, the wait time elapsed, or the thread was
+   *   interrupted (the interrupt flag is set again before returning).
+   */
+  public Procedure poll(long nanos) {
+    final boolean waitForever = (nanos < 0);
+    schedLock();
+    try {
+      while (!queueHasRunnables()) {
+        // NOTE: the two early "return null" below are not counted by the
+        // pollCalls/nullPollCalls counters.
+        if (!running) return null;
+        if (waitForever) {
+          schedWaitCond.await();
+        } else {
+          if (nanos <= 0) return null;
+          // awaitNanos() returns the remaining time, used for the next round
+          nanos = schedWaitCond.awaitNanos(nanos);
+        }
+      }
+
+      final Procedure pollResult = dequeue();
+      pollCalls++;
+      nullPollCalls += (pollResult == null) ? 1 : 0;
+      return pollResult;
+    } catch (InterruptedException e) {
+      // keep the interrupt visible to the caller
+      Thread.currentThread().interrupt();
+      nullPollCalls++;
+      return null;
+    } finally {
+      schedUnlock();
+    }
+  }
+
+  // ==========================================================================
+  //  Utils
+  // ==========================================================================
+  /**
+   * Removes all of the elements from the queue
+   * NOTE: this method is called with the sched lock held.
+   */
+  protected abstract void clearQueue();
+
+  /**
+   * Returns the number of elements in this queue.
+   * NOTE: this method is called with the sched lock held.
+   * @return the number of elements in this queue.
+   */
+  protected abstract int queueSize();
+
+  /**
+   * Returns true if there are procedures available to process.
+   * NOTE: this method is called with the sched lock held.
+   * @return true if there are procedures available to process, otherwise false.
+   */
+  protected abstract boolean queueHasRunnables();
+
+  /**
+   * Removes all of the elements from the queue, holding the sched lock.
+   */
+  @Override
+  public void clear() {
+    // NOTE: USED ONLY FOR TESTING
+    schedLock();
+    try {
+      clearQueue();
+    } finally {
+      schedUnlock();
+    }
+  }
+
+  /**
+   * @return the number of elements in the queue, read with the sched lock held.
+   */
+  @Override
+  public int size() {
+    schedLock();
+    try {
+      return queueSize();
+    } finally {
+      schedUnlock();
+    }
+  }
+
+  /**
+   * @return true if there are procedures available to process, checked with
+   *   the sched lock held.
+   */
+  @Override
+  public boolean hasRunnables() {
+    schedLock();
+    try {
+      return queueHasRunnables();
+    } finally {
+      schedUnlock();
+    }
+  }
+
+  // ==========================================================================
+  //  TODO: Metrics
+  // ==========================================================================
+  /**
+   * @return the number of poll() calls that reached dequeue().
+   *   The counter is read without taking the sched lock.
+   */
+  public long getPollCalls() {
+    return pollCalls;
+  }
+
+  /**
+   * @return the number of poll() calls where dequeue() returned null or the
+   *   wait was interrupted. The counter is read without taking the sched lock.
+   */
+  public long getNullPollCalls() {
+    return nullPollCalls;
+  }
+
+  // ==========================================================================
+  //  Procedure Events
+  // ==========================================================================
+  /**
+   * Suspend the procedure on the event, unless the event is already ready.
+   * @param event the event to wait on
+   * @param procedure the procedure that wants to wait for the event
+   * @return true if the procedure was suspended and queued on the event,
+   *   false if the event is ready and the procedure was left untouched.
+   */
+  @Override
+  public boolean waitEvent(final ProcedureEvent event, final Procedure procedure) {
+    // check-and-suspend must be atomic with respect to wakeEvents()
+    synchronized (event) {
+      if (event.isReady()) {
+        return false;
+      }
+      suspendProcedure(event, procedure);
+      return true;
+    }
+  }
+
+  /**
+   * Mark the event as not ready. Procedures already waiting on it are not touched.
+   * @param event the event to suspend
+   */
+  @Override
+  public void suspendEvent(final ProcedureEvent event) {
+    final boolean isTraceEnabled = LOG.isTraceEnabled();
+    synchronized (event) {
+      event.setReady(false);
+      if (isTraceEnabled) {
+        LOG.trace("Suspend event " + event);
+      }
+    }
+  }
+
+  /**
+   * Mark the event as ready and put the procedures waiting on it back in the queue.
+   * @param event the event to wake
+   */
+  @Override
+  public void wakeEvent(final ProcedureEvent event) {
+    wakeEvents(1, event);
+  }
+
+  /**
+   * Mark the first 'count' events as ready, put the procedures waiting on them
+   * back in the queue and then signal the threads blocked in poll().
+   * @param count number of entries of 'events' to process, starting at index 0
+   * @param events the events to wake
+   */
+  @Override
+  public void wakeEvents(final int count, final ProcedureEvent... events) {
+    final boolean isTraceEnabled = LOG.isTraceEnabled();
+    // lock order: sched lock first, then the event monitor
+    schedLock();
+    try {
+      int waitingCount = 0;
+      for (int i = 0; i < count; ++i) {
+        final ProcedureEvent event = events[i];
+        synchronized (event) {
+          event.setReady(true);
+          if (isTraceEnabled) {
+            LOG.trace("Wake event " + event);
+          }
+          waitingCount += popEventWaitingObjects(event);
+        }
+      }
+      // the procedures were pushed without notify; signal the pollers once
+      wakePollIfNeeded(waitingCount);
+    } finally {
+      schedUnlock();
+    }
+  }
+
+  /**
+   * Move everything that is waiting on the event back to the queue.
+   * This implementation only delegates to popEventWaitingProcedures().
+   * @param event the event that was woken up
+   * @return the number of procedures moved back to the queue
+   */
+  protected int popEventWaitingObjects(final ProcedureEvent event) {
+    return popEventWaitingProcedures(event);
+  }
+
+  /**
+   * Drain the procedures waiting on the event, resuming each one and pushing
+   * it to the front of the queue. The procedures are popped from the tail of
+   * the event queue (popFront = false).
+   * @param event the queue of suspended procedures to drain
+   * @return the number of procedures moved back to the queue
+   */
+  protected int popEventWaitingProcedures(final ProcedureEventQueue event) {
+    int count = 0;
+    while (event.hasWaitingProcedures()) {
+      wakeProcedure(event.popWaitingProcedure(false));
+      count++;
+    }
+    return count;
+  }
+
+  /**
+   * Suspend the procedure and append it to the procedures waiting on the event.
+   * @param event the queue the procedure will wait on
+   * @param procedure the procedure to suspend
+   */
+  protected void suspendProcedure(final ProcedureEventQueue event, final Procedure procedure) {
+    procedure.suspend();
+    event.suspendProcedure(procedure);
+  }
+
+  /**
+   * Resume the procedure and push it to the front of the queue.
+   * No poll() waiter is signalled here; wakeEvents() signals once after all
+   * the procedures have been pushed.
+   * @param procedure the procedure to resume
+   */
+  protected void wakeProcedure(final Procedure procedure) {
+    procedure.resume();
+    push(procedure, /* addFront= */ true, /* notify= */ false);
+  }
+
+  // ==========================================================================
+  //  Internal helpers
+  // ==========================================================================
+  /**
+   * Acquire the sched lock.
+   */
+  protected void schedLock() {
+    schedLock.lock();
+  }
+
+  /**
+   * Release the sched lock.
+   */
+  protected void schedUnlock() {
+    schedLock.unlock();
+  }
+
+  /**
+   * Signal the threads blocked in poll(): all of them if more than one
+   * procedure was made available, a single one if exactly one was.
+   * NOTE: must be called with the sched lock held, as required to signal
+   * a Condition created from a ReentrantLock.
+   * @param waitingCount number of procedures that were put back in the queue
+   */
+  protected void wakePollIfNeeded(final int waitingCount) {
+    if (waitingCount > 1) {
+      schedWaitCond.signalAll();
+    } else if (waitingCount > 0) {
+      schedWaitCond.signal();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java
new file mode 100644
index 0000000..6335832
--- /dev/null
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Event that procedures can be suspended on.
+ * It carries an "object" (a description of, or a reference to, the resource
+ * being waited on), a ready flag, and the queue of suspended procedures
+ * inherited from {@link ProcedureEventQueue}.
+ * @param <T> type of the object attached to the event
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ProcedureEvent<T> extends ProcedureEventQueue {
+  // starts as "not ready"; read and written only through the synchronized accessors
+  private boolean ready = false;
+
+  private final T object;
+
+  /**
+   * @param object the description of, or reference to, the resource to wait on
+   */
+  public ProcedureEvent(final T object) {
+    this.object = object;
+  }
+
+  /**
+   * @return the object given at construction time
+   */
+  public T getObject() {
+    return this.object;
+  }
+
+  /**
+   * @return the current value of the ready flag
+   */
+  public synchronized boolean isReady() {
+    return this.ready;
+  }
+
+  /**
+   * Update the ready flag.
+   * @param isReady the new value of the ready flag
+   */
+  @InterfaceAudience.Private
+  protected synchronized void setReady(final boolean isReady) {
+    ready = isReady;
+  }
+
+  /**
+   * @return the simple class name followed by the attached object in parentheses
+   */
+  @Override
+  public String toString() {
+    final String name = getClass().getSimpleName();
+    return name + "(" + object + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEventQueue.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEventQueue.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEventQueue.java
new file mode 100644
index 0000000..a109e9e
--- /dev/null
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEventQueue.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.ArrayDeque;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Basic queue to store suspended procedures.
+ * The backing deque is allocated on the first suspendProcedure() and released
+ * as soon as it becomes empty. Every method synchronizes on this instance.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ProcedureEventQueue {
+  private static final Log LOG = LogFactory.getLog(ProcedureEventQueue.class);
+
+  // null when no procedure is waiting; never kept allocated while empty
+  private ArrayDeque<Procedure> waitingProcedures = null;
+
+  public ProcedureEventQueue() {
+  }
+
+  /**
+   * Append the procedure to the tail of the waiting queue.
+   * @param proc the procedure to add to the waiting queue
+   */
+  @InterfaceAudience.Private
+  public synchronized void suspendProcedure(final Procedure proc) {
+    if (waitingProcedures == null) {
+      waitingProcedures = new ArrayDeque<Procedure>();
+    }
+    waitingProcedures.addLast(proc);
+  }
+
+  /**
+   * Remove the procedure from the waiting queue, if present.
+   * @param proc the procedure to remove from the waiting queue
+   */
+  @InterfaceAudience.Private
+  public synchronized void removeProcedure(final Procedure proc) {
+    if (waitingProcedures != null) {
+      waitingProcedures.remove(proc);
+      // Release the deque once the last waiter is gone, as popWaitingProcedure()
+      // does. An allocated-but-empty deque would make a
+      // "while (hasWaitingProcedures()) popWaitingProcedure(...)" loop
+      // fail with NoSuchElementException.
+      if (waitingProcedures.isEmpty()) {
+        waitingProcedures = null;
+      }
+    }
+  }
+
+  /**
+   * @return true if at least one procedure is waiting in the queue
+   */
+  @InterfaceAudience.Private
+  public synchronized boolean hasWaitingProcedures() {
+    return waitingProcedures != null && !waitingProcedures.isEmpty();
+  }
+
+  /**
+   * Remove and return one waiting procedure.
+   * Must only be called when hasWaitingProcedures() is true; with nothing
+   * waiting the backing deque is null and the call fails with a
+   * NullPointerException.
+   * @param popFront true to take the procedure from the head of the queue,
+   *   false to take it from the tail
+   * @return the procedure removed from the queue
+   */
+  @InterfaceAudience.Private
+  public synchronized Procedure popWaitingProcedure(final boolean popFront) {
+    // it will be nice to use IterableList on a procedure and avoid allocations...
+    Procedure proc = popFront ? waitingProcedures.removeFirst() : waitingProcedures.removeLast();
+    if (waitingProcedures.isEmpty()) {
+      waitingProcedures = null;
+    }
+    return proc;
+  }
+
+  /**
+   * Drop every waiting procedure.
+   */
+  @VisibleForTesting
+  public synchronized void clear() {
+    waitingProcedures = null;
+  }
+
+  /**
+   * @return the number of procedures waiting in the queue
+   */
+  @VisibleForTesting
+  public synchronized int size() {
+    if (waitingProcedures != null) {
+      return waitingProcedures.size();
+    }
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 2eeef9e..2e9e3a3 100644
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -243,9 +243,9 @@ public class ProcedureExecutor<TEnvironment> {
     new TimeoutBlockingQueue<Procedure>(new ProcedureTimeoutRetriever());
 
   /**
-   * Queue that contains runnable procedures.
+   * Scheduler/Queue that contains runnable procedures.
    */
-  private final ProcedureRunnableSet runnables;
+  private final ProcedureScheduler scheduler;
 
   // TODO
   private final ReentrantLock submitLock = new ReentrantLock();
@@ -267,13 +267,13 @@ public class ProcedureExecutor<TEnvironment> {
 
   public ProcedureExecutor(final Configuration conf, final TEnvironment 
environment,
       final ProcedureStore store) {
-    this(conf, environment, store, new ProcedureSimpleRunQueue());
+    this(conf, environment, store, new SimpleProcedureScheduler());
   }
 
   public ProcedureExecutor(final Configuration conf, final TEnvironment 
environment,
-      final ProcedureStore store, final ProcedureRunnableSet runqueue) {
+      final ProcedureStore store, final ProcedureScheduler scheduler) {
     this.environment = environment;
-    this.runnables = runqueue;
+    this.scheduler = scheduler;
     this.store = store;
     this.conf = conf;
     this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, true);
@@ -284,7 +284,7 @@ public class ProcedureExecutor<TEnvironment> {
     Preconditions.checkArgument(rollbackStack.isEmpty());
     Preconditions.checkArgument(procedures.isEmpty());
     Preconditions.checkArgument(waitingTimeout.isEmpty());
-    Preconditions.checkArgument(runnables.size() == 0);
+    Preconditions.checkArgument(scheduler.size() == 0);
 
     store.load(new ProcedureStore.ProcedureLoader() {
       @Override
@@ -378,7 +378,7 @@ public class ProcedureExecutor<TEnvironment> {
       Long rootProcId = getRootProcedureId(proc);
       if (rootProcId == null) {
         // The 'proc' was ready to run but the root procedure was rolledback?
-        runnables.addBack(proc);
+        scheduler.addBack(proc);
         continue;
       }
 
@@ -410,8 +410,8 @@ public class ProcedureExecutor<TEnvironment> {
           break;
         case FINISHED:
           if (proc.hasException()) {
-            // add the proc to the runnables to perform the rollback
-            runnables.addBack(proc);
+            // add the proc to the scheduler to perform the rollback
+            scheduler.addBack(proc);
           }
           break;
         case ROLLEDBACK:
@@ -446,7 +446,7 @@ public class ProcedureExecutor<TEnvironment> {
       throw new IOException("found " + corruptedCount + " procedures on 
replay");
     }
 
-    // 4. Push the runnables
+    // 4. Push the scheduler
     if (!runnableList.isEmpty()) {
       // TODO: See ProcedureWALFormatReader#hasFastStartSupport
       // some procedure may be started way before this stuff.
@@ -457,10 +457,10 @@ public class ProcedureExecutor<TEnvironment> {
           sendProcedureLoadedNotification(proc.getProcId());
         }
         if (proc.wasExecuted()) {
-          runnables.addFront(proc);
+          scheduler.addFront(proc);
         } else {
           // if it was not in execution, it can wait.
-          runnables.addBack(proc);
+          scheduler.addBack(proc);
         }
       }
     }
@@ -514,6 +514,9 @@ public class ProcedureExecutor<TEnvironment> {
     LOG.info(String.format("recover procedure store (%s) lease: %s",
       store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st)));
 
+    // start the procedure scheduler
+    scheduler.start();
+
     // TODO: Split in two steps.
     // TODO: Handle corrupted procedures (currently just a warn)
     // The first one will make sure that we have the latest id,
@@ -540,7 +543,7 @@ public class ProcedureExecutor<TEnvironment> {
     }
 
     LOG.info("Stopping the procedure executor");
-    runnables.signalAll();
+    scheduler.stop();
     waitingTimeout.signalAll();
   }
 
@@ -564,7 +567,7 @@ public class ProcedureExecutor<TEnvironment> {
     procedures.clear();
     nonceKeysToProcIdsMap.clear();
     waitingTimeout.clear();
-    runnables.clear();
+    scheduler.clear();
     lastProcId.set(-1);
   }
 
@@ -698,7 +701,7 @@ public class ProcedureExecutor<TEnvironment> {
     assert !procedures.containsKey(currentProcId);
     procedures.put(currentProcId, proc);
     sendProcedureAddedNotification(currentProcId);
-    runnables.addBack(proc);
+    scheduler.addBack(proc);
     return currentProcId;
   }
 
@@ -810,18 +813,18 @@ public class ProcedureExecutor<TEnvironment> {
     return procedures.get(procId);
   }
 
-  protected ProcedureRunnableSet getRunnableSet() {
-    return runnables;
+  protected ProcedureScheduler getScheduler() {
+    return scheduler;
   }
 
   /**
    * Execution loop (N threads)
    * while the executor is in a running state,
-   * fetch a procedure from the runnables queue and start the execution.
+   * fetch a procedure from the scheduler queue and start the execution.
    */
   private void execLoop() {
     while (isRunning()) {
-      Procedure proc = runnables.poll();
+      Procedure proc = scheduler.poll();
       if (proc == null) continue;
 
       try {
@@ -855,7 +858,7 @@ public class ProcedureExecutor<TEnvironment> {
           // we have the 'rollback-lock' we can start rollingback
           if (!executeRollback(rootProcId, procStack)) {
             procStack.unsetRollback();
-            runnables.yield(proc);
+            scheduler.yield(proc);
           }
         } else {
           // if we can't rollback means that some child is still running.
@@ -863,7 +866,7 @@ public class ProcedureExecutor<TEnvironment> {
           // If the procedure was never executed, remove and mark it as 
rolledback.
           if (!proc.wasExecuted()) {
             if (!executeRollback(proc)) {
-              runnables.yield(proc);
+              scheduler.yield(proc);
             }
           }
         }
@@ -876,7 +879,7 @@ public class ProcedureExecutor<TEnvironment> {
         execProcedure(procStack, proc);
         releaseLock(proc, false);
       } else {
-        runnables.yield(proc);
+        scheduler.yield(proc);
       }
       procStack.release(proc);
 
@@ -965,7 +968,7 @@ public class ProcedureExecutor<TEnvironment> {
         RootProcedureState procStack = rollbackStack.get(rootProcId);
         procStack.abort();
         store.update(proc);
-        runnables.addFront(proc);
+        scheduler.addFront(proc);
         continue;
       } else if (proc.getState() == ProcedureState.WAITING_TIMEOUT) {
         waitingTimeout.add(proc);
@@ -1124,11 +1127,11 @@ public class ProcedureExecutor<TEnvironment> {
         if (LOG.isTraceEnabled()) {
           LOG.trace("Yield procedure: " + procedure + ": " + e.getMessage());
         }
-        runnables.yield(procedure);
+        scheduler.yield(procedure);
         return;
       } catch (InterruptedException e) {
         handleInterruptedException(procedure, e);
-        runnables.yield(procedure);
+        scheduler.yield(procedure);
         return;
       } catch (Throwable e) {
         // Catch NullPointerExceptions or similar errors...
@@ -1205,7 +1208,7 @@ public class ProcedureExecutor<TEnvironment> {
       // if the procedure is kind enough to pass the slot to someone else, 
yield
       if (procedure.getState() == ProcedureState.RUNNABLE &&
           procedure.isYieldAfterExecutionStep(getEnvironment())) {
-        runnables.yield(procedure);
+        scheduler.yield(procedure);
         return;
       }
 
@@ -1218,7 +1221,7 @@ public class ProcedureExecutor<TEnvironment> {
         Procedure subproc = subprocs[i];
         assert !procedures.containsKey(subproc.getProcId());
         procedures.put(subproc.getProcId(), subproc);
-        runnables.addFront(subproc);
+        scheduler.addFront(subproc);
       }
     }
 
@@ -1236,7 +1239,7 @@ public class ProcedureExecutor<TEnvironment> {
       if (parent.childrenCountDown() && parent.getState() == 
ProcedureState.WAITING) {
         parent.setState(ProcedureState.RUNNABLE);
         store.update(parent);
-        runnables.addFront(parent);
+        scheduler.addFront(parent);
         if (LOG.isTraceEnabled()) {
           LOG.trace(parent + " all the children finished their work, resume.");
         }
@@ -1374,10 +1377,10 @@ public class ProcedureExecutor<TEnvironment> {
 
     // call the runnableSet completion cleanup handler
     try {
-      runnables.completionCleanup(proc);
+      scheduler.completionCleanup(proc);
     } catch (Throwable e) {
       // Catch NullPointerExceptions or similar errors...
-      LOG.error("CODE-BUG: uncatched runtime exception for runnableSet: " + 
runnables, e);
+      LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: 
" + proc, e);
     }
 
     // Notify the listeners

http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java
deleted file mode 100644
index 64c41ee..0000000
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.procedure2;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-
-/**
- * Keep track of the runnable procedures
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public interface ProcedureRunnableSet {
-  /**
-   * Inserts the specified element at the front of this queue.
-   * @param proc the Procedure to add
-   */
-  void addFront(Procedure proc);
-
-  /**
-   * Inserts the specified element at the end of this queue.
-   * @param proc the Procedure to add
-   */
-  void addBack(Procedure proc);
-
-  /**
-   * The procedure can't run at the moment.
-   * add it back to the queue, giving priority to someone else.
-   * @param proc the Procedure to add back to the list
-   */
-  void yield(Procedure proc);
-
-  /**
-   * The procedure in execution completed.
-   * This can be implemented to perform cleanups.
-   * @param proc the Procedure that completed the execution.
-   */
-  void completionCleanup(Procedure proc);
-
-  /**
-   * Fetch one Procedure from the queue
-   * @return the Procedure to execute, or null if nothing present.
-   */
-  Procedure poll();
-
-  /**
-   * In case the class is blocking on poll() waiting for items to be added,
-   * this method should awake poll() and poll() should return.
-   */
-  void signalAll();
-
-  /**
-   * Returns the number of elements in this collection.
-   * @return the number of elements in this collection.
-   */
-  @VisibleForTesting
-  int size();
-
-  /**
-   * Removes all of the elements from this collection.
-   */
-  void clear();
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
new file mode 100644
index 0000000..1793158
--- /dev/null
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Keep track of the runnable procedures
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface ProcedureScheduler {
+  /**
+   * Start the scheduler
+   */
+  void start();
+
+  /**
+   * Stop the scheduler
+   */
+  void stop();
+
+  /**
+   * In case the class is blocking on poll() waiting for items to be added,
+   * this method should awake poll() and poll() should return.
+   */
+  void signalAll();
+
+  /**
+   * Inserts the specified element at the front of this queue.
+   * @param proc the Procedure to add
+   */
+  void addFront(Procedure proc);
+
+  /**
+   * Inserts the specified element at the end of this queue.
+   * @param proc the Procedure to add
+   */
+  void addBack(Procedure proc);
+
+  /**
+   * The procedure can't run at the moment.
+   * Add it back to the queue, giving priority to someone else.
+   * @param proc the Procedure to add back to the list
+   */
+  void yield(Procedure proc);
+
+  /**
+   * The procedure in execution completed.
+   * This can be implemented to perform cleanups.
+   * @param proc the Procedure that completed the execution.
+   */
+  void completionCleanup(Procedure proc);
+
+  /**
+   * @return true if there are procedures available to process, otherwise false.
+   */
+  boolean hasRunnables();
+
+  /**
+   * Fetch one Procedure from the queue
+   * @return the Procedure to execute, or null if nothing present.
+   */
+  Procedure poll();
+
+  /**
+   * Fetch one Procedure from the queue
+   * @param timeout how long to wait before giving up, in units of unit
+   * @param unit a TimeUnit determining how to interpret the timeout parameter
+   * @return the Procedure to execute, or null if nothing present.
+   */
+  Procedure poll(long timeout, TimeUnit unit);
+
+  /**
+   * Mark the event as not ready.
+   * Procedures calling waitEvent() will be suspended.
+   * @param event the event to mark as suspended/not ready
+   */
+  void suspendEvent(ProcedureEvent event);
+
+  /**
+   * Wake every procedure waiting for the specified event
+   * (By design each event has only one "wake" caller)
+   * @param event the event to wake
+   */
+  void wakeEvent(ProcedureEvent event);
+
+  /**
+   * Wake every procedure waiting for the specified events.
+   * (By design each event has only one "wake" caller)
+   * @param count the number of events in the array to wake
+   * @param events the list of events to wake
+   */
+  void wakeEvents(int count, ProcedureEvent... events);
+
+  /**
+   * Suspend the procedure if the event is not ready yet.
+   * @param event the event to wait on
+   * @param procedure the procedure waiting on the event
+   * @return true if the procedure has to wait for the event to be ready, false otherwise.
+   */
+  boolean waitEvent(ProcedureEvent event, Procedure procedure);
+
+  /**
+   * Returns the number of elements in this queue.
+   * @return the number of elements in this queue.
+   */
+  @VisibleForTesting
+  int size();
+
+  /**
+   * Removes all of the elements from the queue
+   */
+  @VisibleForTesting
+  void clear();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java
deleted file mode 100644
index d23680d..0000000
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.procedure2;
-
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-
-/**
- * Simple runqueue for the procedures
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class ProcedureSimpleRunQueue implements ProcedureRunnableSet {
-  private final Deque<Procedure> runnables = new ArrayDeque<Procedure>();
-  private final ReentrantLock lock = new ReentrantLock();
-  private final Condition waitCond = lock.newCondition();
-
-  @Override
-  public void addFront(final Procedure proc) {
-    lock.lock();
-    try {
-      runnables.addFirst(proc);
-      waitCond.signal();
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public void addBack(final Procedure proc) {
-    lock.lock();
-    try {
-      runnables.addLast(proc);
-      waitCond.signal();
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public void yield(final Procedure proc) {
-    addBack(proc);
-  }
-
-  @Override
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
-  public Procedure poll() {
-    lock.lock();
-    try {
-      if (runnables.isEmpty()) {
-        waitCond.await();
-        if (!runnables.isEmpty()) {
-          return runnables.pop();
-        }
-      } else {
-        return runnables.pop();
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      return null;
-    } finally {
-      lock.unlock();
-    }
-    return null;
-  }
-
-  @Override
-  public void signalAll() {
-    lock.lock();
-    try {
-      waitCond.signalAll();
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public void clear() {
-    lock.lock();
-    try {
-      runnables.clear();
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public int size() {
-    lock.lock();
-    try {
-      return runnables.size();
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public void completionCleanup(Procedure proc) {
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java
new file mode 100644
index 0000000..ffc8273
--- /dev/null
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.util.ArrayDeque;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Simple scheduler for procedures
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class SimpleProcedureScheduler extends AbstractProcedureScheduler {
+  private final ArrayDeque<Procedure> runnables = new ArrayDeque<Procedure>();
+
+  @Override
+  protected void enqueue(final Procedure procedure, final boolean addFront) {
+    if (addFront) {
+      runnables.addFirst(procedure);
+    } else {
+      runnables.addLast(procedure);
+    }
+  }
+
+  @Override
+  protected Procedure dequeue() {
+    return runnables.poll();
+  }
+
+  @Override
+  protected void clearQueue() {
+    runnables.clear();
+  }
+
+  @Override
+  public void yield(final Procedure proc) {
+    addBack(proc);
+  }
+
+  @Override
+  public boolean queueHasRunnables() {
+    return runnables.size() > 0;
+  }
+
+  @Override
+  public int queueSize() {
+    return runnables.size();
+  }
+
+  @Override
+  public void completionCleanup(Procedure proc) {
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index 0b85ff8..f2c7e6b 100644
--- 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -181,7 +181,7 @@ public class ProcedureTestingUtility {
   public static <TEnv> void waitNoProcedureRunning(ProcedureExecutor<TEnv> 
procExecutor) {
     int stableRuns = 0;
     while (stableRuns < 10) {
-      if (procExecutor.getActiveExecutorCount() > 0 || 
procExecutor.getRunnableSet().size() > 0) {
+      if (procExecutor.getActiveExecutorCount() > 0 || 
procExecutor.getScheduler().size() > 0) {
         stableRuns = 0;
         Threads.sleepWithoutInterrupt(100);
       } else {
@@ -236,7 +236,32 @@ public class ProcedureTestingUtility {
     return cause == null ? procInfo.getException() : cause;
   }
 
-  public static class TestProcedure extends Procedure<Void> {
+  public static class NoopProcedure<TEnv> extends Procedure<TEnv> {
+    public NoopProcedure() {}
+
+    @Override
+    protected Procedure[] execute(TEnv env)
+        throws ProcedureYieldException, ProcedureSuspendedException, 
InterruptedException {
+      return null;
+    }
+
+    @Override
+    protected void rollback(TEnv env) throws IOException, InterruptedException 
{
+    }
+
+    @Override
+    protected boolean abort(TEnv env) { return false; }
+
+    @Override
+    protected void serializeStateData(final OutputStream stream) throws 
IOException {
+    }
+
+    @Override
+    protected void deserializeStateData(final InputStream stream) throws 
IOException {
+    }
+  }
+
+  public static class TestProcedure extends NoopProcedure<Void> {
     private byte[] data = null;
 
     public TestProcedure() {}
@@ -270,15 +295,6 @@ public class ProcedureTestingUtility {
     }
 
     @Override
-    protected Procedure[] execute(Void env) { return null; }
-
-    @Override
-    protected void rollback(Void env) { }
-
-    @Override
-    protected boolean abort(Void env) { return false; }
-
-    @Override
     protected void serializeStateData(final OutputStream stream) throws 
IOException {
       StreamUtils.writeRawVInt32(stream, data != null ? data.length : 0);
       if (data != null) stream.write(data);

http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
new file mode 100644
index 0000000..c431646
--- /dev/null
+++ 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import 
org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
+import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestProcedureEvents {
+  private static final Log LOG = LogFactory.getLog(TestProcedureEvents.class);
+
+  private TestProcEnv procEnv;
+  private NoopProcedureStore procStore;
+  private ProcedureExecutor<TestProcEnv> procExecutor;
+
+  private HBaseCommonTestingUtility htu;
+
+  @Before
+  public void setUp() throws IOException {
+    htu = new HBaseCommonTestingUtility();
+
+    procEnv = new TestProcEnv();
+    procStore = new NoopProcedureStore();
+    procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, 
procStore);
+    procStore.start(1);
+    procExecutor.start(1, true);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    procExecutor.stop();
+    procStore.stop(false);
+    procExecutor.join();
+  }
+
+  @Test(timeout=30000)
+  public void testTimeoutEventProcedure() throws Exception {
+    final int NTIMEOUTS = 5;
+
+    TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(1000, 
NTIMEOUTS);
+    procExecutor.submitProcedure(proc);
+
+    ProcedureTestingUtility.waitProcedure(procExecutor, proc.getProcId());
+    
ProcedureTestingUtility.assertIsAbortException(procExecutor.getResult(proc.getProcId()));
+    assertEquals(NTIMEOUTS + 1, proc.getTimeoutsCount());
+  }
+
+  public static class TestTimeoutEventProcedure extends 
NoopProcedure<TestProcEnv> {
+    private final ProcedureEvent event = new ProcedureEvent("timeout-event");
+
+    private final AtomicInteger ntimeouts = new AtomicInteger(0);
+    private int maxTimeouts = 1;
+
+    public TestTimeoutEventProcedure() {}
+
+    public TestTimeoutEventProcedure(final int timeoutMsec, final int 
maxTimeouts) {
+      this.maxTimeouts = maxTimeouts;
+      setTimeout(timeoutMsec);
+    }
+
+    public int getTimeoutsCount() {
+      return ntimeouts.get();
+    }
+
+    @Override
+    protected Procedure[] execute(final TestProcEnv env) throws 
ProcedureSuspendedException {
+      LOG.info("EXECUTE " + this + " ntimeouts=" + ntimeouts);
+      if (ntimeouts.get() > maxTimeouts) {
+        setAbortFailure("test", "give up after " + ntimeouts.get());
+        return null;
+      }
+
+      env.getProcedureScheduler().suspendEvent(event);
+      if (env.getProcedureScheduler().waitEvent(event, this)) {
+        setState(ProcedureState.WAITING_TIMEOUT);
+        throw new ProcedureSuspendedException();
+      }
+
+      return null;
+    }
+
+    @Override
+    protected boolean setTimeoutFailure(final TestProcEnv env) {
+      int n = ntimeouts.incrementAndGet();
+      LOG.info("HANDLE TIMEOUT " + this + " ntimeouts=" + n);
+      setState(ProcedureState.RUNNABLE);
+      env.getProcedureScheduler().wakeEvent(event);
+      return false;
+    }
+  }
+
+  private class TestProcEnv {
+    public ProcedureScheduler getProcedureScheduler() {
+      return procExecutor.getScheduler();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSchedulerConcurrency.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSchedulerConcurrency.java
 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSchedulerConcurrency.java
new file mode 100644
index 0000000..b8cd8ff
--- /dev/null
+++ 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSchedulerConcurrency.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import 
org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.util.Threads;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@Category({MasterTests.class, MediumTests.class})
+public class TestProcedureSchedulerConcurrency {
+  private static final Log LOG = LogFactory.getLog(TestProcedureSchedulerConcurrency.class);
+
+  private SimpleProcedureScheduler procSched;
+
+  @Before
+  public void setUp() throws IOException {
+    procSched = new SimpleProcedureScheduler();
+    procSched.start();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    procSched.stop();
+  }
+
+  @Test(timeout=60000)
+  public void testConcurrentWaitWake() throws Exception {
+    testConcurrentWaitWake(false);
+  }
+
+  @Test(timeout=60000)
+  public void testConcurrentWaitWakeBatch() throws Exception {
+    testConcurrentWaitWake(true);
+  }
+
+  private void testConcurrentWaitWake(final boolean useWakeBatch) throws 
Exception {
+    final int WAIT_THRESHOLD = 2500;
+    final int NPROCS = 20;
+    final int NRUNS = 500;
+
+    final ProcedureScheduler sched = procSched;
+    for (long i = 0; i < NPROCS; ++i) {
+      sched.addBack(new TestProcedureWithEvent(i));
+    }
+
+    final Thread[] threads = new Thread[4];
+    final AtomicInteger waitCount = new AtomicInteger(0);
+    final AtomicInteger wakeCount = new AtomicInteger(0);
+
+    final ConcurrentSkipListSet<TestProcedureWithEvent> waitQueue =
+      new ConcurrentSkipListSet<TestProcedureWithEvent>();
+    threads[0] = new Thread() {
+      @Override
+      public void run() {
+        long lastUpdate = 0;
+        while (true) {
+          final int oldWakeCount = wakeCount.get();
+          if (useWakeBatch) {
+            ProcedureEvent[] ev = new ProcedureEvent[waitQueue.size()];
+            for (int i = 0; i < ev.length; ++i) {
+              ev[i] = waitQueue.pollFirst().getEvent();
+              LOG.debug("WAKE BATCH " + ev[i] + " total=" + wakeCount.get());
+            }
+            sched.wakeEvents(ev.length, ev);
+            wakeCount.addAndGet(ev.length);
+          } else {
+            int size = waitQueue.size();
+            while (size-- > 0) {
+              ProcedureEvent ev = waitQueue.pollFirst().getEvent();
+              sched.wakeEvent(ev);
+              LOG.debug("WAKE " + ev + " total=" + wakeCount.get());
+              wakeCount.incrementAndGet();
+            }
+          }
+          if (wakeCount.get() != oldWakeCount) {
+            lastUpdate = System.currentTimeMillis();
+          } else if (wakeCount.get() >= NRUNS &&
+              (System.currentTimeMillis() - lastUpdate) > WAIT_THRESHOLD) {
+            break;
+          }
+          Threads.sleepWithoutInterrupt(25);
+        }
+      }
+    };
+
+    for (int i = 1; i < threads.length; ++i) {
+      threads[i] = new Thread() {
+        @Override
+        public void run() {
+          while (true) {
+            TestProcedureWithEvent proc = (TestProcedureWithEvent)sched.poll();
+            if (proc == null) continue;
+
+            sched.suspendEvent(proc.getEvent());
+            waitQueue.add(proc);
+            sched.waitEvent(proc.getEvent(), proc);
+            LOG.debug("WAIT " + proc.getEvent());
+            if (waitCount.incrementAndGet() >= NRUNS) {
+              break;
+            }
+          }
+        }
+      };
+    }
+
+    for (int i = 0; i < threads.length; ++i) {
+      threads[i].start();
+    }
+    for (int i = 0; i < threads.length; ++i) {
+      threads[i].join();
+    }
+
+    sched.clear();
+  }
+
+  public static class TestProcedureWithEvent extends NoopProcedure<Void> {
+    private final ProcedureEvent event;
+
+    public TestProcedureWithEvent(long procId) {
+      setProcId(procId);
+      event = new ProcedureEvent("test-event procId=" + procId);
+    }
+
+    public ProcedureEvent getEvent() {
+      return event;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
index eb72939..9a108a8 100644
--- 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
+++ 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
@@ -92,7 +92,7 @@ public class TestProcedureSuspended {
 
     // release p3
     p3keyB.setThrowSuspend(false);
-    procExecutor.getRunnableSet().addFront(p3keyB);
+    procExecutor.getScheduler().addFront(p3keyB);
     waitAndAssertTimestamp(p1keyA, 1, 1);
     waitAndAssertTimestamp(p2keyA, 0, -1);
     waitAndAssertTimestamp(p3keyB, 2, 3);
@@ -104,7 +104,7 @@ public class TestProcedureSuspended {
 
     // rollback p2 and wait until is fully completed
     p1keyA.setTriggerRollback(true);
-    procExecutor.getRunnableSet().addFront(p1keyA);
+    procExecutor.getScheduler().addFront(p1keyA);
     ProcedureTestingUtility.waitProcedure(procExecutor, p1keyA);
 
     // p2 should start and suspend
@@ -115,7 +115,7 @@ public class TestProcedureSuspended {
 
     // wait until p2 is fully completed
     p2keyA.setThrowSuspend(false);
-    procExecutor.getRunnableSet().addFront(p2keyA);
+    procExecutor.getScheduler().addFront(p2keyA);
     ProcedureTestingUtility.waitProcedure(procExecutor, p2keyA);
     waitAndAssertTimestamp(p1keyA, 4, 60000);
     waitAndAssertTimestamp(p2keyA, 2, 8);

http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
index 6e66f76..165179d 100644
--- 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
+++ 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
@@ -24,6 +24,7 @@ import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -50,7 +51,7 @@ public class TestYieldProcedures {
   private static final Procedure NULL_PROC = null;
 
   private ProcedureExecutor<TestProcEnv> procExecutor;
-  private TestRunQueue procRunnables;
+  private TestScheduler procRunnables;
   private ProcedureStore procStore;
 
   private HBaseCommonTestingUtility htu;
@@ -67,7 +68,7 @@ public class TestYieldProcedures {
 
     logDir = new Path(testDir, "proc-logs");
     procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), 
fs, logDir);
-    procRunnables = new TestRunQueue();
+    procRunnables = new TestScheduler();
     procExecutor = new ProcedureExecutor(htu.getConfiguration(), new 
TestProcEnv(),
         procStore, procRunnables);
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);
@@ -343,41 +344,47 @@ public class TestYieldProcedures {
     }
   }
 
-  private static class TestRunQueue extends ProcedureSimpleRunQueue {
+  private static class TestScheduler extends SimpleProcedureScheduler {
     private int completionCalls;
     private int addFrontCalls;
     private int addBackCalls;
     private int yieldCalls;
     private int pollCalls;
 
-    public TestRunQueue() {}
+    public TestScheduler() {}
 
     public void addFront(final Procedure proc) {
-        addFrontCalls++;
-        super.addFront(proc);
-      }
+      addFrontCalls++;
+      super.addFront(proc);
+    }
 
-      @Override
-      public void addBack(final Procedure proc) {
-        addBackCalls++;
-        super.addBack(proc);
-      }
+    @Override
+    public void addBack(final Procedure proc) {
+      addBackCalls++;
+      super.addBack(proc);
+    }
 
-      @Override
-      public void yield(final Procedure proc) {
-        yieldCalls++;
-        super.yield(proc);
-      }
+    @Override
+    public void yield(final Procedure proc) {
+      yieldCalls++;
+      super.yield(proc);
+    }
 
-      @Override
-      public Procedure poll() {
-        pollCalls++;
-        return super.poll();
-      }
+    @Override
+    public Procedure poll() {
+      pollCalls++;
+      return super.poll();
+    }
 
-      @Override
-      public void completionCleanup(Procedure proc) {
-        completionCalls++;
-      }
+    @Override
+    public Procedure poll(long timeout, TimeUnit unit) {
+      pollCalls++;
+      return super.poll(timeout, unit);
+    }
+
+    @Override
+    public void completionCleanup(Procedure proc) {
+      completionCalls++;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index b23ce43..af5d03d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -108,7 +108,6 @@ import 
org.apache.hadoop.hbase.master.procedure.DispatchMergingRegionsProcedure;
 import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import 
org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
 import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure;
 import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
@@ -120,6 +119,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
 import 
org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
+import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;

http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
index e90813c..183b41d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
@@ -31,8 +31,8 @@ import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.MasterServices;
-import 
org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
 import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
 import org.apache.hadoop.hbase.security.User;
@@ -121,10 +121,15 @@ public class MasterProcedureEnv {
     return master.getMasterCoprocessorHost();
   }
 
+  @Deprecated
   public MasterProcedureScheduler getProcedureQueue() {
     return procSched;
   }
 
+  public MasterProcedureScheduler getProcedureScheduler() {
+    return procSched;
+  }
+
   public boolean isRunning() {
     return master.getMasterProcedureExecutor().isRunning();
   }

Reply via email to