http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java
deleted file mode 100644
index bf9afd7..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java
+++ /dev/null
@@ -1,785 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.master.procedure;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
-import org.apache.hadoop.hbase.master.MasterFileSystem;
-import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.master.RegionStates;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SplitTableRegionState;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
-import org.apache.hadoop.hbase.regionserver.HStore;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.Threads;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * The procedure to split a region in a table.
- */
-@InterfaceAudience.Private
-public class SplitTableRegionProcedure
-    extends AbstractStateMachineTableProcedure<SplitTableRegionState> {
-  private static final Log LOG = LogFactory.getLog(SplitTableRegionProcedure.class);
-
-  private Boolean traceEnabled;
-
-  /*
-   * Region to split
-   */
-  private HRegionInfo parentHRI;
-  private HRegionInfo daughter_1_HRI;
-  private HRegionInfo daughter_2_HRI;
-
-  public SplitTableRegionProcedure() {
-    this.traceEnabled = null;
-  }
-
-  public SplitTableRegionProcedure(final MasterProcedureEnv env,
-      final HRegionInfo regionToSplit, final byte[] splitRow) throws IOException {
-    super(env);
-
-    checkSplitRow(regionToSplit, splitRow);
-
-    this.traceEnabled = null;
-    this.parentHRI = regionToSplit;
-
-    final TableName table = regionToSplit.getTable();
-    final long rid = getDaughterRegionIdTimestamp(regionToSplit);
-    this.daughter_1_HRI = new HRegionInfo(table, regionToSplit.getStartKey(), splitRow, false, rid);
-    this.daughter_2_HRI = new HRegionInfo(table, splitRow, regionToSplit.getEndKey(), false, rid);
-  }
-
-  private static void checkSplitRow(final HRegionInfo regionToSplit, final byte[] splitRow)
-      throws IOException {
-    if (splitRow == null || splitRow.length == 0) {
-      throw new DoNotRetryIOException("Split row cannot be null");
-    }
-
-    if (Bytes.equals(regionToSplit.getStartKey(), splitRow)) {
-      throw new DoNotRetryIOException(
-        "Split row is equal to startkey: " + Bytes.toStringBinary(splitRow));
-    }
-
-    if (!regionToSplit.containsRow(splitRow)) {
-      throw new DoNotRetryIOException(
-        "Split row is not inside region key range splitKey:" + 
Bytes.toStringBinary(splitRow) +
-        " region: " + regionToSplit);
-    }
-  }
-
-  /**
-   * Calculate daughter regionid to use.
-   * @param hri Parent {@link HRegionInfo}
-   * @return Daughter region id (timestamp) to use.
-   */
-  private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
-    long rid = EnvironmentEdgeManager.currentTime();
-    // Regionid is timestamp.  Can't be less than that of parent else will insert
-    // at wrong location in hbase:meta (See HBASE-710).
-    if (rid < hri.getRegionId()) {
-      LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() +
-        " but current time here is " + rid);
-      rid = hri.getRegionId() + 1;
-    }
-    return rid;
-  }
-
-  @Override
-  protected Flow executeFromState(final MasterProcedureEnv env, final SplitTableRegionState state)
-      throws InterruptedException {
-    if (isTraceEnabled()) {
-      LOG.trace(this + " execute state=" + state);
-    }
-
-    try {
-      switch (state) {
-      case SPLIT_TABLE_REGION_PREPARE:
-        if (prepareSplitRegion(env)) {
-          setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION);
-          break;
-        } else {
-          assert isFailed() : "split region should have an exception here";
-          return Flow.NO_MORE_STATE;
-        }
-      case SPLIT_TABLE_REGION_PRE_OPERATION:
-        preSplitRegion(env);
-        setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_SET_SPLITTING_TABLE_STATE);
-        break;
-      case SPLIT_TABLE_REGION_SET_SPLITTING_TABLE_STATE:
-        setRegionStateToSplitting(env);
-        setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_CLOSE_PARENT_REGION);
-        break;
-      case SPLIT_TABLE_REGION_CLOSE_PARENT_REGION:
-        closeParentRegionForSplit(env);
-        setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS);
-        break;
-      case SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS:
-        createDaughterRegions(env);
-        setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_PONR);
-        break;
-      case SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_PONR:
-        preSplitRegionBeforePONR(env);
-        setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_UPDATE_META);
-        break;
-      case SPLIT_TABLE_REGION_UPDATE_META:
-        // This is the point of no return.  Adding subsequent edits to .META. as we
-        // do below when we do the daughter opens adding each to .META. can fail in
-        // various interesting ways the most interesting of which is a timeout
-        // BUT the edits all go through (See HBASE-3872).  IF we reach the PONR
-        // then subsequent failures need to crash out this region server; the
-        // server shutdown processing should be able to fix-up the incomplete split.
-        // The offlined parent will have the daughters as extra columns.  If
-        // we leave the daughter regions in place and do not remove them when we
-        // crash out, then they will have their references to the parent in place
-        // still and the server shutdown fixup of .META. will point to these
-        // regions.
-        // We should add PONR JournalEntry before offlineParentInMeta, so even if
-        // OfflineParentInMeta times out, this will cause regionserver exit, and then
-        // master ServerShutdownHandler will fix daughter & avoid data loss. (See
-        // HBase-4562).
-        updateMetaForDaughterRegions(env);
-        setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_PONR);
-        break;
-      case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_PONR:
-        preSplitRegionAfterPONR(env);
-        setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS);
-        break;
-      case SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS:
-        openDaughterRegions(env);
-        setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_POST_OPERATION);
-        break;
-      case SPLIT_TABLE_REGION_POST_OPERATION:
-        postSplitRegion(env);
-        return Flow.NO_MORE_STATE;
-      default:
-        throw new UnsupportedOperationException(this + " unhandled state=" + state);
-      }
-    } catch (IOException e) {
-      String msg = "Error trying to split region " + parentHRI.getEncodedName() + " in the table "
-          + getTableName() + " (in state=" + state + ")";
-      if (!isRollbackSupported(state)) {
-        // We reach a state that cannot be rolled back. We just need to keep retrying.
-        LOG.warn(msg, e);
-      } else {
-        LOG.error(msg, e);
-        setFailure("master-split-region", e);
-      }
-    }
-    return Flow.HAS_MORE_STATE;
-  }
-
-  @Override
-  protected void rollbackState(final MasterProcedureEnv env, final SplitTableRegionState state)
-      throws IOException, InterruptedException {
-    if (isTraceEnabled()) {
-      LOG.trace(this + " rollback state=" + state);
-    }
-
-    try {
-      switch (state) {
-      case SPLIT_TABLE_REGION_POST_OPERATION:
-      case SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS:
-      case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_PONR:
-      case SPLIT_TABLE_REGION_UPDATE_META:
-        // PONR
-        throw new UnsupportedOperationException(this + " unhandled state=" + state);
-      case SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_PONR:
-        break;
-      case SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS:
-        // Doing nothing, as re-opening the parent region will clean up daughter region directories.
-        break;
-      case SPLIT_TABLE_REGION_CLOSE_PARENT_REGION:
-        openParentRegion(env);
-        break;
-      case SPLIT_TABLE_REGION_SET_SPLITTING_TABLE_STATE:
-        setRegionStateToRevertSplitting(env);
-        break;
-      case SPLIT_TABLE_REGION_PRE_OPERATION:
-        postRollBackSplitRegion(env);
-        break;
-      case SPLIT_TABLE_REGION_PREPARE:
-        break; // nothing to do
-      default:
-        throw new UnsupportedOperationException(this + " unhandled state=" + state);
-      }
-    } catch (IOException e) {
-      // This will be retried. Unless there is a bug in the code,
-      // this should be just a "temporary error" (e.g. network down)
-      LOG.warn("Failed rollback attempt step " + state + " for splitting the 
region "
-        + parentHRI.getEncodedName() + " in table " + getTableName(), e);
-      throw e;
-    }
-  }
-
-  /*
-   * Check whether we are in a state that can be rolled back
-   */
-  @Override
-  protected boolean isRollbackSupported(final SplitTableRegionState state) {
-    switch (state) {
-      case SPLIT_TABLE_REGION_POST_OPERATION:
-      case SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS:
-      case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_PONR:
-      case SPLIT_TABLE_REGION_UPDATE_META:
-        // It is not safe to roll back once we reach these states.
-        return false;
-      default:
-        break;
-    }
-    return true;
-  }
-
-  @Override
-  protected SplitTableRegionState getState(final int stateId) {
-    return SplitTableRegionState.forNumber(stateId);
-  }
-
-  @Override
-  protected int getStateId(final SplitTableRegionState state) {
-    return state.getNumber();
-  }
-
-  @Override
-  protected SplitTableRegionState getInitialState() {
-    return SplitTableRegionState.SPLIT_TABLE_REGION_PREPARE;
-  }
-
-  @Override
-  public void serializeStateData(final OutputStream stream) throws IOException {
-    super.serializeStateData(stream);
-
-    final MasterProcedureProtos.SplitTableRegionStateData.Builder splitTableRegionMsg =
-        MasterProcedureProtos.SplitTableRegionStateData.newBuilder()
-        .setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
-        .setParentRegionInfo(HRegionInfo.convert(parentHRI))
-        .addChildRegionInfo(HRegionInfo.convert(daughter_1_HRI))
-        .addChildRegionInfo(HRegionInfo.convert(daughter_2_HRI));
-    splitTableRegionMsg.build().writeDelimitedTo(stream);
-  }
-
-  @Override
-  public void deserializeStateData(final InputStream stream) throws IOException {
-    super.deserializeStateData(stream);
-
-    final MasterProcedureProtos.SplitTableRegionStateData splitTableRegionsMsg =
-        MasterProcedureProtos.SplitTableRegionStateData.parseDelimitedFrom(stream);
-    setUser(MasterProcedureUtil.toUserInfo(splitTableRegionsMsg.getUserInfo()));
-    parentHRI = HRegionInfo.convert(splitTableRegionsMsg.getParentRegionInfo());
-    if (splitTableRegionsMsg.getChildRegionInfoCount() == 0) {
-      daughter_1_HRI = daughter_2_HRI = null;
-    } else {
-      assert(splitTableRegionsMsg.getChildRegionInfoCount() == 2);
-      daughter_1_HRI = HRegionInfo.convert(splitTableRegionsMsg.getChildRegionInfoList().get(0));
-      daughter_2_HRI = HRegionInfo.convert(splitTableRegionsMsg.getChildRegionInfoList().get(1));
-    }
-  }
-
-  @Override
-  public void toStringClassDetails(StringBuilder sb) {
-    sb.append(getClass().getSimpleName());
-    sb.append(" (table=");
-    sb.append(getTableName());
-    sb.append(" parent region=");
-    sb.append(parentHRI);
-    if (daughter_1_HRI != null) {
-      sb.append(" first daughter region=");
-      sb.append(daughter_1_HRI);
-    }
-    if (daughter_2_HRI != null) {
-      sb.append(" and second daughter region=");
-      sb.append(daughter_2_HRI);
-    }
-    sb.append(")");
-  }
-
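-  // Acquire the region lock on the parent region; if the master is not yet
-  // initialized, suspend and wait for the initialization event first.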
-  @Override
-  protected LockState acquireLock(final MasterProcedureEnv env) {
-    if (env.waitInitialized(this)) {
-      return LockState.LOCK_EVENT_WAIT;
-    }
-    return env.getProcedureScheduler().waitRegions(this, getTableName(), parentHRI)?
-        LockState.LOCK_EVENT_WAIT: LockState.LOCK_ACQUIRED;
-  }
-
-  @Override
-  protected void releaseLock(final MasterProcedureEnv env) {
-    env.getProcedureScheduler().wakeRegions(this, getTableName(), parentHRI);
-  }
-
-  @Override
-  public TableName getTableName() {
-    return parentHRI.getTable();
-  }
-
-  @Override
-  public TableOperationType getTableOperationType() {
-    return TableOperationType.SPLIT;
-  }
-
-  private byte[] getSplitRow() {
-    return daughter_2_HRI.getStartKey();
-  }
-
-  /**
-   * Prepare to split region.
-   * @param env MasterProcedureEnv
-   * @throws IOException
-   */
-  @VisibleForTesting
-  public boolean prepareSplitRegion(final MasterProcedureEnv env) throws IOException {
-    // Check whether the region is splittable
-    final RegionState state = getParentRegionState(env);
-    if (state.isClosing() || state.isClosed() ||
-        state.isSplittingOrSplitOnServer(state.getServerName())) {
-      setFailure(
-        "master-split-region",
-        new IOException("Split region " + parentHRI + " failed due to region 
is not splittable"));
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * Action before splitting region in a table.
-   * @param env MasterProcedureEnv
-   * @param state the procedure state
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  private void preSplitRegion(final MasterProcedureEnv env)
-      throws IOException, InterruptedException {
-    final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
-    if (cpHost != null) {
-      cpHost.preSplitRegionAction(getTableName(), getSplitRow(), getUser());
-    }
-  }
-
-  /**
-   * Action after rolling back a split table region action.
-   * @param env MasterProcedureEnv
-   * @throws IOException
-   */
-  private void postRollBackSplitRegion(final MasterProcedureEnv env) throws IOException {
-    final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
-    if (cpHost != null) {
-      cpHost.postRollBackSplitRegionAction(getUser());
-    }
-  }
-
-  /**
-   * Set the parent region state to SPLITTING state
-   * @param env MasterProcedureEnv
-   * @throws IOException
-   */
-  @VisibleForTesting
-  public void setRegionStateToSplitting(final MasterProcedureEnv env) throws IOException {
-    RegionStateTransition.Builder transition = RegionStateTransition.newBuilder();
-    transition.setTransitionCode(TransitionCode.READY_TO_SPLIT);
-    transition.addRegionInfo(HRegionInfo.convert(parentHRI));
-    transition.addRegionInfo(HRegionInfo.convert(daughter_1_HRI));
-    transition.addRegionInfo(HRegionInfo.convert(daughter_2_HRI));
-    if (env.getMasterServices().getAssignmentManager().onRegionTransition(
-      getParentRegionState(env).getServerName(), transition.build()) != null) {
-      throw new IOException("Failed to update region state to SPLITTING for "
-          + parentHRI.getRegionNameAsString());
-    }
-  }
-
-  /**
-   * Rollback the region state change
-   * @param env MasterProcedureEnv
-   * @throws IOException
-   */
-  private void setRegionStateToRevertSplitting(final MasterProcedureEnv env) throws IOException {
-    RegionStateTransition.Builder transition = RegionStateTransition.newBuilder();
-    transition.setTransitionCode(TransitionCode.SPLIT_REVERTED);
-    transition.addRegionInfo(HRegionInfo.convert(parentHRI));
-    transition.addRegionInfo(HRegionInfo.convert(daughter_1_HRI));
-    transition.addRegionInfo(HRegionInfo.convert(daughter_2_HRI));
-    if (env.getMasterServices().getAssignmentManager().onRegionTransition(
-      getParentRegionState(env).getServerName(), transition.build()) != null) {
-      throw new IOException("Failed to update region state for "
-          + parentHRI.getRegionNameAsString() + " as part of operation for reverting split");
-    }
-  }
-
-  /**
-   * RPC to the region server that hosts the parent region, asking it to close the parent region
-   * @param env MasterProcedureEnv
-   * @throws IOException
-   */
-  @VisibleForTesting
-  public void closeParentRegionForSplit(final MasterProcedureEnv env) throws IOException {
-    boolean success = env.getMasterServices().getServerManager().sendRegionCloseForSplitOrMerge(
-      getParentRegionState(env).getServerName(), parentHRI);
-    if (!success) {
-      throw new IOException("Close parent region " + parentHRI + " for 
splitting failed."
-        + "  Check region server log for more details");
-    }
-  }
-
-  /**
-   * Roll back the close of the parent region
-   * @param env MasterProcedureEnv
-   **/
-  private void openParentRegion(final MasterProcedureEnv env) throws IOException {
-    // Check whether the region is closed; if so, open it in the same server
-    RegionState state = getParentRegionState(env);
-    if (state.isClosing() || state.isClosed()) {
-      env.getMasterServices().getServerManager().sendRegionOpen(
-        getParentRegionState(env).getServerName(),
-        parentHRI,
-        ServerName.EMPTY_SERVER_LIST);
-    }
-  }
-
-  /**
-   * Create daughter regions
-   * @param env MasterProcedureEnv
-   * @throws IOException
-   */
-  @VisibleForTesting
-  public void createDaughterRegions(final MasterProcedureEnv env) throws IOException {
-    final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
-    final Path tabledir = FSUtils.getTableDir(mfs.getRootDir(), parentHRI.getTable());
-    final FileSystem fs = mfs.getFileSystem();
-    HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
-      env.getMasterConfiguration(), fs, tabledir, parentHRI, false);
-    regionFs.createSplitsDir();
-
-    Pair<Integer, Integer> expectedReferences = splitStoreFiles(env, regionFs);
-
-    assertReferenceFileCount(
-      fs, expectedReferences.getFirst(), regionFs.getSplitsDir(daughter_1_HRI));
-    // Move the files from the temporary .splits to the final /table/region directory
-    regionFs.commitDaughterRegion(daughter_1_HRI);
-    assertReferenceFileCount(
-      fs,
-      expectedReferences.getFirst(),
-      new Path(tabledir, daughter_1_HRI.getEncodedName()));
-
-    assertReferenceFileCount(
-      fs, expectedReferences.getSecond(), regionFs.getSplitsDir(daughter_2_HRI));
-    regionFs.commitDaughterRegion(daughter_2_HRI);
-    assertReferenceFileCount(
-      fs,
-      expectedReferences.getSecond(),
-      new Path(tabledir, daughter_2_HRI.getEncodedName()));
-  }
-
-  /**
-   * Split the parent region's store files into the daughter split directories
-   * @param env MasterProcedureEnv
-   * @throws IOException
-   */
-  private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
-      final HRegionFileSystem regionFs) throws IOException {
-    final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
-    final Configuration conf = env.getMasterConfiguration();
-
-    // The following code sets up a thread pool executor with as many slots as
-    // there are files to split. It then fires up everything, waits for
-    // completion and finally checks for any exception
-    //
-    // Note: splitStoreFiles creates daughter region dirs under the parent splits dir
-    // Nothing to unroll here if failure -- re-run createSplitsDir will
-    // clean this up.
-    int nbFiles = 0;
-    for (String family: regionFs.getFamilies()) {
-      Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);
-      if (storeFiles != null) {
-        nbFiles += storeFiles.size();
-      }
-    }
-    if (nbFiles == 0) {
-      // no file needs to be split.
-      return new Pair<>(0,0);
-    }
-    // Default max #threads to use is the smaller of table's configured number of blocking store
-    // files or the available number of logical cores.
-    int defMaxThreads = Math.min(
-      conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT),
-      Runtime.getRuntime().availableProcessors());
-    // Max #threads is the smaller of the number of storefiles or the default max determined above.
-    int maxThreads = Math.min(
-      conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX, defMaxThreads), nbFiles);
-    LOG.info("Preparing to split " + nbFiles + " storefiles for region " + 
parentHRI +
-            " using " + maxThreads + " threads");
-    ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
-      maxThreads, Threads.getNamedThreadFactory("StoreFileSplitter-%1$d"));
-    List<Future<Pair<Path,Path>>> futures = new ArrayList<>(nbFiles);
-
-    // Split each store file.
-    final HTableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
-    for (String family: regionFs.getFamilies()) {
-      final HColumnDescriptor hcd = htd.getFamily(family.getBytes());
-      final Collection<StoreFileInfo> storeFiles = 
regionFs.getStoreFiles(family);
-      if (storeFiles != null && storeFiles.size() > 0) {
-        final CacheConfig cacheConf = new CacheConfig(conf, hcd);
-        for (StoreFileInfo storeFileInfo: storeFiles) {
-          StoreFileSplitter sfs =
-              new StoreFileSplitter(regionFs, family.getBytes(), new StoreFile(mfs.getFileSystem(),
-                  storeFileInfo, conf, cacheConf, hcd.getBloomFilterType(), true));
-          futures.add(threadPool.submit(sfs));
-        }
-      }
-    }
-    // Shutdown the pool
-    threadPool.shutdown();
-
-    // Wait for all the tasks to finish
-    long fileSplitTimeout = conf.getLong("hbase.master.fileSplitTimeout", 30000);
-    try {
-      boolean stillRunning = !threadPool.awaitTermination(fileSplitTimeout, TimeUnit.MILLISECONDS);
-      if (stillRunning) {
-        threadPool.shutdownNow();
-        // wait for the thread to shutdown completely.
-        while (!threadPool.isTerminated()) {
-          Thread.sleep(50);
-        }
-        throw new IOException("Took too long to split the" +
-            " files and create the references, aborting split");
-      }
-    } catch (InterruptedException e) {
-      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
-    }
-
-    int daughterA = 0;
-    int daughterB = 0;
-    // Look for any exception
-    for (Future<Pair<Path, Path>> future : futures) {
-      try {
-        Pair<Path, Path> p = future.get();
-        daughterA += p.getFirst() != null ? 1 : 0;
-        daughterB += p.getSecond() != null ? 1 : 0;
-      } catch (InterruptedException e) {
-        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
-      } catch (ExecutionException e) {
-        throw new IOException(e);
-      }
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Split storefiles for region " + parentHRI + " Daughter A: " + 
daughterA
-          + " storefiles, Daughter B: " + daughterB + " storefiles.");
-    }
-    return new Pair<>(daughterA, daughterB);
-  }
-
-  private void assertReferenceFileCount(
-      final FileSystem fs,
-      final int expectedReferenceFileCount,
-      final Path dir)
-      throws IOException {
-    if (expectedReferenceFileCount != 0 &&
-        expectedReferenceFileCount != FSUtils.getRegionReferenceFileCount(fs, dir)) {
-      throw new IOException("Failing split. Expected reference file count 
isn't equal.");
-    }
-  }
-
-  private Pair<Path, Path> splitStoreFile(final HRegionFileSystem regionFs,
-      final byte[] family, final StoreFile sf) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Splitting started for store file: " + sf.getPath() + " for 
region: " + parentHRI);
-    }
-
-    final byte[] splitRow = getSplitRow();
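-    // One reference file is written per daughter: the first call (top=false)
-    // references the bottom half of the store file up to the split row, the
-    // second call (top=true) the top half.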
-    final String familyName = Bytes.toString(family);
-    final Path path_first =
-        regionFs.splitStoreFile(this.daughter_1_HRI, familyName, sf, splitRow, false, null);
-    final Path path_second =
-        regionFs.splitStoreFile(this.daughter_2_HRI, familyName, sf, splitRow, true, null);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Splitting complete for store file: " + sf.getPath() + " for 
region: " + parentHRI);
-    }
-    return new Pair<>(path_first, path_second);
-  }
-
-  /**
-   * Utility class used to do the file splitting / reference writing
-   * in parallel instead of sequentially.
-   */
-  private class StoreFileSplitter implements Callable<Pair<Path,Path>> {
-    private final HRegionFileSystem regionFs;
-    private final byte[] family;
-    private final StoreFile sf;
-
-    /**
-     * Constructor that takes what it needs to split
-     * @param regionFs the file system
-     * @param family Family that contains the store file
-     * @param sf which file
-     */
-    public StoreFileSplitter(
-        final HRegionFileSystem regionFs,
-        final byte[] family,
-        final StoreFile sf) {
-      this.regionFs = regionFs;
-      this.sf = sf;
-      this.family = family;
-    }
-
-    public Pair<Path,Path> call() throws IOException {
-      return splitStoreFile(regionFs, family, sf);
-    }
-  }
-
-  /**
-   * Post split region actions before the Point-of-No-Return step
-   * @param env MasterProcedureEnv
-   **/
-  private void preSplitRegionBeforePONR(final MasterProcedureEnv env)
-    throws IOException, InterruptedException {
-    final List<Mutation> metaEntries = new ArrayList<>();
-    final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
-    if (cpHost != null) {
-      if (cpHost.preSplitBeforePONRAction(getSplitRow(), metaEntries, getUser())) {
-        throw new IOException("Coprocessor bypassing region " +
-            parentHRI.getRegionNameAsString() + " split.");
-      }
-      try {
-        for (Mutation p : metaEntries) {
-          HRegionInfo.parseRegionName(p.getRow());
-        }
-      } catch (IOException e) {
-        LOG.error("Row key of mutation from coprocessor is not parsable as 
region name."
-            + "Mutations from coprocessor should only for hbase:meta table.");
-        throw e;
-      }
-    }
-  }
-
-  /**
-   * Add daughter regions to META
-   * @param env MasterProcedureEnv
-   * @throws IOException
-   */
-  private void updateMetaForDaughterRegions(final MasterProcedureEnv env) throws IOException {
-    RegionStateTransition.Builder transition = RegionStateTransition.newBuilder();
-    transition.setTransitionCode(TransitionCode.SPLIT_PONR);
-    transition.addRegionInfo(HRegionInfo.convert(parentHRI));
-    transition.addRegionInfo(HRegionInfo.convert(daughter_1_HRI));
-    transition.addRegionInfo(HRegionInfo.convert(daughter_2_HRI));
-    if (env.getMasterServices().getAssignmentManager().onRegionTransition(
-      getParentRegionState(env).getServerName(), transition.build()) != null) {
-      throw new IOException("Failed to update meta to add daughter regions in 
split region "
-          + parentHRI.getRegionNameAsString());
-    }
-  }
-
-  /**
-   * Pre split region actions after the Point-of-No-Return step
-   * @param env MasterProcedureEnv
-   **/
-  private void preSplitRegionAfterPONR(final MasterProcedureEnv env)
-      throws IOException, InterruptedException {
-    final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
-    if (cpHost != null) {
-      cpHost.preSplitAfterPONRAction(getUser());
-    }
-  }
-
-  /**
-   * Assign daughter regions
-   * @param env MasterProcedureEnv
-   * @throws IOException
-   * @throws InterruptedException
-   **/
-  private void openDaughterRegions(final MasterProcedureEnv env)
-      throws IOException, InterruptedException {
-    env.getMasterServices().getAssignmentManager().assignDaughterRegions(
-      parentHRI, daughter_1_HRI, daughter_2_HRI);
-  }
-
-  /**
-   * Post split region actions
-   * @param env MasterProcedureEnv
-   **/
-  private void postSplitRegion(final MasterProcedureEnv env) throws IOException {
-    final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
-    if (cpHost != null) {
-      cpHost.postCompletedSplitRegionAction(daughter_1_HRI, daughter_2_HRI, getUser());
-    }
-  }
-
-  /**
-   * Get parent region state
-   * @param env MasterProcedureEnv
-   * @return parent region state
-   */
-  private RegionState getParentRegionState(final MasterProcedureEnv env) {
-    RegionStates regionStates = env.getMasterServices().getAssignmentManager().getRegionStates();
-    RegionState state = regionStates.getRegionState(parentHRI);
-    if (state == null) {
-      LOG.warn("Split but not in region states: " + parentHRI);
-      state = regionStates.createRegionState(parentHRI);
-    }
-    return state;
-  }
-
-  /**
-   * The procedure could be restarted from a different machine. If the variable is null, we need to
-   * retrieve it.
-   * @return traceEnabled
-   */
-  private boolean isTraceEnabled() {
-    if (traceEnabled == null) {
-      traceEnabled = LOG.isTraceEnabled();
-    }
-    return traceEnabled;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java
index f74df79..86143ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java
@@ -31,7 +31,8 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 public interface TableProcedureInterface {
   public enum TableOperationType {
     CREATE, DELETE, DISABLE, EDIT, ENABLE, READ,
-    REGION_EDIT, SPLIT, MERGE, ASSIGN, UNASSIGN, /* region operations */
+    REGION_EDIT, REGION_SPLIT, REGION_MERGE, REGION_ASSIGN, REGION_UNASSIGN,
+      REGION_GC, MERGED_REGIONS_GC /* region operations */
   };
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
index e41b2cd..e7f5ead 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
@@ -85,7 +85,7 @@ public class TruncateTableProcedure
 
           // TODO: Move out... in the acquireLock()
           LOG.debug("waiting for '" + getTableName() + "' regions in 
transition");
-          regions = ProcedureSyncWait.getRegionsFromMeta(env, getTableName());
+          regions = env.getAssignmentManager().getRegionStates().getRegionsOfTable(getTableName());
           assert regions != null && !regions.isEmpty() : "unexpected 0 regions";
           ProcedureSyncWait.waitRegionInTransition(env, regions);
 
@@ -121,12 +121,14 @@ public class TruncateTableProcedure
           setNextState(TruncateTableState.TRUNCATE_TABLE_ASSIGN_REGIONS);
           break;
         case TRUNCATE_TABLE_ASSIGN_REGIONS:
-          CreateTableProcedure.assignRegions(env, getTableName(), regions);
+          CreateTableProcedure.setEnablingState(env, getTableName());
+          addChildProcedure(env.getAssignmentManager().createAssignProcedures(regions));
           setNextState(TruncateTableState.TRUNCATE_TABLE_POST_OPERATION);
           hTableDescriptor = null;
           regions = null;
           break;
         case TRUNCATE_TABLE_POST_OPERATION:
+          CreateTableProcedure.setEnabledState(env, getTableName());
           postTruncate(env);
           LOG.debug("truncate '" + getTableName() + "' completed");
           return Flow.NO_MORE_STATE;

http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java
index 25328b1..1ff05eb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java
@@ -118,8 +118,10 @@ public class MobFileCache {
       this.scheduleThreadPool.scheduleAtFixedRate(new EvictionThread(this), period, period,
           TimeUnit.SECONDS);
 
-      LOG.info("MobFileCache enabled with cacheSize=" + mobFileMaxCacheSize +
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("MobFileCache enabled with cacheSize=" + mobFileMaxCacheSize 
+
           ", evictPeriods=" +  period + "sec, evictRemainRatio=" + 
evictRemainRatio);
+      }
     } else {
       LOG.info("MobFileCache disabled");
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceAuditor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceAuditor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceAuditor.java
index a30bfef..232309b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceAuditor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceAuditor.java
@@ -126,14 +126,14 @@ public class NamespaceAuditor {
     }
   }
 
-  public void updateQuotaForRegionMerge(HRegionInfo hri) throws IOException {
+  public void updateQuotaForRegionMerge(HRegionInfo mergedRegion) throws IOException {
     if (!stateManager.isInitialized()) {
       throw new IOException(
           "Merge operation is being performed even before namespace auditor is 
initialized.");
-    } else if (!stateManager
-        .checkAndUpdateNamespaceRegionCount(hri.getTable(), hri.getRegionName(), -1)) {
-      throw new QuotaExceededException("Region split not possible for :" + hri.getEncodedName()
-          + " as quota limits are exceeded ");
+    } else if (!stateManager.checkAndUpdateNamespaceRegionCount(mergedRegion.getTable(),
+        mergedRegion.getRegionName(), -1)) {
+      throw new QuotaExceededException("Region merge not possible for :" +
+        mergedRegion.getEncodedName() + " as quota limits are exceeded ");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceStateManager.java
index 604f211..8f6a21d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceStateManager.java
@@ -88,8 +88,9 @@ class NamespaceStateManager {
     if (nspdesc != null) {
       NamespaceTableAndRegionInfo currentStatus;
       currentStatus = getState(namespace);
-      if (incr > 0 &&
-          currentStatus.getRegionCount() >= TableNamespaceManager.getMaxRegions(nspdesc)) {
+      int regionCount = currentStatus.getRegionCount();
+      long maxRegionCount = TableNamespaceManager.getMaxRegions(nspdesc);
+      if (incr > 0 && regionCount >= maxRegionCount) {
         LOG.warn("The region " + Bytes.toStringBinary(regionName)
             + " cannot be created. The region count  will exceed quota on the 
namespace. "
             + "This may be transient, please retry later if there are any 
ongoing split"

http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
index 9d24e6c..8f6a33a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
@@ -330,7 +330,7 @@ public class MasterQuotaManager implements RegionStateListener {
       namespaceQuotaManager.checkQuotaToCreateTable(tName, regions);
     }
   }
-  
+
   public void checkAndUpdateNamespaceRegionQuota(TableName tName, int regions) throws IOException {
     if (initialized) {
       namespaceQuotaManager.checkQuotaToUpdateRegion(tName, regions);
@@ -347,12 +347,14 @@ public class MasterQuotaManager implements RegionStateListener {
     return -1;
   }
 
-  public void onRegionMerged(HRegionInfo hri) throws IOException {
+  @Override
+  public void onRegionMerged(HRegionInfo mergedRegion) throws IOException {
     if (initialized) {
-      namespaceQuotaManager.updateQuotaForRegionMerge(hri);
+      namespaceQuotaManager.updateQuotaForRegionMerge(mergedRegion);
     }
   }
 
+  @Override
   public void onRegionSplit(HRegionInfo hri) throws IOException {
     if (initialized) {
       namespaceQuotaManager.checkQuotaToSplitRegion(hri);

http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
new file mode 100644
index 0000000..e7157d0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
@@ -0,0 +1,723 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.conf.ConfigurationManager;
+import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.StealJobQueue;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * Compact region on request and then run split if appropriate
+ */
+@InterfaceAudience.Private
+public class CompactSplit implements CompactionRequestor, PropagatingConfigurationObserver {
+  private static final Log LOG = LogFactory.getLog(CompactSplit.class);
+
+  // Configuration key for the large compaction threads.
+  public final static String LARGE_COMPACTION_THREADS =
+      "hbase.regionserver.thread.compaction.large";
+  public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;
+
+  // Configuration key for the small compaction threads.
+  public final static String SMALL_COMPACTION_THREADS =
+      "hbase.regionserver.thread.compaction.small";
+  public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;
+
+  // Configuration key for split threads
+  public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
+  public final static int SPLIT_THREADS_DEFAULT = 1;
+
+  // Configuration keys for merge threads
+  public final static String MERGE_THREADS = "hbase.regionserver.thread.merge";
+  public final static int MERGE_THREADS_DEFAULT = 1;
+
+  public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
+      "hbase.regionserver.regionSplitLimit";
+  public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT = 1000;
+
+  private final HRegionServer server;
+  private final Configuration conf;
+
+  private final ThreadPoolExecutor longCompactions;
+  private final ThreadPoolExecutor shortCompactions;
+  private final ThreadPoolExecutor splits;
+  private final ThreadPoolExecutor mergePool;
+
+  private volatile ThroughputController compactionThroughputController;
+
+  /**
+   * Splitting should not take place if the total number of regions exceeds this.
+   * This is not a hard limit to the number of regions but it is a guideline to
+   * stop splitting after the number of online regions is greater than this.
+   */
+  private int regionSplitLimit;
+
+  /** @param server */
+  CompactSplit(HRegionServer server) {
+    super();
+    this.server = server;
+    this.conf = server.getConfiguration();
+    this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT,
+        DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
+
+    int largeThreads = Math.max(1, conf.getInt(
+        LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
+    int smallThreads = conf.getInt(
+        SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
+
+    int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
+
+    // if we have throttle threads, make sure the user also specified size
+    Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
+
+    final String n = Thread.currentThread().getName();
+
+    StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<>();
+    this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
+        60, TimeUnit.SECONDS, stealJobQueue,
+        new ThreadFactory() {
+          @Override
+          public Thread newThread(Runnable r) {
+            String name = n + "-longCompactions-" + System.currentTimeMillis();
+            return new Thread(r, name);
+          }
+      });
+    this.longCompactions.setRejectedExecutionHandler(new Rejection());
+    this.longCompactions.prestartAllCoreThreads();
+    this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
+        60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(),
+        new ThreadFactory() {
+          @Override
+          public Thread newThread(Runnable r) {
+            String name = n + "-shortCompactions-" + 
System.currentTimeMillis();
+            return new Thread(r, name);
+          }
+      });
+    this.shortCompactions
+        .setRejectedExecutionHandler(new Rejection());
+    this.splits = (ThreadPoolExecutor)
+        Executors.newFixedThreadPool(splitThreads,
+            new ThreadFactory() {
+          @Override
+          public Thread newThread(Runnable r) {
+            String name = n + "-splits-" + System.currentTimeMillis();
+            return new Thread(r, name);
+          }
+      });
+    int mergeThreads = conf.getInt(MERGE_THREADS, MERGE_THREADS_DEFAULT);
+    this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
+        mergeThreads, new ThreadFactory() {
+          @Override
+          public Thread newThread(Runnable r) {
+            String name = n + "-merges-" + System.currentTimeMillis();
+            return new Thread(r, name);
+          }
+        });
+
+    // compaction throughput controller
+    this.compactionThroughputController =
+        CompactionThroughputControllerFactory.create(server, conf);
+  }
+
+  @Override
+  public String toString() {
+    return "compaction_queue=("
+        + longCompactions.getQueue().size() + ":"
+        + shortCompactions.getQueue().size() + ")"
+        + ", split_queue=" + splits.getQueue().size();
+  }
+
+  public String dumpQueue() {
+    StringBuffer queueLists = new StringBuffer();
+    queueLists.append("Compaction/Split Queue dump:\n");
+    queueLists.append("  LargeCompation Queue:\n");
+    BlockingQueue<Runnable> lq = longCompactions.getQueue();
+    Iterator<Runnable> it = lq.iterator();
+    while (it.hasNext()) {
+      queueLists.append("    " + it.next().toString());
+      queueLists.append("\n");
+    }
+
+    if (shortCompactions != null) {
+      queueLists.append("\n");
+      queueLists.append("  SmallCompation Queue:\n");
+      lq = shortCompactions.getQueue();
+      it = lq.iterator();
+      while (it.hasNext()) {
+        queueLists.append("    " + it.next().toString());
+        queueLists.append("\n");
+      }
+    }
+
+    queueLists.append("\n");
+    queueLists.append("  Split Queue:\n");
+    lq = splits.getQueue();
+    it = lq.iterator();
+    while (it.hasNext()) {
+      queueLists.append("    " + it.next().toString());
+      queueLists.append("\n");
+    }
+
+    return queueLists.toString();
+  }
+
+  public synchronized void requestRegionsMerge(final Region a,
+      final Region b, final boolean forcible, long masterSystemTime, User user) {
+    try {
+      mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime, user));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Region merge requested for " + a + "," + b + ", forcible="
+            + forcible + ".  " + this);
+      }
+    } catch (RejectedExecutionException ree) {
+      LOG.warn("Could not execute merge for " + a + "," + b + ", forcible="
+          + forcible, ree);
+    }
+  }
+
+  public synchronized boolean requestSplit(final Region r) {
+    // don't split regions that are blocking
+    if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= Store.PRIORITY_USER) {
+      byte[] midKey = ((HRegion)r).checkSplit();
+      if (midKey != null) {
+        requestSplit(r, midKey);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public synchronized void requestSplit(final Region r, byte[] midKey) {
+    requestSplit(r, midKey, null);
+  }
+
+  /*
+   * The User parameter allows the split thread to assume the correct user identity
+   */
+  public synchronized void requestSplit(final Region r, byte[] midKey, User user) {
+    if (midKey == null) {
+      LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() +
+        " not splittable because midkey=null");
+      if (((HRegion)r).shouldForceSplit()) {
+        ((HRegion)r).clearSplit();
+      }
+      return;
+    }
+    try {
+      this.splits.execute(new SplitRequest(r, midKey, this.server, user));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Splitting " + r + ", " + this);
+      }
+    } catch (RejectedExecutionException ree) {
+      LOG.info("Could not execute split for " + r, ree);
+    }
+  }
+
+  @Override
+  public synchronized List<CompactionRequest> requestCompaction(final Region 
r, final String why)
+      throws IOException {
+    return requestCompaction(r, why, null);
+  }
+
+  @Override
+  public synchronized List<CompactionRequest> requestCompaction(final Region 
r, final String why,
+      List<Pair<CompactionRequest, Store>> requests) throws IOException {
+    return requestCompaction(r, why, Store.NO_PRIORITY, requests, null);
+  }
+
+  @Override
+  public synchronized CompactionRequest requestCompaction(final Region r, final Store s,
+      final String why, CompactionRequest request) throws IOException {
+    return requestCompaction(r, s, why, Store.NO_PRIORITY, request, null);
+  }
+
+  @Override
+  public synchronized List<CompactionRequest> requestCompaction(final Region 
r, final String why,
+      int p, List<Pair<CompactionRequest, Store>> requests, User user) throws 
IOException {
+    return requestCompactionInternal(r, why, p, requests, true, user);
+  }
+
+  private List<CompactionRequest> requestCompactionInternal(final Region r, 
final String why,
+      int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, 
User user)
+          throws IOException {
+    // not a special compaction request, so make our own list
+    List<CompactionRequest> ret = null;
+    if (requests == null) {
+      ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
+      for (Store s : r.getStores()) {
+        CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user);
+        if (selectNow) ret.add(cr);
+      }
+    } else {
+      Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
+      ret = new ArrayList<CompactionRequest>(requests.size());
+      for (Pair<CompactionRequest, Store> pair : requests) {
+        ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst(), user));
+      }
+    }
+    return ret;
+  }
+
+  public CompactionRequest requestCompaction(final Region r, final Store s,
+      final String why, int priority, CompactionRequest request, User user) throws IOException {
+    return requestCompactionInternal(r, s, why, priority, request, true, user);
+  }
+
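+  // System compactions pass selectNow=false: file selection is deferred until the
+  // CompactionRunner actually executes, instead of being done at request time.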
+  public synchronized void requestSystemCompaction(
+      final Region r, final String why) throws IOException {
+    requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false, null);
+  }
+
+  public void requestSystemCompaction(
+      final Region r, final Store s, final String why) throws IOException {
+    requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false, null);
+  }
+
+  /**
+   * @param r region store belongs to
+   * @param s Store to request compaction on
+   * @param why Why compaction requested -- used in debug messages
+   * @param priority override the default priority (NO_PRIORITY == decide)
+   * @param request custom compaction request. Can be <tt>null</tt> in which case a simple
+   *          compaction will be used.
+   */
+  private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,
+      final String why, int priority, CompactionRequest request, boolean selectNow, User user)
+          throws IOException {
+    if (this.server.isStopped()
+        || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
+      return null;
+    }
+
+    CompactionContext compaction = null;
+    if (selectNow) {
+      compaction = selectCompaction(r, s, priority, request, user);
+      if (compaction == null) return null; // message logged inside
+    }
+
+    final RegionServerSpaceQuotaManager spaceQuotaManager =
+      this.server.getRegionServerSpaceQuotaManager();
+    if (spaceQuotaManager != null && spaceQuotaManager.areCompactionsDisabled(
+        r.getTableDesc().getTableName())) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Ignoring compaction request for " + r + " as an active 
space quota violation "
+            + " policy disallows compactions.");
+      }
+      return null;
+    }
+
+    // We assume that most compactions are small. So, put system compactions into small
+    // pool; we will do selection there, and move to large pool if necessary.
+    ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
+      ? longCompactions : shortCompactions;
+    pool.execute(new CompactionRunner(s, r, compaction, pool, user));
+    if (LOG.isDebugEnabled()) {
+      String type = (pool == shortCompactions) ? "Small " : "Large ";
+      LOG.debug(type + "Compaction requested: " + (selectNow ? 
compaction.toString() : "system")
+          + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " 
+ this);
+    }
+    return selectNow ? compaction.getRequest() : null;
+  }
+
+  private CompactionContext selectCompaction(final Region r, final Store s,
+      int priority, CompactionRequest request, User user) throws IOException {
+    CompactionContext compaction = s.requestCompaction(priority, request, user);
+    if (compaction == null) {
+      if (LOG.isDebugEnabled() && r.getRegionInfo() != null) {
+        LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +
+            " because compaction request was cancelled");
+      }
+      return null;
+    }
+    assert compaction.hasSelection();
+    if (priority != Store.NO_PRIORITY) {
+      compaction.getRequest().setPriority(priority);
+    }
+    return compaction;
+  }
+
+  /**
+   * Only interrupt once it's done with a run through the work loop.
+   */
+  void interruptIfNecessary() {
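+    // ThreadPoolExecutor.shutdown(): no new tasks are accepted, but queued and in-flight
+    // work still completes.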
+    splits.shutdown();
+    longCompactions.shutdown();
+    shortCompactions.shutdown();
+  }
+
+  private void waitFor(ThreadPoolExecutor t, String name) {
+    boolean done = false;
+    while (!done) {
+      try {
+        done = t.awaitTermination(60, TimeUnit.SECONDS);
+        LOG.info("Waiting for " + name + " to finish...");
+        if (!done) {
+          t.shutdownNow();
+        }
+      } catch (InterruptedException ie) {
+        LOG.warn("Interrupted waiting for " + name + " to finish...");
+      }
+    }
+  }
+
+  void join() {
+    waitFor(splits, "Split Thread");
+    waitFor(longCompactions, "Large Compaction Thread");
+    waitFor(shortCompactions, "Small Compaction Thread");
+  }
+
+  /**
+   * Returns the combined size of the large and small compaction queues.
+   *
+   * @return The number of compactions currently queued across both pools.
+   */
+  public int getCompactionQueueSize() {
+    return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
+  }
+
+  public int getLargeCompactionQueueSize() {
+    return longCompactions.getQueue().size();
+  }
+
+  public int getSmallCompactionQueueSize() {
+    return shortCompactions.getQueue().size();
+  }
+
+  public int getSplitQueueSize() {
+    return splits.getQueue().size();
+  }
+
+  private boolean shouldSplitRegion() {
+    if (server.getNumberOfOnlineRegions() > 0.9 * regionSplitLimit) {
+      LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
+          + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
+    }
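+    // Splits stay enabled until the hard limit itself is reached; the warning above fires at 90%.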
+    return (regionSplitLimit > server.getNumberOfOnlineRegions());
+  }
+
+  /**
+   * @return the regionSplitLimit
+   */
+  public int getRegionSplitLimit() {
+    return this.regionSplitLimit;
+  }
+
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
+      justification="Contrived use of compareTo")
+  private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
+    private final Store store;
+    private final HRegion region;
+    private CompactionContext compaction;
+    private int queuedPriority;
+    private ThreadPoolExecutor parent;
+    private User user;
+    private long time;
+
+    public CompactionRunner(Store store, Region region,
+        CompactionContext compaction, ThreadPoolExecutor parent, User user) {
+      super();
+      this.store = store;
+      this.region = (HRegion)region;
+      this.compaction = compaction;
+      this.queuedPriority = (this.compaction == null)
+          ? store.getCompactPriority() : compaction.getRequest().getPriority();
+      this.parent = parent;
+      this.user = user;
+      this.time = System.currentTimeMillis();
+    }
+
+    @Override
+    public String toString() {
+      return (this.compaction != null) ? ("Request = " + compaction.getRequest())
+          : ("regionName = " + region.toString() + ", storeName = " + store.toString() +
+             ", priority = " + queuedPriority + ", time = " + time);
+    }
+
+    private void doCompaction(User user) {
+      // Common case - system compaction without a file selection. Select now.
+      if (this.compaction == null) {
+        int oldPriority = this.queuedPriority;
+        this.queuedPriority = this.store.getCompactPriority();
+        if (this.queuedPriority > oldPriority) {
+          // Store priority decreased while we were in queue (due to some other compaction?),
+          // requeue with new priority to avoid blocking potential higher priorities.
+          this.parent.execute(this);
+          return;
+        }
+        try {
+          this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user);
+        } catch (IOException ex) {
+          LOG.error("Compaction selection failed " + this, ex);
+          server.checkFileSystem();
+          return;
+        }
+        if (this.compaction == null) return; // nothing to do
+        // Now see if we are in correct pool for the size; if not, go to the correct one.
+        // We might end up waiting for a while, so cancel the selection.
+        assert this.compaction.hasSelection();
+        ThreadPoolExecutor pool = store.throttleCompaction(
+            compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
+
+        // Long compaction pool can process small job
+        // Short compaction pool should not process large job
+        if (this.parent == shortCompactions && pool == longCompactions) {
+          this.store.cancelRequestedCompaction(this.compaction);
+          this.compaction = null;
+          this.parent = pool;
+          this.parent.execute(this);
+          return;
+        }
+      }
+      // Finally we can compact something.
+      assert this.compaction != null;
+
+      this.compaction.getRequest().beforeExecute();
+      try {
+        // Note: please don't put single-compaction logic here;
+        //       put it into region/store/etc. This is CST logic.
+        long start = EnvironmentEdgeManager.currentTime();
+        boolean completed =
+            region.compact(compaction, store, compactionThroughputController, user);
+        long now = EnvironmentEdgeManager.currentTime();
+        LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
+              this + "; duration=" + StringUtils.formatTimeDiff(now, start));
+        if (completed) {
+          // degenerate case: blocked regions require recursive enqueues
+          if (store.getCompactPriority() <= 0) {
+            requestSystemCompaction(region, store, "Recursive enqueue");
+          } else {
+            // see if the compaction has caused us to exceed max region size
+            requestSplit(region);
+          }
+        }
+      } catch (IOException ex) {
+        IOException remoteEx =
+            ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
+        LOG.error("Compaction failed " + this, remoteEx);
+        if (remoteEx != ex) {
+          LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
+        }
+        region.reportCompactionRequestFailure();
+        server.checkFileSystem();
+      } catch (Exception ex) {
+        LOG.error("Compaction failed " + this, ex);
+        region.reportCompactionRequestFailure();
+        server.checkFileSystem();
+      } finally {
+        LOG.debug("CompactSplitThread Status: " + CompactSplit.this);
+      }
+      this.compaction.getRequest().afterExecute();
+    }
+
+    @Override
+    public void run() {
+      Preconditions.checkNotNull(server);
+      if (server.isStopped()
+          || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
+        return;
+      }
+      doCompaction(user);
+    }
+
+    private String formatStackTrace(Exception ex) {
+      StringWriter sw = new StringWriter();
+      PrintWriter pw = new PrintWriter(sw);
+      ex.printStackTrace(pw);
+      pw.flush();
+      return sw.toString();
+    }
+
+    @Override
+    public int compareTo(CompactionRunner o) {
+      // Only compare the underlying request (if any), for queue sorting purposes.
+      int compareVal = queuedPriority - o.queuedPriority; // compare priority
+      if (compareVal != 0) return compareVal;
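+      // Lower queuedPriority sorts first, e.g. a runner queued at priority 1 runs before one at 5.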
+      CompactionContext tc = this.compaction, oc = o.compaction;
+      // Sort pre-selected (user?) compactions before system ones with equal priority.
+      return (tc == null) ? ((oc == null) ? 0 : 1)
+          : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest()));
+    }
+  }
+
+  /**
+   * Cleanup class to use when rejecting a compaction request from the queue.
+   */
+  private static class Rejection implements RejectedExecutionHandler {
+    @Override
+    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
+      if (runnable instanceof CompactionRunner) {
+        CompactionRunner runner = (CompactionRunner)runnable;
+        LOG.debug("Compaction Rejected: " + runner);
+        runner.store.cancelRequestedCompaction(runner.compaction);
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void onConfigurationChange(Configuration newConf) {
+    // Check if number of large / small compaction threads has changed, and then
+    // adjust the core pool size of the thread pools, by using the
+    // setCorePoolSize() method. According to the javadocs, it is safe to
+    // change the core pool size on-the-fly. We need to reset the maximum
+    // pool size, as well.
+    int largeThreads = Math.max(1, newConf.getInt(
+            LARGE_COMPACTION_THREADS,
+            LARGE_COMPACTION_THREADS_DEFAULT));
+    if (this.longCompactions.getCorePoolSize() != largeThreads) {
+      LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
+              " from " + this.longCompactions.getCorePoolSize() + " to " +
+              largeThreads);
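+      // Raise the maximum before the core size when growing (and lower the core size first
+      // when shrinking) so that corePoolSize never exceeds maximumPoolSize.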
+      if (this.longCompactions.getCorePoolSize() < largeThreads) {
+        this.longCompactions.setMaximumPoolSize(largeThreads);
+        this.longCompactions.setCorePoolSize(largeThreads);
+      } else {
+        this.longCompactions.setCorePoolSize(largeThreads);
+        this.longCompactions.setMaximumPoolSize(largeThreads);
+      }
+    }
+
+    int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
+            SMALL_COMPACTION_THREADS_DEFAULT);
+    if (this.shortCompactions.getCorePoolSize() != smallThreads) {
+      LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
+                " from " + this.shortCompactions.getCorePoolSize() + " to " +
+                smallThreads);
+      if (this.shortCompactions.getCorePoolSize() < smallThreads) {
+        this.shortCompactions.setMaximumPoolSize(smallThreads);
+        this.shortCompactions.setCorePoolSize(smallThreads);
+      } else {
+        this.shortCompactions.setCorePoolSize(smallThreads);
+        this.shortCompactions.setMaximumPoolSize(smallThreads);
+      }
+    }
+
+    int splitThreads = newConf.getInt(SPLIT_THREADS,
+            SPLIT_THREADS_DEFAULT);
+    if (this.splits.getCorePoolSize() != splitThreads) {
+      LOG.info("Changing the value of " + SPLIT_THREADS +
+                " from " + this.splits.getCorePoolSize() + " to " +
+                splitThreads);
+      if (this.splits.getCorePoolSize() < splitThreads) {
+        this.splits.setMaximumPoolSize(splitThreads);
+        this.splits.setCorePoolSize(splitThreads);
+      } else {
+        this.splits.setCorePoolSize(splitThreads);
+        this.splits.setMaximumPoolSize(splitThreads);
+      }
+    }
+
+    ThroughputController old = this.compactionThroughputController;
+    if (old != null) {
+      old.stop("configuration change");
+    }
+    this.compactionThroughputController =
+        CompactionThroughputControllerFactory.create(server, newConf);
+
+    // We change this atomically here instead of reloading the config in order that upstream
+    // would be the only one with the flexibility to reload the config.
+    this.conf.reloadConfiguration();
+  }
+
+  protected int getSmallCompactionThreadNum() {
+    return this.shortCompactions.getCorePoolSize();
+  }
+
+  protected int getLargeCompactionThreadNum() {
+    return this.longCompactions.getCorePoolSize();
+  }
+
+  protected int getSplitThreadNum() {
+    return this.splits.getCorePoolSize();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void registerChildren(ConfigurationManager manager) {
+    // No children to register.
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void deregisterChildren(ConfigurationManager manager) {
+    // No children to deregister.
+  }
+
+  @VisibleForTesting
+  public ThroughputController getCompactionThroughputController() {
+    return compactionThroughputController;
+  }
+
+  /**
+   * Shutdown the long compaction thread pool.
+   * Should only be used in unit tests, to prevent the long compaction thread pool from
+   * stealing jobs from the short compaction queue.
+   */
+  @VisibleForTesting
+  void shutdownLongCompactions() {
+    this.longCompactions.shutdown();
+  }
+
+  public void clearLongCompactionsQueue() {
+    longCompactions.getQueue().clear();
+  }
+
+  public void clearShortCompactionsQueue() {
+    shortCompactions.getQueue().clear();
+  }
+}
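
For orientation, here is a minimal caller-side sketch. It is not part of the patch: it
assumes an already-constructed CompactSplit instance and an online Region (both are
created by the region server elsewhere), and it calls only methods defined in the class
above, from code assumed to live in the same package.

  static void exampleDrain(CompactSplit cst, Region region) throws IOException {
    // Queue a system compaction; file selection is deferred to the worker thread.
    cst.requestSystemCompaction(region, "example trigger");
    // Combined depth of the large and small compaction queues, as exposed for metrics.
    int queued = cst.getCompactionQueueSize();
    System.out.println("compactions queued: " + queued);
    // Stop intake on the split and compaction pools, then block until they drain.
    cst.interruptIfNecessary();
    cst.join();
  }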
