Repository: hbase Updated Branches: refs/heads/master dfb2a800c -> 92ef23448
http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index aba82c1..cc9efd2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -24,8 +24,6 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.Arrays; import java.util.HashMap; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -40,8 +38,9 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.master.TableLockManager.TableLock; import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; +import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler; import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.procedure2.ProcedureRunnableSet; +import org.apache.hadoop.hbase.procedure2.ProcedureEventQueue; import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator; import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList; import org.apache.hadoop.hbase.util.AvlUtil.AvlLinkedNode; @@ -49,8 +48,8 @@ import org.apache.hadoop.hbase.util.AvlUtil.AvlTree; import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator; /** - * ProcedureRunnableSet for the Master Procedures. - * This RunnableSet tries to provide to the ProcedureExecutor procedures + * ProcedureScheduler for the Master Procedures. + * This ProcedureScheduler tries to provide to the ProcedureExecutor procedures * that can be executed without having to wait on a lock. * Most of the master operations can be executed concurrently, if they * are operating on different tables (e.g. two create table can be performed @@ -65,12 +64,10 @@ import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator; */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class MasterProcedureScheduler implements ProcedureRunnableSet { +public class MasterProcedureScheduler extends AbstractProcedureScheduler { private static final Log LOG = LogFactory.getLog(MasterProcedureScheduler.class); private final TableLockManager lockManager; - private final ReentrantLock schedLock = new ReentrantLock(); - private final Condition schedWaitCond = schedLock.newCondition(); private final static NamespaceQueueKeyComparator NAMESPACE_QUEUE_KEY_COMPARATOR = new NamespaceQueueKeyComparator(); @@ -90,10 +87,6 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { private final int userTablePriority; private final int sysTablePriority; - // TODO: metrics - private long pollCalls = 0; - private long nullPollCalls = 0; - public MasterProcedureScheduler(final Configuration conf, final TableLockManager lockManager) { this.lockManager = lockManager; @@ -104,44 +97,23 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } @Override - public void addFront(Procedure proc) { - doAdd(proc, true); - } - - @Override - public void addBack(Procedure proc) { - doAdd(proc, false); - } - - @Override public void yield(final Procedure proc) { - doAdd(proc, isTableProcedure(proc)); - } - - private void doAdd(final Procedure proc, final boolean addFront) { - doAdd(proc, addFront, true); + push(proc, isTableProcedure(proc), true); } - private void doAdd(final Procedure proc, final boolean addFront, final boolean notify) { - schedLock.lock(); - try { - if (isTableProcedure(proc)) { - doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront); - } else if (isServerProcedure(proc)) { - doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, addFront); - } else { - // TODO: at the moment we only have Table and Server procedures - // if you are implementing a non-table/non-server procedure, you have two options: create - // a group for all the non-table/non-server procedures or try to find a key for your - // non-table/non-server procedures and implement something similar to the TableRunQueue. - throw new UnsupportedOperationException( - "RQs for non-table/non-server procedures are not implemented yet: " + proc); - } - if (notify) { - schedWaitCond.signal(); - } - } finally { - schedLock.unlock(); + @Override + protected void enqueue(final Procedure proc, final boolean addFront) { + if (isTableProcedure(proc)) { + doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront); + } else if (isServerProcedure(proc)) { + doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, addFront); + } else { + // TODO: at the moment we only have Table and Server procedures + // if you are implementing a non-table/non-server procedure, you have two options: create + // a group for all the non-table/non-server procedures or try to find a key for your + // non-table/non-server procedures and implement something similar to the TableRunQueue. + throw new UnsupportedOperationException( + "RQs for non-table/non-server procedures are not implemented yet: " + proc); } } @@ -165,49 +137,22 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } @Override - public Procedure poll() { - return poll(-1); + protected boolean queueHasRunnables() { + return tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables(); } - @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") - protected Procedure poll(long waitNsec) { - Procedure pollResult = null; - schedLock.lock(); - try { - if (!hasRunnables()) { - if (waitNsec < 0) { - schedWaitCond.await(); - } else { - schedWaitCond.awaitNanos(waitNsec); - } - if (!hasRunnables()) { - return null; - } - } - - // For now, let server handling have precedence over table handling; presumption is that it - // is more important handling crashed servers than it is running the - // enabling/disabling tables, etc. - pollResult = doPoll(serverRunQueue); - if (pollResult == null) { - pollResult = doPoll(tableRunQueue); - } - - // update metrics - pollCalls++; - nullPollCalls += (pollResult == null) ? 1 : 0; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } finally { - schedLock.unlock(); + @Override + protected Procedure dequeue() { + // For now, let server handling have precedence over table handling; presumption is that it + // is more important handling crashed servers than it is running the + // enabling/disabling tables, etc. + Procedure pollResult = doPoll(serverRunQueue); + if (pollResult == null) { + pollResult = doPoll(tableRunQueue); } return pollResult; } - private boolean hasRunnables() { - return tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables(); - } - private <T extends Comparable<T>> Procedure doPoll(final FairQueue<T> fairq) { final Queue<T> rq = fairq.poll(); if (rq == null || !rq.isAvailable()) { @@ -239,24 +184,18 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } @Override - public void clear() { - // NOTE: USED ONLY FOR TESTING - schedLock.lock(); - try { - // Remove Servers - for (int i = 0; i < serverBuckets.length; ++i) { - clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR); - serverBuckets[i] = null; - } + public void clearQueue() { + // Remove Servers + for (int i = 0; i < serverBuckets.length; ++i) { + clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR); + serverBuckets[i] = null; + } - // Remove Tables - clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR); - tableMap = null; + // Remove Tables + clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR); + tableMap = null; - assert size() == 0 : "expected queue size to be 0, got " + size(); - } finally { - schedLock.unlock(); - } + assert size() == 0 : "expected queue size to be 0, got " + size(); } private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap, @@ -269,48 +208,25 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } } - private void wakePollIfNeeded(final int waitingCount) { - if (waitingCount > 1) { - schedWaitCond.signalAll(); - } else if (waitingCount > 0) { - schedWaitCond.signal(); - } - } - @Override - public void signalAll() { - schedLock.lock(); - try { - schedWaitCond.signalAll(); - } finally { - schedLock.unlock(); - } - } + public int queueSize() { + int count = 0; - @Override - public int size() { - schedLock.lock(); - try { - int count = 0; - - // Server queues - final AvlTreeIterator<ServerQueue> serverIter = new AvlTreeIterator<ServerQueue>(); - for (int i = 0; i < serverBuckets.length; ++i) { - serverIter.seekFirst(serverBuckets[i]); - while (serverIter.hasNext()) { - count += serverIter.next().size(); - } + // Server queues + final AvlTreeIterator<ServerQueue> serverIter = new AvlTreeIterator<ServerQueue>(); + for (int i = 0; i < serverBuckets.length; ++i) { + serverIter.seekFirst(serverBuckets[i]); + while (serverIter.hasNext()) { + count += serverIter.next().size(); } + } - // Table queues - final AvlTreeIterator<TableQueue> tableIter = new AvlTreeIterator<TableQueue>(tableMap); - while (tableIter.hasNext()) { - count += tableIter.next().size(); - } - return count; - } finally { - schedLock.unlock(); + // Table queues + final AvlTreeIterator<TableQueue> tableIter = new AvlTreeIterator<TableQueue>(tableMap); + while (tableIter.hasNext()) { + count += tableIter.next().size(); } + return count; } @Override @@ -355,328 +271,14 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } // ============================================================================ - // TODO: Metrics - // ============================================================================ - public long getPollCalls() { - return pollCalls; - } - - public long getNullPollCalls() { - return nullPollCalls; - } - - // ============================================================================ - // Event Helpers - // ============================================================================ - /** - * Suspend the procedure if the event is not ready yet. - * @param event the event to wait on - * @param procedure the procedure waiting on the event - * @return true if the procedure has to wait for the event to be ready, false otherwise. - */ - public boolean waitEvent(final ProcedureEvent event, final Procedure procedure) { - return waitEvent(event, procedure, false); - } - - /** - * Suspend the procedure if the event is not ready yet. - * @param event the event to wait on - * @param procedure the procedure waiting on the event - * @param suspendQueue true if the entire queue of the procedure should be suspended - * @return true if the procedure has to wait for the event to be ready, false otherwise. - */ - public boolean waitEvent(final ProcedureEvent event, final Procedure procedure, - final boolean suspendQueue) { - return waitEvent(event, /* lockEvent= */false, procedure, suspendQueue); - } - - private boolean waitEvent(final ProcedureEvent event, final boolean lockEvent, - final Procedure procedure, final boolean suspendQueue) { - synchronized (event) { - if (event.isReady()) { - if (lockEvent) { - event.setReady(false); - } - return false; - } - - if (!suspendQueue) { - suspendProcedure(event, procedure); - } else if (isTableProcedure(procedure)) { - waitTableEvent(event, procedure); - } else if (isServerProcedure(procedure)) { - waitServerEvent(event, procedure); - } else { - // TODO: at the moment we only have Table and Server procedures - // if you are implementing a non-table/non-server procedure, you have two options: create - // a group for all the non-table/non-server procedures or try to find a key for your - // non-table/non-server procedures and implement something similar to the TableRunQueue. - throw new UnsupportedOperationException( - "RQs for non-table/non-server procedures are not implemented yet: " + procedure); - } - } - return true; - } - - private void waitTableEvent(final ProcedureEvent event, final Procedure procedure) { - final TableName tableName = getTableName(procedure); - final boolean isDebugEnabled = LOG.isDebugEnabled(); - - schedLock.lock(); - try { - TableQueue queue = getTableQueue(tableName); - queue.addFront(procedure); - if (queue.isSuspended()) return; - - if (isDebugEnabled) { - LOG.debug("Suspend table queue " + tableName); - } - queue.setSuspended(true); - removeFromRunQueue(tableRunQueue, queue); - event.suspendTableQueue(queue); - } finally { - schedLock.unlock(); - } - } - - private void waitServerEvent(final ProcedureEvent event, final Procedure procedure) { - final ServerName serverName = getServerName(procedure); - final boolean isDebugEnabled = LOG.isDebugEnabled(); - - schedLock.lock(); - try { - // TODO: This will change once we have the new AM - ServerQueue queue = getServerQueue(serverName); - queue.addFront(procedure); - if (queue.isSuspended()) return; - - if (isDebugEnabled) { - LOG.debug("Suspend server queue " + serverName); - } - queue.setSuspended(true); - removeFromRunQueue(serverRunQueue, queue); - event.suspendServerQueue(queue); - } finally { - schedLock.unlock(); - } - } - - /** - * Mark the event has not ready. - * procedures calling waitEvent() will be suspended. - * @param event the event to mark as suspended/not ready - */ - public void suspendEvent(final ProcedureEvent event) { - final boolean isTraceEnabled = LOG.isTraceEnabled(); - synchronized (event) { - event.setReady(false); - if (isTraceEnabled) { - LOG.trace("Suspend event " + event); - } - } - } - - /** - * Wake every procedure waiting for the specified event - * (By design each event has only one "wake" caller) - * @param event the event to wait - */ - public void wakeEvent(final ProcedureEvent event) { - final boolean isTraceEnabled = LOG.isTraceEnabled(); - synchronized (event) { - event.setReady(true); - if (isTraceEnabled) { - LOG.trace("Wake event " + event); - } - - schedLock.lock(); - try { - final int waitingCount = popEventWaitingObjects(event); - wakePollIfNeeded(waitingCount); - } finally { - schedLock.unlock(); - } - } - } - - /** - * Wake every procedure waiting for the specified events. - * (By design each event has only one "wake" caller) - * @param events the list of events to wake - * @param count the number of events in the array to wake - */ - public void wakeEvents(final ProcedureEvent[] events, final int count) { - final boolean isTraceEnabled = LOG.isTraceEnabled(); - schedLock.lock(); - try { - int waitingCount = 0; - for (int i = 0; i < count; ++i) { - final ProcedureEvent event = events[i]; - synchronized (event) { - event.setReady(true); - if (isTraceEnabled) { - LOG.trace("Wake event " + event); - } - waitingCount += popEventWaitingObjects(event); - } - } - wakePollIfNeeded(waitingCount); - } finally { - schedLock.unlock(); - } - } - - private int popEventWaitingObjects(final ProcedureEvent event) { - int count = 0; - while (event.hasWaitingTables()) { - final Queue<TableName> queue = event.popWaitingTable(); - queue.setSuspended(false); - addToRunQueue(tableRunQueue, queue); - count += queue.size(); - } - // TODO: This will change once we have the new AM - while (event.hasWaitingServers()) { - final Queue<ServerName> queue = event.popWaitingServer(); - queue.setSuspended(false); - addToRunQueue(serverRunQueue, queue); - count += queue.size(); - } - - while (event.hasWaitingProcedures()) { - wakeProcedure(event.popWaitingProcedure(false)); - count++; - } - return count; - } - - private void suspendProcedure(final BaseProcedureEvent event, final Procedure procedure) { - procedure.suspend(); - event.suspendProcedure(procedure); - } - - private void wakeProcedure(final Procedure procedure) { - procedure.resume(); - doAdd(procedure, /* addFront= */ true, /* notify= */false); - } - - private static abstract class BaseProcedureEvent { - private ArrayDeque<Procedure> waitingProcedures = null; - - protected void suspendProcedure(final Procedure proc) { - if (waitingProcedures == null) { - waitingProcedures = new ArrayDeque<Procedure>(); - } - waitingProcedures.addLast(proc); - } - - protected boolean hasWaitingProcedures() { - return waitingProcedures != null; - } - - protected Procedure popWaitingProcedure(final boolean popFront) { - // it will be nice to use IterableList on a procedure and avoid allocations... - Procedure proc = popFront ? waitingProcedures.removeFirst() : waitingProcedures.removeLast(); - if (waitingProcedures.isEmpty()) { - waitingProcedures = null; - } - return proc; - } - - @VisibleForTesting - protected synchronized int size() { - if (waitingProcedures != null) { - return waitingProcedures.size(); - } - return 0; - } - } - - public static class ProcedureEvent extends BaseProcedureEvent { - private final String description; - - private Queue<ServerName> waitingServers = null; - private Queue<TableName> waitingTables = null; - private boolean ready = false; - - protected ProcedureEvent() { - this(null); - } - - public ProcedureEvent(final String description) { - this.description = description; - } - - public synchronized boolean isReady() { - return ready; - } - - private synchronized void setReady(boolean isReady) { - this.ready = isReady; - } - - private void suspendTableQueue(Queue<TableName> queue) { - waitingTables = AvlIterableList.append(waitingTables, queue); - } - - private void suspendServerQueue(Queue<ServerName> queue) { - waitingServers = AvlIterableList.append(waitingServers, queue); - } - - private boolean hasWaitingTables() { - return waitingTables != null; - } - - private Queue<TableName> popWaitingTable() { - Queue<TableName> node = waitingTables; - waitingTables = AvlIterableList.remove(waitingTables, node); - return node; - } - - private boolean hasWaitingServers() { - return waitingServers != null; - } - - private Queue<ServerName> popWaitingServer() { - Queue<ServerName> node = waitingServers; - waitingServers = AvlIterableList.remove(waitingServers, node); - return node; - } - - protected String getDescription() { - if (description == null) { - // you should override this method if you are using the default constructor - throw new UnsupportedOperationException(); - } - return description; - } - - @VisibleForTesting - protected synchronized int size() { - int count = super.size(); - if (waitingTables != null) { - count += waitingTables.size(); - } - if (waitingServers != null) { - count += waitingServers.size(); - } - return count; - } - - @Override - public String toString() { - return String.format("%s(%s)", getClass().getSimpleName(), getDescription()); - } - } - - // ============================================================================ // Table Queue Lookup Helpers // ============================================================================ private TableQueue getTableQueueWithLock(TableName tableName) { - schedLock.lock(); + schedLock(); try { return getTableQueue(tableName); } finally { - schedLock.unlock(); + schedUnlock(); } } @@ -727,11 +329,11 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { // Server Queue Lookup Helpers // ============================================================================ private ServerQueue getServerQueueWithLock(ServerName serverName) { - schedLock.lock(); + schedLock(); try { return getServerQueue(serverName); } finally { - schedLock.unlock(); + schedUnlock(); } } @@ -790,7 +392,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } } - private static class RegionEvent extends BaseProcedureEvent { + private static class RegionEvent extends ProcedureEventQueue { private final HRegionInfo regionInfo; private long exclusiveLockProcIdOwner = Long.MIN_VALUE; @@ -823,7 +425,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { @Override public String toString() { - return String.format("region %s event", regionInfo.getRegionNameAsString()); + return "RegionEvent(" + regionInfo.getRegionNameAsString() + ")"; } } @@ -1046,33 +648,33 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { * @return true if we were able to acquire the lock on the table, otherwise false. */ public boolean tryAcquireTableExclusiveLock(final Procedure procedure, final TableName table) { - schedLock.lock(); + schedLock(); TableQueue queue = getTableQueue(table); if (!queue.getNamespaceQueue().trySharedLock()) { - schedLock.unlock(); + schedUnlock(); return false; } if (!queue.tryExclusiveLock(procedure)) { queue.getNamespaceQueue().releaseSharedLock(); - schedLock.unlock(); + schedUnlock(); return false; } removeFromRunQueue(tableRunQueue, queue); boolean hasParentLock = queue.hasParentLock(procedure); - schedLock.unlock(); + schedUnlock(); boolean hasXLock = true; if (!hasParentLock) { // Zk lock is expensive... hasXLock = queue.tryZkExclusiveLock(lockManager, procedure.toString()); if (!hasXLock) { - schedLock.lock(); + schedLock(); if (!hasParentLock) queue.releaseExclusiveLock(); queue.getNamespaceQueue().releaseSharedLock(); addToRunQueue(tableRunQueue, queue); - schedLock.unlock(); + schedUnlock(); } } return hasXLock; @@ -1092,11 +694,11 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { queue.releaseZkExclusiveLock(lockManager); } - schedLock.lock(); + schedLock(); if (!hasParentLock) queue.releaseExclusiveLock(); queue.getNamespaceQueue().releaseSharedLock(); addToRunQueue(tableRunQueue, queue); - schedLock.unlock(); + schedUnlock(); } /** @@ -1112,7 +714,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { private TableQueue tryAcquireTableQueueSharedLock(final Procedure procedure, final TableName table) { - schedLock.lock(); + schedLock(); TableQueue queue = getTableQueue(table); if (!queue.getNamespaceQueue().trySharedLock()) { return null; @@ -1120,7 +722,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { if (!queue.trySharedLock()) { queue.getNamespaceQueue().releaseSharedLock(); - schedLock.unlock(); + schedUnlock(); return null; } @@ -1129,11 +731,11 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { if (!queue.tryZkSharedLock(lockManager, procedure.toString())) { queue.releaseSharedLock(); queue.getNamespaceQueue().releaseSharedLock(); - schedLock.unlock(); + schedUnlock(); return null; } - schedLock.unlock(); + schedUnlock(); return queue; } @@ -1146,7 +748,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { public void releaseTableSharedLock(final Procedure procedure, final TableName table) { final TableQueue queue = getTableQueueWithLock(table); - schedLock.lock(); + schedLock(); // Zk lock is expensive... queue.releaseZkSharedLock(lockManager); @@ -1154,7 +756,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { if (queue.releaseSharedLock()) { addToRunQueue(tableRunQueue, queue); } - schedLock.unlock(); + schedUnlock(); } /** @@ -1168,8 +770,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { */ @VisibleForTesting protected boolean markTableAsDeleted(final TableName table, final Procedure procedure) { - final ReentrantLock l = schedLock; - l.lock(); + schedLock(); try { TableQueue queue = getTableQueue(table); if (queue == null) return true; @@ -1193,7 +794,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { return false; } } finally { - l.unlock(); + schedUnlock(); } return true; } @@ -1298,7 +899,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } // awake procedures if any - schedLock.lock(); + schedLock(); try { for (int i = numProcs - 1; i >= 0; --i) { wakeProcedure(nextProcs[i]); @@ -1312,7 +913,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { releaseTableSharedLock(procedure, table); } } finally { - schedLock.unlock(); + schedUnlock(); } } @@ -1327,7 +928,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { * @return true if we were able to acquire the lock on the namespace, otherwise false. */ public boolean tryAcquireNamespaceExclusiveLock(final Procedure procedure, final String nsName) { - schedLock.lock(); + schedLock(); try { TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME); if (!tableQueue.trySharedLock()) return false; @@ -1339,7 +940,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } return hasLock; } finally { - schedLock.unlock(); + schedUnlock(); } } @@ -1350,7 +951,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { * @param nsName the namespace that has the exclusive lock */ public void releaseNamespaceExclusiveLock(final Procedure procedure, final String nsName) { - schedLock.lock(); + schedLock(); try { final TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME); final NamespaceQueue queue = getNamespaceQueue(nsName); @@ -1360,7 +961,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { addToRunQueue(tableRunQueue, tableQueue); } } finally { - schedLock.unlock(); + schedUnlock(); } } @@ -1376,7 +977,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { */ public boolean tryAcquireServerExclusiveLock(final Procedure procedure, final ServerName serverName) { - schedLock.lock(); + schedLock(); try { ServerQueue queue = getServerQueue(serverName); if (queue.tryExclusiveLock(procedure)) { @@ -1384,7 +985,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { return true; } } finally { - schedLock.unlock(); + schedUnlock(); } return false; } @@ -1397,13 +998,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { */ public void releaseServerExclusiveLock(final Procedure procedure, final ServerName serverName) { - schedLock.lock(); + schedLock(); try { ServerQueue queue = getServerQueue(serverName); queue.releaseExclusiveLock(); addToRunQueue(serverRunQueue, queue); } finally { - schedLock.unlock(); + schedUnlock(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java index 4c53845..1b434fe 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java @@ -18,11 +18,6 @@ package org.apache.hadoop.hbase.master.procedure; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.concurrent.atomic.AtomicInteger; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -34,17 +29,15 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; -import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Threads; + import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -200,85 +193,4 @@ public class TestMasterProcedureEvents { } return null; } - - @Test(timeout=30000) - public void testTimeoutEventProcedure() throws Exception { - HMaster master = UTIL.getMiniHBaseCluster().getMaster(); - ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor(); - MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureQueue(); - - TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(1000, 5); - procExec.submitProcedure(proc); - - ProcedureTestingUtility.waitProcedure(procExec, proc.getProcId()); - ProcedureTestingUtility.assertIsAbortException(procExec.getResult(proc.getProcId())); - } - - public static class TestTimeoutEventProcedure - extends Procedure<MasterProcedureEnv> implements TableProcedureInterface { - private final ProcedureEvent event = new ProcedureEvent("timeout-event"); - - private final AtomicInteger ntimeouts = new AtomicInteger(0); - private int maxTimeouts = 1; - - public TestTimeoutEventProcedure() {} - - public TestTimeoutEventProcedure(final int timeoutMsec, final int maxTimeouts) { - this.maxTimeouts = maxTimeouts; - setTimeout(timeoutMsec); - setOwner("test"); - } - - @Override - protected Procedure[] execute(final MasterProcedureEnv env) - throws ProcedureSuspendedException { - LOG.info("EXECUTE " + this + " ntimeouts=" + ntimeouts); - if (ntimeouts.get() > maxTimeouts) { - setAbortFailure("test", "give up after " + ntimeouts.get()); - return null; - } - - env.getProcedureQueue().suspendEvent(event); - if (env.getProcedureQueue().waitEvent(event, this)) { - setState(ProcedureState.WAITING_TIMEOUT); - throw new ProcedureSuspendedException(); - } - - return null; - } - - @Override - protected void rollback(final MasterProcedureEnv env) { - } - - @Override - protected boolean setTimeoutFailure(final MasterProcedureEnv env) { - int n = ntimeouts.incrementAndGet(); - LOG.info("HANDLE TIMEOUT " + this + " ntimeouts=" + n); - setState(ProcedureState.RUNNABLE); - env.getProcedureQueue().wakeEvent(event); - return false; - } - - @Override - public TableName getTableName() { - return TableName.valueOf("testtb"); - } - - @Override - public TableOperationType getTableOperationType() { - return TableOperationType.READ; - } - - @Override - protected boolean abort(MasterProcedureEnv env) { return false; } - - @Override - protected void serializeStateData(final OutputStream stream) throws IOException { - } - - @Override - protected void deserializeStateData(final InputStream stream) throws IOException { - } - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java index 12f8263..bcacb48 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java @@ -31,8 +31,8 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.master.TableLockManager; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -541,40 +541,6 @@ public class TestMasterProcedureScheduler { } @Test - public void testSuspendedTableQueue() throws Exception { - final TableName tableName = TableName.valueOf("testSuspendedQueue"); - - queue.addBack(new TestTableProcedure(1, tableName, - TableProcedureInterface.TableOperationType.EDIT)); - queue.addBack(new TestTableProcedure(2, tableName, - TableProcedureInterface.TableOperationType.EDIT)); - - Procedure proc = queue.poll(); - assertEquals(1, proc.getProcId()); - assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName)); - - // Suspend - // TODO: If we want to keep the zk-lock we need to retain the lock on suspend - ProcedureEvent event = new ProcedureEvent("testSuspendedTableQueueEvent"); - assertEquals(true, queue.waitEvent(event, proc, true)); - queue.releaseTableExclusiveLock(proc, tableName); - assertEquals(null, queue.poll(0)); - - // Resume - queue.wakeEvent(event); - - proc = queue.poll(); - assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName)); - assertEquals(1, proc.getProcId()); - queue.releaseTableExclusiveLock(proc, tableName); - - proc = queue.poll(); - assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName)); - assertEquals(2, proc.getProcId()); - queue.releaseTableExclusiveLock(proc, tableName); - } - - @Test public void testSuspendedProcedure() throws Exception { final TableName tableName = TableName.valueOf("testSuspendedProcedure"); http://git-wip-us.apache.org/repos/asf/hbase/blob/92ef2344/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java index 380067d..d6ddd13 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java @@ -23,23 +23,18 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.ConcurrentSkipListSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.master.TableLockManager; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent; import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedure; -import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedureWithEvent; import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MasterTests; -import org.apache.hadoop.hbase.util.Threads; + import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -212,92 +207,6 @@ public class TestMasterProcedureSchedulerConcurrency { } } - @Test(timeout=60000) - public void testConcurrentWaitWake() throws Exception { - testConcurrentWaitWake(false); - } - - @Test(timeout=60000) - public void testConcurrentWaitWakeBatch() throws Exception { - testConcurrentWaitWake(true); - } - - private void testConcurrentWaitWake(final boolean useWakeBatch) throws Exception { - final TableName tableName = TableName.valueOf("testtb"); - - final int NPROCS = 20; - final int NRUNS = 100; - - for (long i = 0; i < NPROCS; ++i) { - queue.addBack(new TestTableProcedureWithEvent(i, tableName, - TableProcedureInterface.TableOperationType.READ)); - } - - final Thread[] threads = new Thread[4]; - final AtomicInteger waitCount = new AtomicInteger(0); - final AtomicInteger wakeCount = new AtomicInteger(0); - - final ConcurrentSkipListSet<TestTableProcedureWithEvent> waitQueue = - new ConcurrentSkipListSet<TestTableProcedureWithEvent>(); - threads[0] = new Thread() { - @Override - public void run() { - while (true) { - if (useWakeBatch) { - ProcedureEvent[] ev = new ProcedureEvent[waitQueue.size()]; - for (int i = 0; i < ev.length; ++i) { - ev[i] = waitQueue.pollFirst().getEvent(); - LOG.debug("WAKE " + ev[i] + " total=" + wakeCount.get()); - } - queue.wakeEvents(ev, ev.length); - wakeCount.addAndGet(ev.length); - } else { - int size = waitQueue.size(); - while (size-- > 0) { - ProcedureEvent ev = waitQueue.pollFirst().getEvent(); - queue.wakeEvent(ev); - LOG.debug("WAKE " + ev + " total=" + wakeCount.get()); - wakeCount.incrementAndGet(); - } - } - if (wakeCount.get() >= NRUNS) { - break; - } - Threads.sleepWithoutInterrupt(25); - } - } - }; - - for (int i = 1; i < threads.length; ++i) { - threads[i] = new Thread() { - @Override - public void run() { - while (true) { - TestTableProcedureWithEvent proc = (TestTableProcedureWithEvent)queue.poll(); - if (proc == null) continue; - - waitQueue.add(proc); - queue.suspendEvent(proc.getEvent()); - queue.waitEvent(proc.getEvent(), proc); - LOG.debug("WAIT " + proc.getEvent()); - if (waitCount.incrementAndGet() >= NRUNS) { - break; - } - } - } - }; - } - - for (int i = 0; i < threads.length; ++i) { - threads[i].start(); - } - for (int i = 0; i < threads.length; ++i) { - threads[i].join(); - } - - queue.clear(); - } - public static class TestTableProcSet { private final MasterProcedureScheduler queue;