Repository: hbase
Updated Branches:
  refs/heads/master dfb2a800c -> 92ef23448


http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
index aba82c1..cc9efd2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -24,8 +24,6 @@ import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -40,8 +38,9 @@ import 
org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.master.TableLockManager;
 import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
 import 
org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
+import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler;
 import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureRunnableSet;
+import org.apache.hadoop.hbase.procedure2.ProcedureEventQueue;
 import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator;
 import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
 import org.apache.hadoop.hbase.util.AvlUtil.AvlLinkedNode;
@@ -49,8 +48,8 @@ import org.apache.hadoop.hbase.util.AvlUtil.AvlTree;
 import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator;
 
 /**
- * ProcedureRunnableSet for the Master Procedures.
- * This RunnableSet tries to provide to the ProcedureExecutor procedures
+ * ProcedureScheduler for the Master Procedures.
+ * This ProcedureScheduler tries to provide to the ProcedureExecutor procedures
  * that can be executed without having to wait on a lock.
  * Most of the master operations can be executed concurrently, if they
  * are operating on different tables (e.g. two create table can be performed
@@ -65,12 +64,10 @@ import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class MasterProcedureScheduler implements ProcedureRunnableSet {
+public class MasterProcedureScheduler extends AbstractProcedureScheduler {
   private static final Log LOG = 
LogFactory.getLog(MasterProcedureScheduler.class);
 
   private final TableLockManager lockManager;
-  private final ReentrantLock schedLock = new ReentrantLock();
-  private final Condition schedWaitCond = schedLock.newCondition();
 
   private final static NamespaceQueueKeyComparator 
NAMESPACE_QUEUE_KEY_COMPARATOR =
       new NamespaceQueueKeyComparator();
@@ -90,10 +87,6 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
   private final int userTablePriority;
   private final int sysTablePriority;
 
-  // TODO: metrics
-  private long pollCalls = 0;
-  private long nullPollCalls = 0;
-
   public MasterProcedureScheduler(final Configuration conf, final 
TableLockManager lockManager) {
     this.lockManager = lockManager;
 
@@ -104,44 +97,23 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
   }
 
   @Override
-  public void addFront(Procedure proc) {
-    doAdd(proc, true);
-  }
-
-  @Override
-  public void addBack(Procedure proc) {
-    doAdd(proc, false);
-  }
-
-  @Override
   public void yield(final Procedure proc) {
-    doAdd(proc, isTableProcedure(proc));
-  }
-
-  private void doAdd(final Procedure proc, final boolean addFront) {
-    doAdd(proc, addFront, true);
+    push(proc, isTableProcedure(proc), true);
   }
 
-  private void doAdd(final Procedure proc, final boolean addFront, final 
boolean notify) {
-    schedLock.lock();
-    try {
-      if (isTableProcedure(proc)) {
-        doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, 
addFront);
-      } else if (isServerProcedure(proc)) {
-        doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, 
addFront);
-      } else {
-        // TODO: at the moment we only have Table and Server procedures
-        // if you are implementing a non-table/non-server procedure, you have 
two options: create
-        // a group for all the non-table/non-server procedures or try to find 
a key for your
-        // non-table/non-server procedures and implement something similar to 
the TableRunQueue.
-        throw new UnsupportedOperationException(
-          "RQs for non-table/non-server procedures are not implemented yet: " 
+ proc);
-      }
-      if (notify) {
-        schedWaitCond.signal();
-      }
-    } finally {
-      schedLock.unlock();
+  @Override
+  protected void enqueue(final Procedure proc, final boolean addFront) {
+    if (isTableProcedure(proc)) {
+      doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
+    } else if (isServerProcedure(proc)) {
+      doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, 
addFront);
+    } else {
+      // TODO: at the moment we only have Table and Server procedures
+      // if you are implementing a non-table/non-server procedure, you have 
two options: create
+      // a group for all the non-table/non-server procedures or try to find a 
key for your
+      // non-table/non-server procedures and implement something similar to 
the TableRunQueue.
+      throw new UnsupportedOperationException(
+        "RQs for non-table/non-server procedures are not implemented yet: " + 
proc);
     }
   }
 
@@ -165,49 +137,22 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
   }
 
   @Override
-  public Procedure poll() {
-    return poll(-1);
+  protected boolean queueHasRunnables() {
+    return tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables();
   }
 
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
-  protected Procedure poll(long waitNsec) {
-    Procedure pollResult = null;
-    schedLock.lock();
-    try {
-      if (!hasRunnables()) {
-        if (waitNsec < 0) {
-          schedWaitCond.await();
-        } else {
-          schedWaitCond.awaitNanos(waitNsec);
-        }
-        if (!hasRunnables()) {
-          return null;
-        }
-      }
-
-      // For now, let server handling have precedence over table handling; 
presumption is that it
-      // is more important handling crashed servers than it is running the
-      // enabling/disabling tables, etc.
-      pollResult = doPoll(serverRunQueue);
-      if (pollResult == null) {
-        pollResult = doPoll(tableRunQueue);
-      }
-
-      // update metrics
-      pollCalls++;
-      nullPollCalls += (pollResult == null) ? 1 : 0;
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    } finally {
-      schedLock.unlock();
+  @Override
+  protected Procedure dequeue() {
+    // For now, let server handling have precedence over table handling; 
presumption is that it
+    // is more important handling crashed servers than it is running the
+    // enabling/disabling tables, etc.
+    Procedure pollResult = doPoll(serverRunQueue);
+    if (pollResult == null) {
+      pollResult = doPoll(tableRunQueue);
     }
     return pollResult;
   }
 
-  private boolean hasRunnables() {
-    return tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables();
-  }
-
   private <T extends Comparable<T>> Procedure doPoll(final FairQueue<T> fairq) 
{
     final Queue<T> rq = fairq.poll();
     if (rq == null || !rq.isAvailable()) {
@@ -239,24 +184,18 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
   }
 
   @Override
-  public void clear() {
-    // NOTE: USED ONLY FOR TESTING
-    schedLock.lock();
-    try {
-      // Remove Servers
-      for (int i = 0; i < serverBuckets.length; ++i) {
-        clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
-        serverBuckets[i] = null;
-      }
+  public void clearQueue() {
+    // Remove Servers
+    for (int i = 0; i < serverBuckets.length; ++i) {
+      clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
+      serverBuckets[i] = null;
+    }
 
-      // Remove Tables
-      clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
-      tableMap = null;
+    // Remove Tables
+    clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
+    tableMap = null;
 
-      assert size() == 0 : "expected queue size to be 0, got " + size();
-    } finally {
-      schedLock.unlock();
-    }
+    assert size() == 0 : "expected queue size to be 0, got " + size();
   }
 
   private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode 
treeMap,
@@ -269,48 +208,25 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
     }
   }
 
-  private void wakePollIfNeeded(final int waitingCount) {
-    if (waitingCount > 1) {
-      schedWaitCond.signalAll();
-    } else if (waitingCount > 0) {
-      schedWaitCond.signal();
-    }
-  }
-
   @Override
-  public void signalAll() {
-    schedLock.lock();
-    try {
-      schedWaitCond.signalAll();
-    } finally {
-      schedLock.unlock();
-    }
-  }
+  public int queueSize() {
+    int count = 0;
 
-  @Override
-  public int size() {
-    schedLock.lock();
-    try {
-      int count = 0;
-
-      // Server queues
-      final AvlTreeIterator<ServerQueue> serverIter = new 
AvlTreeIterator<ServerQueue>();
-      for (int i = 0; i < serverBuckets.length; ++i) {
-        serverIter.seekFirst(serverBuckets[i]);
-        while (serverIter.hasNext()) {
-          count += serverIter.next().size();
-        }
+    // Server queues
+    final AvlTreeIterator<ServerQueue> serverIter = new 
AvlTreeIterator<ServerQueue>();
+    for (int i = 0; i < serverBuckets.length; ++i) {
+      serverIter.seekFirst(serverBuckets[i]);
+      while (serverIter.hasNext()) {
+        count += serverIter.next().size();
       }
+    }
 
-      // Table queues
-      final AvlTreeIterator<TableQueue> tableIter = new 
AvlTreeIterator<TableQueue>(tableMap);
-      while (tableIter.hasNext()) {
-        count += tableIter.next().size();
-      }
-      return count;
-    } finally {
-      schedLock.unlock();
+    // Table queues
+    final AvlTreeIterator<TableQueue> tableIter = new 
AvlTreeIterator<TableQueue>(tableMap);
+    while (tableIter.hasNext()) {
+      count += tableIter.next().size();
     }
+    return count;
   }
 
   @Override
@@ -355,328 +271,14 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
   }
 
   // 
============================================================================
-  //  TODO: Metrics
-  // 
============================================================================
-  public long getPollCalls() {
-    return pollCalls;
-  }
-
-  public long getNullPollCalls() {
-    return nullPollCalls;
-  }
-
-  // 
============================================================================
-  //  Event Helpers
-  // 
============================================================================
-  /**
-   * Suspend the procedure if the event is not ready yet.
-   * @param event the event to wait on
-   * @param procedure the procedure waiting on the event
-   * @return true if the procedure has to wait for the event to be ready, 
false otherwise.
-   */
-  public boolean waitEvent(final ProcedureEvent event, final Procedure 
procedure) {
-    return waitEvent(event, procedure, false);
-  }
-
-  /**
-   * Suspend the procedure if the event is not ready yet.
-   * @param event the event to wait on
-   * @param procedure the procedure waiting on the event
-   * @param suspendQueue true if the entire queue of the procedure should be 
suspended
-   * @return true if the procedure has to wait for the event to be ready, 
false otherwise.
-   */
-  public boolean waitEvent(final ProcedureEvent event, final Procedure 
procedure,
-      final boolean suspendQueue) {
-    return waitEvent(event, /* lockEvent= */false, procedure, suspendQueue);
-  }
-
-  private boolean waitEvent(final ProcedureEvent event, final boolean 
lockEvent,
-      final Procedure procedure, final boolean suspendQueue) {
-    synchronized (event) {
-      if (event.isReady()) {
-        if (lockEvent) {
-          event.setReady(false);
-        }
-        return false;
-      }
-
-      if (!suspendQueue) {
-        suspendProcedure(event, procedure);
-      } else if (isTableProcedure(procedure)) {
-        waitTableEvent(event, procedure);
-      } else if (isServerProcedure(procedure)) {
-        waitServerEvent(event, procedure);
-      } else {
-        // TODO: at the moment we only have Table and Server procedures
-        // if you are implementing a non-table/non-server procedure, you have 
two options: create
-        // a group for all the non-table/non-server procedures or try to find 
a key for your
-        // non-table/non-server procedures and implement something similar to 
the TableRunQueue.
-        throw new UnsupportedOperationException(
-          "RQs for non-table/non-server procedures are not implemented yet: " 
+ procedure);
-      }
-    }
-    return true;
-  }
-
-  private void waitTableEvent(final ProcedureEvent event, final Procedure 
procedure) {
-    final TableName tableName = getTableName(procedure);
-    final boolean isDebugEnabled = LOG.isDebugEnabled();
-
-    schedLock.lock();
-    try {
-      TableQueue queue = getTableQueue(tableName);
-      queue.addFront(procedure);
-      if (queue.isSuspended()) return;
-
-      if (isDebugEnabled) {
-        LOG.debug("Suspend table queue " + tableName);
-      }
-      queue.setSuspended(true);
-      removeFromRunQueue(tableRunQueue, queue);
-      event.suspendTableQueue(queue);
-    } finally {
-      schedLock.unlock();
-    }
-  }
-
-  private void waitServerEvent(final ProcedureEvent event, final Procedure 
procedure) {
-    final ServerName serverName = getServerName(procedure);
-    final boolean isDebugEnabled = LOG.isDebugEnabled();
-
-    schedLock.lock();
-    try {
-      // TODO: This will change once we have the new AM
-      ServerQueue queue = getServerQueue(serverName);
-      queue.addFront(procedure);
-      if (queue.isSuspended()) return;
-
-      if (isDebugEnabled) {
-        LOG.debug("Suspend server queue " + serverName);
-      }
-      queue.setSuspended(true);
-      removeFromRunQueue(serverRunQueue, queue);
-      event.suspendServerQueue(queue);
-    } finally {
-      schedLock.unlock();
-    }
-  }
-
-  /**
-   * Mark the event has not ready.
-   * procedures calling waitEvent() will be suspended.
-   * @param event the event to mark as suspended/not ready
-   */
-  public void suspendEvent(final ProcedureEvent event) {
-    final boolean isTraceEnabled = LOG.isTraceEnabled();
-    synchronized (event) {
-      event.setReady(false);
-      if (isTraceEnabled) {
-        LOG.trace("Suspend event " + event);
-      }
-    }
-  }
-
-  /**
-   * Wake every procedure waiting for the specified event
-   * (By design each event has only one "wake" caller)
-   * @param event the event to wait
-   */
-  public void wakeEvent(final ProcedureEvent event) {
-    final boolean isTraceEnabled = LOG.isTraceEnabled();
-    synchronized (event) {
-      event.setReady(true);
-      if (isTraceEnabled) {
-        LOG.trace("Wake event " + event);
-      }
-
-      schedLock.lock();
-      try {
-        final int waitingCount = popEventWaitingObjects(event);
-        wakePollIfNeeded(waitingCount);
-      } finally {
-        schedLock.unlock();
-      }
-    }
-  }
-
-  /**
-   * Wake every procedure waiting for the specified events.
-   * (By design each event has only one "wake" caller)
-   * @param events the list of events to wake
-   * @param count the number of events in the array to wake
-   */
-  public void wakeEvents(final ProcedureEvent[] events, final int count) {
-    final boolean isTraceEnabled = LOG.isTraceEnabled();
-    schedLock.lock();
-    try {
-      int waitingCount = 0;
-      for (int i = 0; i < count; ++i) {
-        final ProcedureEvent event = events[i];
-        synchronized (event) {
-          event.setReady(true);
-          if (isTraceEnabled) {
-            LOG.trace("Wake event " + event);
-          }
-          waitingCount += popEventWaitingObjects(event);
-        }
-      }
-      wakePollIfNeeded(waitingCount);
-    } finally {
-      schedLock.unlock();
-    }
-  }
-
-  private int popEventWaitingObjects(final ProcedureEvent event) {
-    int count = 0;
-    while (event.hasWaitingTables()) {
-      final Queue<TableName> queue = event.popWaitingTable();
-      queue.setSuspended(false);
-      addToRunQueue(tableRunQueue, queue);
-      count += queue.size();
-    }
-    // TODO: This will change once we have the new AM
-    while (event.hasWaitingServers()) {
-      final Queue<ServerName> queue = event.popWaitingServer();
-      queue.setSuspended(false);
-      addToRunQueue(serverRunQueue, queue);
-      count += queue.size();
-    }
-
-    while (event.hasWaitingProcedures()) {
-      wakeProcedure(event.popWaitingProcedure(false));
-      count++;
-    }
-    return count;
-  }
-
-  private void suspendProcedure(final BaseProcedureEvent event, final 
Procedure procedure) {
-    procedure.suspend();
-    event.suspendProcedure(procedure);
-  }
-
-  private void wakeProcedure(final Procedure procedure) {
-    procedure.resume();
-    doAdd(procedure, /* addFront= */ true, /* notify= */false);
-  }
-
-  private static abstract class BaseProcedureEvent {
-    private ArrayDeque<Procedure> waitingProcedures = null;
-
-    protected void suspendProcedure(final Procedure proc) {
-      if (waitingProcedures == null) {
-        waitingProcedures = new ArrayDeque<Procedure>();
-      }
-      waitingProcedures.addLast(proc);
-    }
-
-    protected boolean hasWaitingProcedures() {
-      return waitingProcedures != null;
-    }
-
-    protected Procedure popWaitingProcedure(final boolean popFront) {
-      // it will be nice to use IterableList on a procedure and avoid 
allocations...
-      Procedure proc = popFront ? waitingProcedures.removeFirst() : 
waitingProcedures.removeLast();
-      if (waitingProcedures.isEmpty()) {
-        waitingProcedures = null;
-      }
-      return proc;
-    }
-
-    @VisibleForTesting
-    protected synchronized int size() {
-      if (waitingProcedures != null) {
-        return waitingProcedures.size();
-      }
-      return 0;
-    }
-  }
-
-  public static class ProcedureEvent extends BaseProcedureEvent {
-    private final String description;
-
-    private Queue<ServerName> waitingServers = null;
-    private Queue<TableName> waitingTables = null;
-    private boolean ready = false;
-
-    protected ProcedureEvent() {
-      this(null);
-    }
-
-    public ProcedureEvent(final String description) {
-      this.description = description;
-    }
-
-    public synchronized boolean isReady() {
-      return ready;
-    }
-
-    private synchronized void setReady(boolean isReady) {
-      this.ready = isReady;
-    }
-
-    private void suspendTableQueue(Queue<TableName> queue) {
-      waitingTables = AvlIterableList.append(waitingTables, queue);
-    }
-
-    private void suspendServerQueue(Queue<ServerName> queue) {
-      waitingServers = AvlIterableList.append(waitingServers, queue);
-    }
-
-    private boolean hasWaitingTables() {
-      return waitingTables != null;
-    }
-
-    private Queue<TableName> popWaitingTable() {
-      Queue<TableName> node = waitingTables;
-      waitingTables = AvlIterableList.remove(waitingTables, node);
-      return node;
-    }
-
-    private boolean hasWaitingServers() {
-      return waitingServers != null;
-    }
-
-    private Queue<ServerName> popWaitingServer() {
-      Queue<ServerName> node = waitingServers;
-      waitingServers = AvlIterableList.remove(waitingServers, node);
-      return node;
-    }
-
-    protected String getDescription() {
-      if (description == null) {
-        // you should override this method if you are using the default 
constructor
-        throw new UnsupportedOperationException();
-      }
-      return description;
-    }
-
-    @VisibleForTesting
-    protected synchronized int size() {
-      int count = super.size();
-      if (waitingTables != null) {
-        count += waitingTables.size();
-      }
-      if (waitingServers != null) {
-        count += waitingServers.size();
-      }
-      return count;
-    }
-
-    @Override
-    public String toString() {
-      return String.format("%s(%s)", getClass().getSimpleName(), 
getDescription());
-    }
-  }
-
-  // 
============================================================================
   //  Table Queue Lookup Helpers
   // 
============================================================================
   private TableQueue getTableQueueWithLock(TableName tableName) {
-    schedLock.lock();
+    schedLock();
     try {
       return getTableQueue(tableName);
     } finally {
-      schedLock.unlock();
+      schedUnlock();
     }
   }
 
@@ -727,11 +329,11 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
   //  Server Queue Lookup Helpers
   // 
============================================================================
   private ServerQueue getServerQueueWithLock(ServerName serverName) {
-    schedLock.lock();
+    schedLock();
     try {
       return getServerQueue(serverName);
     } finally {
-      schedLock.unlock();
+      schedUnlock();
     }
   }
 
@@ -790,7 +392,7 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
     }
   }
 
-  private static class RegionEvent extends BaseProcedureEvent {
+  private static class RegionEvent extends ProcedureEventQueue {
     private final HRegionInfo regionInfo;
     private long exclusiveLockProcIdOwner = Long.MIN_VALUE;
 
@@ -823,7 +425,7 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
 
     @Override
     public String toString() {
-      return String.format("region %s event", 
regionInfo.getRegionNameAsString());
+      return "RegionEvent(" + regionInfo.getRegionNameAsString() + ")";
     }
   }
 
@@ -1046,33 +648,33 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
    * @return true if we were able to acquire the lock on the table, otherwise 
false.
    */
   public boolean tryAcquireTableExclusiveLock(final Procedure procedure, final 
TableName table) {
-    schedLock.lock();
+    schedLock();
     TableQueue queue = getTableQueue(table);
     if (!queue.getNamespaceQueue().trySharedLock()) {
-      schedLock.unlock();
+      schedUnlock();
       return false;
     }
 
     if (!queue.tryExclusiveLock(procedure)) {
       queue.getNamespaceQueue().releaseSharedLock();
-      schedLock.unlock();
+      schedUnlock();
       return false;
     }
 
     removeFromRunQueue(tableRunQueue, queue);
     boolean hasParentLock = queue.hasParentLock(procedure);
-    schedLock.unlock();
+    schedUnlock();
 
     boolean hasXLock = true;
     if (!hasParentLock) {
       // Zk lock is expensive...
       hasXLock = queue.tryZkExclusiveLock(lockManager, procedure.toString());
       if (!hasXLock) {
-        schedLock.lock();
+        schedLock();
         if (!hasParentLock) queue.releaseExclusiveLock();
         queue.getNamespaceQueue().releaseSharedLock();
         addToRunQueue(tableRunQueue, queue);
-        schedLock.unlock();
+        schedUnlock();
       }
     }
     return hasXLock;
@@ -1092,11 +694,11 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
       queue.releaseZkExclusiveLock(lockManager);
     }
 
-    schedLock.lock();
+    schedLock();
     if (!hasParentLock) queue.releaseExclusiveLock();
     queue.getNamespaceQueue().releaseSharedLock();
     addToRunQueue(tableRunQueue, queue);
-    schedLock.unlock();
+    schedUnlock();
   }
 
   /**
@@ -1112,7 +714,7 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
 
   private TableQueue tryAcquireTableQueueSharedLock(final Procedure procedure,
       final TableName table) {
-    schedLock.lock();
+    schedLock();
     TableQueue queue = getTableQueue(table);
     if (!queue.getNamespaceQueue().trySharedLock()) {
       return null;
@@ -1120,7 +722,7 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
 
     if (!queue.trySharedLock()) {
       queue.getNamespaceQueue().releaseSharedLock();
-      schedLock.unlock();
+      schedUnlock();
       return null;
     }
 
@@ -1129,11 +731,11 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
     if (!queue.tryZkSharedLock(lockManager, procedure.toString())) {
       queue.releaseSharedLock();
       queue.getNamespaceQueue().releaseSharedLock();
-      schedLock.unlock();
+      schedUnlock();
       return null;
     }
 
-    schedLock.unlock();
+    schedUnlock();
 
     return queue;
   }
@@ -1146,7 +748,7 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
   public void releaseTableSharedLock(final Procedure procedure, final 
TableName table) {
     final TableQueue queue = getTableQueueWithLock(table);
 
-    schedLock.lock();
+    schedLock();
     // Zk lock is expensive...
     queue.releaseZkSharedLock(lockManager);
 
@@ -1154,7 +756,7 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
     if (queue.releaseSharedLock()) {
       addToRunQueue(tableRunQueue, queue);
     }
-    schedLock.unlock();
+    schedUnlock();
   }
 
   /**
@@ -1168,8 +770,7 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
    */
   @VisibleForTesting
   protected boolean markTableAsDeleted(final TableName table, final Procedure 
procedure) {
-    final ReentrantLock l = schedLock;
-    l.lock();
+    schedLock();
     try {
       TableQueue queue = getTableQueue(table);
       if (queue == null) return true;
@@ -1193,7 +794,7 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
         return false;
       }
     } finally {
-      l.unlock();
+      schedUnlock();
     }
     return true;
   }
@@ -1298,7 +899,7 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
     }
 
     // awake procedures if any
-    schedLock.lock();
+    schedLock();
     try {
       for (int i = numProcs - 1; i >= 0; --i) {
         wakeProcedure(nextProcs[i]);
@@ -1312,7 +913,7 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
         releaseTableSharedLock(procedure, table);
       }
     } finally {
-      schedLock.unlock();
+      schedUnlock();
     }
   }
 
@@ -1327,7 +928,7 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
    * @return true if we were able to acquire the lock on the namespace, 
otherwise false.
    */
   public boolean tryAcquireNamespaceExclusiveLock(final Procedure procedure, 
final String nsName) {
-    schedLock.lock();
+    schedLock();
     try {
       TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME);
       if (!tableQueue.trySharedLock()) return false;
@@ -1339,7 +940,7 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
       }
       return hasLock;
     } finally {
-      schedLock.unlock();
+      schedUnlock();
     }
   }
 
@@ -1350,7 +951,7 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
    * @param nsName the namespace that has the exclusive lock
    */
   public void releaseNamespaceExclusiveLock(final Procedure procedure, final 
String nsName) {
-    schedLock.lock();
+    schedLock();
     try {
       final TableQueue tableQueue = 
getTableQueue(TableName.NAMESPACE_TABLE_NAME);
       final NamespaceQueue queue = getNamespaceQueue(nsName);
@@ -1360,7 +961,7 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
         addToRunQueue(tableRunQueue, tableQueue);
       }
     } finally {
-      schedLock.unlock();
+      schedUnlock();
     }
   }
 
@@ -1376,7 +977,7 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
    */
   public boolean tryAcquireServerExclusiveLock(final Procedure procedure,
       final ServerName serverName) {
-    schedLock.lock();
+    schedLock();
     try {
       ServerQueue queue = getServerQueue(serverName);
       if (queue.tryExclusiveLock(procedure)) {
@@ -1384,7 +985,7 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
         return true;
       }
     } finally {
-      schedLock.unlock();
+      schedUnlock();
     }
     return false;
   }
@@ -1397,13 +998,13 @@ public class MasterProcedureScheduler implements 
ProcedureRunnableSet {
    */
   public void releaseServerExclusiveLock(final Procedure procedure,
       final ServerName serverName) {
-    schedLock.lock();
+    schedLock();
     try {
       ServerQueue queue = getServerQueue(serverName);
       queue.releaseExclusiveLock();
       addToRunQueue(serverRunQueue, queue);
     } finally {
-      schedLock.unlock();
+      schedUnlock();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
index 4c53845..1b434fe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
@@ -18,11 +18,6 @@
 
 package org.apache.hadoop.hbase.master.procedure;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -34,17 +29,15 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.master.HMaster;
-import 
org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
 import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
-import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Threads;
+
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -200,85 +193,4 @@ public class TestMasterProcedureEvents {
     }
     return null;
   }
-
-  @Test(timeout=30000)
-  public void testTimeoutEventProcedure() throws Exception {
-    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
-    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
-    MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureQueue();
-
-    TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(1000, 5);
-    procExec.submitProcedure(proc);
-
-    ProcedureTestingUtility.waitProcedure(procExec, proc.getProcId());
-    ProcedureTestingUtility.assertIsAbortException(procExec.getResult(proc.getProcId()));
-  }
-
-  public static class TestTimeoutEventProcedure
-      extends Procedure<MasterProcedureEnv> implements TableProcedureInterface {
-    private final ProcedureEvent event = new ProcedureEvent("timeout-event");
-
-    private final AtomicInteger ntimeouts = new AtomicInteger(0);
-    private int maxTimeouts = 1;
-
-    public TestTimeoutEventProcedure() {}
-
-    public TestTimeoutEventProcedure(final int timeoutMsec, final int maxTimeouts) {
-      this.maxTimeouts = maxTimeouts;
-      setTimeout(timeoutMsec);
-      setOwner("test");
-    }
-
-    @Override
-    protected Procedure[] execute(final MasterProcedureEnv env)
-        throws ProcedureSuspendedException {
-      LOG.info("EXECUTE " + this + " ntimeouts=" + ntimeouts);
-      if (ntimeouts.get() > maxTimeouts) {
-        setAbortFailure("test", "give up after " + ntimeouts.get());
-        return null;
-      }
-
-      env.getProcedureQueue().suspendEvent(event);
-      if (env.getProcedureQueue().waitEvent(event, this)) {
-        setState(ProcedureState.WAITING_TIMEOUT);
-        throw new ProcedureSuspendedException();
-      }
-
-      return null;
-    }
-
-    @Override
-    protected void rollback(final MasterProcedureEnv env) {
-    }
-
-    @Override
-    protected boolean setTimeoutFailure(final MasterProcedureEnv env) {
-      int n = ntimeouts.incrementAndGet();
-      LOG.info("HANDLE TIMEOUT " + this + " ntimeouts=" + n);
-      setState(ProcedureState.RUNNABLE);
-      env.getProcedureQueue().wakeEvent(event);
-      return false;
-    }
-
-    @Override
-    public TableName getTableName() {
-      return TableName.valueOf("testtb");
-    }
-
-    @Override
-    public TableOperationType getTableOperationType() {
-      return TableOperationType.READ;
-    }
-
-    @Override
-    protected boolean abort(MasterProcedureEnv env) { return false; }
-
-    @Override
-    protected void serializeStateData(final OutputStream stream) throws IOException {
-    }
-
-    @Override
-    protected void deserializeStateData(final InputStream stream) throws IOException {
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
index 12f8263..bcacb48 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
@@ -31,8 +31,8 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.master.TableLockManager;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
 import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -541,40 +541,6 @@ public class TestMasterProcedureScheduler {
   }
 
   @Test
-  public void testSuspendedTableQueue() throws Exception {
-    final TableName tableName = TableName.valueOf("testSuspendedQueue");
-
-    queue.addBack(new TestTableProcedure(1, tableName,
-        TableProcedureInterface.TableOperationType.EDIT));
-    queue.addBack(new TestTableProcedure(2, tableName,
-        TableProcedureInterface.TableOperationType.EDIT));
-
-    Procedure proc = queue.poll();
-    assertEquals(1, proc.getProcId());
-    assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName));
-
-    // Suspend
-    // TODO: If we want to keep the zk-lock we need to retain the lock on suspend
-    ProcedureEvent event = new ProcedureEvent("testSuspendedTableQueueEvent");
-    assertEquals(true, queue.waitEvent(event, proc, true));
-    queue.releaseTableExclusiveLock(proc, tableName);
-    assertEquals(null, queue.poll(0));
-
-    // Resume
-    queue.wakeEvent(event);
-
-    proc = queue.poll();
-    assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName));
-    assertEquals(1, proc.getProcId());
-    queue.releaseTableExclusiveLock(proc, tableName);
-
-    proc = queue.poll();
-    assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName));
-    assertEquals(2, proc.getProcId());
-    queue.releaseTableExclusiveLock(proc, tableName);
-  }
-
-  @Test
   public void testSuspendedProcedure() throws Exception {
     final TableName tableName = TableName.valueOf("testSuspendedProcedure");
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java
index 380067d..d6ddd13 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java
@@ -23,23 +23,18 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.ConcurrentSkipListSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.master.TableLockManager;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
 import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedure;
-import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedureWithEvent;
 import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
-import org.apache.hadoop.hbase.util.Threads;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -212,92 +207,6 @@ public class TestMasterProcedureSchedulerConcurrency {
     }
   }
 
-  @Test(timeout=60000)
-  public void testConcurrentWaitWake() throws Exception {
-    testConcurrentWaitWake(false);
-  }
-
-  @Test(timeout=60000)
-  public void testConcurrentWaitWakeBatch() throws Exception {
-    testConcurrentWaitWake(true);
-  }
-
-  private void testConcurrentWaitWake(final boolean useWakeBatch) throws Exception {
-    final TableName tableName = TableName.valueOf("testtb");
-
-    final int NPROCS = 20;
-    final int NRUNS = 100;
-
-    for (long i = 0; i < NPROCS; ++i) {
-      queue.addBack(new TestTableProcedureWithEvent(i, tableName,
-          TableProcedureInterface.TableOperationType.READ));
-    }
-
-    final Thread[] threads = new Thread[4];
-    final AtomicInteger waitCount = new AtomicInteger(0);
-    final AtomicInteger wakeCount = new AtomicInteger(0);
-
-    final ConcurrentSkipListSet<TestTableProcedureWithEvent> waitQueue =
-      new ConcurrentSkipListSet<TestTableProcedureWithEvent>();
-    threads[0] = new Thread() {
-      @Override
-      public void run() {
-        while (true) {
-          if (useWakeBatch) {
-            ProcedureEvent[] ev = new ProcedureEvent[waitQueue.size()];
-            for (int i = 0; i < ev.length; ++i) {
-              ev[i] = waitQueue.pollFirst().getEvent();
-              LOG.debug("WAKE " + ev[i] + " total=" + wakeCount.get());
-            }
-            queue.wakeEvents(ev, ev.length);
-            wakeCount.addAndGet(ev.length);
-          } else {
-            int size = waitQueue.size();
-            while (size-- > 0) {
-              ProcedureEvent ev = waitQueue.pollFirst().getEvent();
-              queue.wakeEvent(ev);
-              LOG.debug("WAKE " + ev + " total=" + wakeCount.get());
-              wakeCount.incrementAndGet();
-            }
-          }
-          if (wakeCount.get() >= NRUNS) {
-            break;
-          }
-          Threads.sleepWithoutInterrupt(25);
-        }
-      }
-    };
-
-    for (int i = 1; i < threads.length; ++i) {
-      threads[i] = new Thread() {
-        @Override
-        public void run() {
-          while (true) {
-            TestTableProcedureWithEvent proc = (TestTableProcedureWithEvent)queue.poll();
-            if (proc == null) continue;
-
-            waitQueue.add(proc);
-            queue.suspendEvent(proc.getEvent());
-            queue.waitEvent(proc.getEvent(), proc);
-            LOG.debug("WAIT " + proc.getEvent());
-            if (waitCount.incrementAndGet() >= NRUNS) {
-              break;
-            }
-          }
-        }
-      };
-    }
-
-    for (int i = 0; i < threads.length; ++i) {
-      threads[i].start();
-    }
-    for (int i = 0; i < threads.length; ++i) {
-      threads[i].join();
-    }
-
-    queue.clear();
-  }
-
   public static class TestTableProcSet {
     private final MasterProcedureScheduler queue;
 

Reply via email to