http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
new file mode 100644
index 0000000..082e171
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
@@ -0,0 +1,969 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * RegionStates contains a set of Maps that describe the in-memory state of the AM, with
+ * the regions available in the system, the regions in transition, the offline regions and
+ * the servers holding regions.
+ */
+@InterfaceAudience.Private
+public class RegionStates {
+  private static final Log LOG = LogFactory.getLog(RegionStates.class);
+
+  protected static final State[] STATES_EXPECTED_ON_OPEN = new State[] {
+    State.OFFLINE, State.CLOSED,      // disable/offline
+    State.SPLITTING, State.SPLIT,     // ServerCrashProcedure
+    State.OPENING, State.FAILED_OPEN, // already in-progress (retrying)
+  };
+
+  protected static final State[] STATES_EXPECTED_ON_CLOSE = new State[] {
+    State.SPLITTING, State.SPLIT, // ServerCrashProcedure
+    State.OPEN,                   // enabled/open
+    State.CLOSING                 // already in-progress (retrying)
+  };
+
+  private static class AssignmentProcedureEvent extends ProcedureEvent<HRegionInfo> {
+    public AssignmentProcedureEvent(final HRegionInfo regionInfo) {
+      super(regionInfo);
+    }
+  }
+
+  private static class ServerReportEvent extends ProcedureEvent<ServerName> {
+    public ServerReportEvent(final ServerName serverName) {
+      super(serverName);
+    }
+  }
+
+  /**
+   * Current Region State.
+   * In-memory only. Not persisted.
+   */
+  // Mutable/Immutable? Changes have to be synchronized or not?
+  // Data members are volatile which seems to say multi-threaded access is fine.
+  // In the below we do check and set but the check state could change before
+  // we do the set because no synchronization....which seems dodgy. Clear up
+  // understanding here... how many threads accessing? Do locks make it so one
+  // thread at a time working on a single Region's RegionStateNode? Let's presume
+  // so for now. Odd is that elsewhere in this RegionStates, we synchronize on
+  // the RegionStateNode instance. TODO.
+  public static class RegionStateNode implements Comparable<RegionStateNode> {
+    private final HRegionInfo regionInfo;
+    private final ProcedureEvent<?> event;
+
+    private volatile RegionTransitionProcedure procedure = null;
+    private volatile ServerName regionLocation = null;
+    private volatile ServerName lastHost = null;
+    /**
+     * A Region-in-Transition (RIT) moves through states.
+     * See {@link State} for complete list. A Region that
+     * is opened moves from OFFLINE => OPENING => OPENED.
+     */
+    private volatile State state = State.OFFLINE;
+
+    /**
+     * Updated on every call to {@link #setRegionLocation(ServerName)}
+     * or {@link #setState(State, State...)}.
+     */
+    private volatile long lastUpdate = 0;
+
+    private volatile long openSeqNum = HConstants.NO_SEQNUM;
+
+    public RegionStateNode(final HRegionInfo regionInfo) {
+      this.regionInfo = regionInfo;
+      this.event = new AssignmentProcedureEvent(regionInfo);
+    }
+
+    public boolean setState(final State update, final State... expected) {
+      final boolean expectedState = isInState(expected);
+      if (expectedState) {
+        this.state = update;
+        this.lastUpdate = EnvironmentEdgeManager.currentTime();
+      }
+      return expectedState;
+    }
+
+    /**
+     * Put region into OFFLINE mode (set state and clear location).
+     * @return Last recorded server deploy
+     */
+    public ServerName offline() {
+      setState(State.OFFLINE);
+      return setRegionLocation(null);
+    }
+
+    /**
+     * Set new {@link State} but only if currently in the <code>expected</code> State
+     * (if not, throw {@link UnexpectedStateException}).
+     */
+    public State transitionState(final State update, final State... expected)
+    throws UnexpectedStateException {
+      if (!setState(update, expected)) {
+        throw new UnexpectedStateException("Expected " + Arrays.toString(expected) +
+          " so could move to " + update + " but current state=" + getState());
+      }
+      return update;
+    }
+
+    public boolean isInState(final State... expected) {
+      if (expected != null && expected.length > 0) {
+        boolean expectedState = false;
+        for (int i = 0; i < expected.length; ++i) {
+          expectedState |= (getState() == expected[i]);
+        }
+        return expectedState;
+      }
+      return true;
+    }
+
+    public boolean isStuck() {
+      return isInState(State.FAILED_OPEN) && getProcedure() != null;
+    }
+
+    public boolean isInTransition() {
+      return getProcedure() != null;
+    }
+
+    public long getLastUpdate() {
+      return procedure != null ? procedure.getLastUpdate() : lastUpdate;
+    }
+
+    public void setLastHost(final ServerName serverName) {
+      this.lastHost = serverName;
+    }
+
+    public void setOpenSeqNum(final long seqId) {
+      this.openSeqNum = seqId;
+    }
+
+    
+    public ServerName setRegionLocation(final ServerName serverName) {
+      ServerName lastRegionLocation = this.regionLocation;
+      if (LOG.isTraceEnabled() && serverName == null) {
+        LOG.trace("Tracking when we are set to null " + this, new 
Throwable("TRACE"));
+      }
+      this.regionLocation = serverName;
+      this.lastUpdate = EnvironmentEdgeManager.currentTime();
+      return lastRegionLocation;
+    }
+
+    public boolean setProcedure(final RegionTransitionProcedure proc) {
+      if (this.procedure != null && this.procedure != proc) {
+        return false;
+      }
+      this.procedure = proc;
+      return true;
+    }
+
+    public boolean unsetProcedure(final RegionTransitionProcedure proc) {
+      if (this.procedure != null && this.procedure != proc) {
+        return false;
+      }
+      this.procedure = null;
+      return true;
+    }
+
+    public RegionTransitionProcedure getProcedure() {
+      return procedure;
+    }
+
+    public ProcedureEvent<?> getProcedureEvent() {
+      return event;
+    }
+
+    public HRegionInfo getRegionInfo() {
+      return regionInfo;
+    }
+
+    public TableName getTable() {
+      return getRegionInfo().getTable();
+    }
+
+    public boolean isSystemTable() {
+      return getTable().isSystemTable();
+    }
+
+    public ServerName getLastHost() {
+      return lastHost;
+    }
+
+    public ServerName getRegionLocation() {
+      return regionLocation;
+    }
+
+    public State getState() {
+      return state;
+    }
+
+    public long getOpenSeqNum() {
+      return openSeqNum;
+    }
+
+    public int getFormatVersion() {
+      // we don't have any format for now
+      // it should probably be in regionInfo.getFormatVersion()
+      return 0;
+    }
+
+    @Override
+    public int compareTo(final RegionStateNode other) {
+      // NOTE: HRegionInfo sorts by table first, so we are relying on that.
+      // We have a TestRegionState#testOrderedByTable() that checks for that.
+      return getRegionInfo().compareTo(other.getRegionInfo());
+    }
+
+    @Override
+    public int hashCode() {
+      return getRegionInfo().hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object other) {
+      if (this == other) return true;
+      if (!(other instanceof RegionStateNode)) return false;
+      return compareTo((RegionStateNode)other) == 0;
+    }
+
+    @Override
+    public String toString() {
+      return toDescriptiveString();
+    }
+ 
+    public String toShortString() {
+      // rit= is the current Region-In-Transition State -- see State enum.
+      return String.format("rit=%s, location=%s", getState(), 
getRegionLocation());
+    }
+
+    public String toDescriptiveString() {
+      return String.format("%s, table=%s, region=%s",
+        toShortString(), getTable(), getRegionInfo().getEncodedName());
+    }
+  }
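
The check-and-set contract above is: setState(update, expected...) applies the update only when the current state is among the expected ones, and transitionState() turns a failed expectation into an exception. A minimal standalone sketch of that contract (hypothetical StateGuard class, not part of this patch; it uses synchronized where the patch relies on callers locking the RegionStateNode):

    import java.util.Arrays;

    final class StateGuard {
      enum State { OFFLINE, OPENING, OPEN, CLOSING, CLOSED }
      private State state = State.OFFLINE;

      // Apply 'update' only if the current state is one of 'expected' (or no expectation given).
      synchronized boolean setState(State update, State... expected) {
        if (expected.length > 0) {
          boolean ok = false;
          for (State s : expected) {
            ok |= (state == s);
          }
          if (!ok) return false;
        }
        state = update;
        return true;
      }

      // Same check, but a failed expectation becomes an exception, like transitionState().
      synchronized State transitionState(State update, State... expected) {
        if (!setState(update, expected)) {
          throw new IllegalStateException(
            "Expected " + Arrays.toString(expected) + " but state=" + state);
        }
        return update;
      }
    }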
+
+  // This comparator sorts the RegionStates by time stamp then Region name.
+  // Comparing by timestamp alone can lead us to discard different RegionStates that happen
+  // to share a timestamp.
+  private static class RegionStateStampComparator implements Comparator<RegionState> {
+    @Override
+    public int compare(final RegionState l, final RegionState r) {
+      int stampCmp = Long.compare(l.getStamp(), r.getStamp());
+      return stampCmp != 0 ? stampCmp : l.getRegion().compareTo(r.getRegion());
+    }
+  }
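
The name tie-break above matters because a TreeSet built on a comparator that only looks at the stamp treats two different RegionStates with equal stamps as duplicates and silently keeps just one. A standalone sketch of the effect (hypothetical Item class, plain Java, not part of this patch):

    import java.util.Comparator;
    import java.util.TreeSet;

    public class TieBreakDemo {
      static class Item {
        final long stamp; final String name;
        Item(long stamp, String name) { this.stamp = stamp; this.name = name; }
      }

      public static void main(String[] args) {
        // Stamp-only: two different items with the same stamp compare as equal.
        Comparator<Item> stampOnly = new Comparator<Item>() {
          @Override public int compare(Item l, Item r) { return Long.compare(l.stamp, r.stamp); }
        };
        // Stamp, then name: same ordering by time, but distinct items stay distinct.
        Comparator<Item> stampThenName = new Comparator<Item>() {
          @Override public int compare(Item l, Item r) {
            int cmp = Long.compare(l.stamp, r.stamp);
            return cmp != 0 ? cmp : l.name.compareTo(r.name);
          }
        };
        TreeSet<Item> lossy = new TreeSet<Item>(stampOnly);
        TreeSet<Item> safe = new TreeSet<Item>(stampThenName);
        Item a = new Item(42L, "region-a");
        Item b = new Item(42L, "region-b");
        lossy.add(a); lossy.add(b);  // size() == 1: b is dropped as a "duplicate"
        safe.add(a);  safe.add(b);   // size() == 2: tie broken by name
        System.out.println(lossy.size() + " vs " + safe.size()); // prints "1 vs 2"
      }
    }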
+
+  public enum ServerState { ONLINE, SPLITTING, OFFLINE }
+  public static class ServerStateNode implements Comparable<ServerStateNode> {
+    private final ServerReportEvent reportEvent;
+
+    private final Set<RegionStateNode> regions;
+    private final ServerName serverName;
+
+    private volatile ServerState state = ServerState.ONLINE;
+    private volatile int versionNumber = 0;
+
+    public ServerStateNode(final ServerName serverName) {
+      this.serverName = serverName;
+      this.regions = new HashSet<RegionStateNode>();
+      this.reportEvent = new ServerReportEvent(serverName);
+    }
+
+    public ServerName getServerName() {
+      return serverName;
+    }
+
+    public ServerState getState() {
+      return state;
+    }
+
+    public int getVersionNumber() {
+      return versionNumber;
+    }
+
+    public ProcedureEvent<?> getReportEvent() {
+      return reportEvent;
+    }
+
+    public boolean isInState(final ServerState... expected) {
+      boolean expectedState = false;
+      if (expected != null) {
+        for (int i = 0; i < expected.length; ++i) {
+          expectedState |= (state == expected[i]);
+        }
+      }
+      return expectedState;
+    }
+
+    public void setState(final ServerState state) {
+      this.state = state;
+    }
+
+    public void setVersionNumber(final int versionNumber) {
+      this.versionNumber = versionNumber;
+    }
+
+    public Set<RegionStateNode> getRegions() {
+      return regions;
+    }
+
+    public int getRegionCount() {
+      return regions.size();
+    }
+
+    public ArrayList<HRegionInfo> getRegionInfoList() {
+      ArrayList<HRegionInfo> hris = new ArrayList<HRegionInfo>(regions.size());
+      for (RegionStateNode region: regions) {
+        hris.add(region.getRegionInfo());
+      }
+      return hris;
+    }
+
+    public void addRegion(final RegionStateNode regionNode) {
+      this.regions.add(regionNode);
+    }
+
+    public void removeRegion(final RegionStateNode regionNode) {
+      this.regions.remove(regionNode);
+    }
+
+    @Override
+    public int compareTo(final ServerStateNode other) {
+      return getServerName().compareTo(other.getServerName());
+    }
+
+    @Override
+    public int hashCode() {
+      return getServerName().hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object other) {
+      if (this == other) return true;
+      if (!(other instanceof ServerStateNode)) return false;
+      return compareTo((ServerStateNode)other) == 0;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("ServerStateNode(%s)", getServerName());
+    }
+  }
+
+  public final static RegionStateStampComparator REGION_STATE_STAMP_COMPARATOR =
+      new RegionStateStampComparator();
+
+  // TODO: Replace the ConcurrentSkipListMaps
+  /**
+   * RegionName -- i.e. HRegionInfo.getRegionName() -- as bytes to {@link RegionStateNode}
+   */
+  private final ConcurrentSkipListMap<byte[], RegionStateNode> regionsMap =
+      new ConcurrentSkipListMap<byte[], RegionStateNode>(Bytes.BYTES_COMPARATOR);
+
+  private final ConcurrentSkipListMap<HRegionInfo, RegionStateNode> regionInTransition =
+    new ConcurrentSkipListMap<HRegionInfo, RegionStateNode>();
+
+  /**
+   * Regions marked as offline on a read of hbase:meta. Unused; or at least, once
+   * offlined, regions have no means of coming on line again. TODO.
+   */
+  private final ConcurrentSkipListMap<HRegionInfo, RegionStateNode> regionOffline =
+    new ConcurrentSkipListMap<HRegionInfo, RegionStateNode>();
+
+  private final ConcurrentSkipListMap<byte[], RegionFailedOpen> regionFailedOpen =
+    new ConcurrentSkipListMap<byte[], RegionFailedOpen>(Bytes.BYTES_COMPARATOR);
+
+  private final ConcurrentHashMap<ServerName, ServerStateNode> serverMap =
+      new ConcurrentHashMap<ServerName, ServerStateNode>();
+
+  public RegionStates() { }
+
+  public void clear() {
+    regionsMap.clear();
+    regionInTransition.clear();
+    regionOffline.clear();
+    serverMap.clear();
+  }
+
+  // ==========================================================================
+  //  RegionStateNode helpers
+  // ==========================================================================
+  protected RegionStateNode createRegionNode(final HRegionInfo regionInfo) {
+    RegionStateNode newNode = new RegionStateNode(regionInfo);
+    RegionStateNode oldNode = regionsMap.putIfAbsent(regionInfo.getRegionName(), newNode);
+    return oldNode != null ? oldNode : newNode;
+  }
+
+  protected RegionStateNode getOrCreateRegionNode(final HRegionInfo regionInfo) {
+    RegionStateNode node = regionsMap.get(regionInfo.getRegionName());
+    return node != null ? node : createRegionNode(regionInfo);
+  }
+
+  RegionStateNode getRegionNodeFromName(final byte[] regionName) {
+    return regionsMap.get(regionName);
+  }
+
+  protected RegionStateNode getRegionNode(final HRegionInfo regionInfo) {
+    return getRegionNodeFromName(regionInfo.getRegionName());
+  }
+
+  RegionStateNode getRegionNodeFromEncodedName(final String encodedRegionName) {
+    // TODO: Need a map <encodedName, ...> but it is just dispatch merge...
+    for (RegionStateNode node: regionsMap.values()) {
+      if (node.getRegionInfo().getEncodedName().equals(encodedRegionName)) {
+        return node;
+      }
+    }
+    return null;
+  }
+
+  public void deleteRegion(final HRegionInfo regionInfo) {
+    regionsMap.remove(regionInfo.getRegionName());
+    // Remove from the offline regions map too if there.
+    if (this.regionOffline.containsKey(regionInfo)) {
+      if (LOG.isTraceEnabled()) LOG.trace("Removing from regionOffline Map: " + regionInfo);
+      this.regionOffline.remove(regionInfo);
+    }
+  }
+
+  ArrayList<RegionStateNode> getTableRegionStateNodes(final TableName tableName) {
+    final ArrayList<RegionStateNode> regions = new ArrayList<RegionStateNode>();
+    for (RegionStateNode node: regionsMap.tailMap(tableName.getName()).values()) {
+      if (!node.getTable().equals(tableName)) break;
+      regions.add(node);
+    }
+    return regions;
+  }
+
+  ArrayList<RegionState> getTableRegionStates(final TableName tableName) {
+    final ArrayList<RegionState> regions = new ArrayList<RegionState>();
+    for (RegionStateNode node: regionsMap.tailMap(tableName.getName()).values()) {
+      if (!node.getTable().equals(tableName)) break;
+      regions.add(createRegionState(node));
+    }
+    return regions;
+  }
+
+  ArrayList<HRegionInfo> getTableRegionsInfo(final TableName tableName) {
+    final ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
+    for (RegionStateNode node: regionsMap.tailMap(tableName.getName()).values()) {
+      if (!node.getTable().equals(tableName)) break;
+      regions.add(node.getRegionInfo());
+    }
+    return regions;
+  }
+
+  Collection<RegionStateNode> getRegionNodes() {
+    return regionsMap.values();
+  }
+
+  public ArrayList<RegionState> getRegionStates() {
+    final ArrayList<RegionState> regions = new ArrayList<RegionState>(regionsMap.size());
+    for (RegionStateNode node: regionsMap.values()) {
+      regions.add(createRegionState(node));
+    }
+    return regions;
+  }
+
+  // ==========================================================================
+  //  RegionState helpers
+  // ==========================================================================
+  public RegionState getRegionState(final HRegionInfo regionInfo) {
+    return createRegionState(getRegionNode(regionInfo));
+  }
+
+  public RegionState getRegionState(final String encodedRegionName) {
+    return createRegionState(getRegionNodeFromEncodedName(encodedRegionName));
+  }
+
+  private RegionState createRegionState(final RegionStateNode node) {
+    return node == null ? null :
+      new RegionState(node.getRegionInfo(), node.getState(),
+        node.getLastUpdate(), node.getRegionLocation());
+  }
+
+  // ============================================================================================
+  //  TODO: helpers
+  // ============================================================================================
+  public boolean hasTableRegionStates(final TableName tableName) {
+    // TODO
+    return !getTableRegionStates(tableName).isEmpty();
+  }
+
+  public List<HRegionInfo> getRegionsOfTable(final TableName table) {
+    return getRegionsOfTable(table, false);
+  }
+
+  List<HRegionInfo> getRegionsOfTable(final TableName table, final boolean offline) {
+    final ArrayList<RegionStateNode> nodes = getTableRegionStateNodes(table);
+    final ArrayList<HRegionInfo> hris = new ArrayList<HRegionInfo>(nodes.size());
+    for (RegionStateNode node: nodes) {
+      if (include(node, offline)) hris.add(node.getRegionInfo());
+    }
+    return hris;
+  }
+
+  /**
+   * Utility. Whether to include region in list of regions. Default is to
+   * weed out split and offline regions.
+   * @return True if we should include the <code>node</code> (do not include
+   * if split or offline unless <code>offline</code> is set to true).
+   */
+  boolean include(final RegionStateNode node, final boolean offline) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("WORKING ON " + node + " " + node.getRegionInfo());
+    }
+    if (node.isInState(State.SPLIT)) return false;
+    if (node.isInState(State.OFFLINE) && !offline) return false;
+    final HRegionInfo hri = node.getRegionInfo();
+    return (!hri.isOffline() && !hri.isSplit()) ||
+        ((hri.isOffline() || hri.isSplit()) && offline);
+  }
+
+  /**
+   * Returns the set of regions hosted by the specified server
+   * @param serverName the server we are interested in
+   * @return set of HRegionInfo hosted by the specified server
+   */
+  public List<HRegionInfo> getServerRegionInfoSet(final ServerName serverName) {
+    final ServerStateNode serverInfo = getServerNode(serverName);
+    if (serverInfo == null) return Collections.emptyList();
+
+    synchronized (serverInfo) {
+      return serverInfo.getRegionInfoList();
+    }
+  }
+
+  // ============================================================================================
+  //  TODO: split helpers
+  // ============================================================================================
+  public void logSplit(final ServerName serverName) {
+    final ServerStateNode serverNode = getOrCreateServer(serverName);
+    synchronized (serverNode) {
+      serverNode.setState(ServerState.SPLITTING);
+      /* THIS HAS TO BE WRONG. THIS IS SPLITTING OF REGION, NOT SPLITTING WALs.
+      for (RegionStateNode regionNode: serverNode.getRegions()) {
+        synchronized (regionNode) {
+          // TODO: Abort procedure if present
+          regionNode.setState(State.SPLITTING);
+        }
+      }*/
+    }
+  }
+
+  public void logSplit(final HRegionInfo regionInfo) {
+    final RegionStateNode regionNode = getRegionNode(regionInfo);
+    synchronized (regionNode) {
+      regionNode.setState(State.SPLIT);
+    }
+  }
+
+  @VisibleForTesting
+  public void updateRegionState(final HRegionInfo regionInfo, final State state) {
+    final RegionStateNode regionNode = getOrCreateRegionNode(regionInfo);
+    synchronized (regionNode) {
+      regionNode.setState(state);
+    }
+  }
+
+  // ============================================================================================
+  //  TODO:
+  // ============================================================================================
+  public List<HRegionInfo> getAssignedRegions() {
+    final List<HRegionInfo> result = new ArrayList<HRegionInfo>();
+    for (RegionStateNode node: regionsMap.values()) {
+      if (!node.isInTransition()) {
+        result.add(node.getRegionInfo());
+      }
+    }
+    return result;
+  }
+
+  public boolean isRegionInState(final HRegionInfo regionInfo, final State... state) {
+    final RegionStateNode region = getRegionNode(regionInfo);
+    if (region != null) {
+      synchronized (region) {
+        return region.isInState(state);
+      }
+    }
+    return false;
+  }
+
+  public boolean isRegionOnline(final HRegionInfo regionInfo) {
+    return isRegionInState(regionInfo, State.OPEN);
+  }
+
+  /**
+   * @return True if region is offline (In OFFLINE or CLOSED state).
+   */
+  public boolean isRegionOffline(final HRegionInfo regionInfo) {
+    return isRegionInState(regionInfo, State.OFFLINE, State.CLOSED);
+  }
+
+  public Map<ServerName, List<HRegionInfo>> getSnapShotOfAssignment(
+      final Collection<HRegionInfo> regions) {
+    final Map<ServerName, List<HRegionInfo>> result = new HashMap<ServerName, List<HRegionInfo>>();
+    for (HRegionInfo hri: regions) {
+      final RegionStateNode node = getRegionNode(hri);
+      if (node == null) continue;
+
+      // TODO: State.OPEN
+      final ServerName serverName = node.getRegionLocation();
+      if (serverName == null) continue;
+
+      List<HRegionInfo> serverRegions = result.get(serverName);
+      if (serverRegions == null) {
+        serverRegions = new ArrayList<HRegionInfo>();
+        result.put(serverName, serverRegions);
+      }
+
+      serverRegions.add(node.getRegionInfo());
+    }
+    return result;
+  }
+
+  public Map<HRegionInfo, ServerName> getRegionAssignments() {
+    final HashMap<HRegionInfo, ServerName> assignments = new HashMap<HRegionInfo, ServerName>();
+    for (RegionStateNode node: regionsMap.values()) {
+      assignments.put(node.getRegionInfo(), node.getRegionLocation());
+    }
+    return assignments;
+  }
+
+  public Map<RegionState.State, List<HRegionInfo>> getRegionByStateOfTable(TableName tableName) {
+    final State[] states = State.values();
+    final Map<RegionState.State, List<HRegionInfo>> tableRegions =
+        new HashMap<State, List<HRegionInfo>>(states.length);
+    for (int i = 0; i < states.length; ++i) {
+      tableRegions.put(states[i], new ArrayList<HRegionInfo>());
+    }
+
+    for (RegionStateNode node: regionsMap.values()) {
+      tableRegions.get(node.getState()).add(node.getRegionInfo());
+    }
+    return tableRegions;
+  }
+
+  public ServerName getRegionServerOfRegion(final HRegionInfo regionInfo) {
+    final RegionStateNode region = getRegionNode(regionInfo);
+    if (region != null) {
+      synchronized (region) {
+        ServerName server = region.getRegionLocation();
+        return server != null ? server : region.getLastHost();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * This is an EXPENSIVE clone.  Cloning though is the safest thing to do.
+   * Can't let out original since it can change and at least the load balancer
+   * wants to iterate this exported list.  We need to synchronize on regions
+   * since all access to this.servers is under a lock on this.regions.
+   * @param forceByCluster a flag to force aggregation of the server load at the cluster level
+   * @return A clone of current assignments by table.
+   */
+  public Map<TableName, Map<ServerName, List<HRegionInfo>>> getAssignmentsByTable(
+      final boolean forceByCluster) {
+    if (!forceByCluster) return getAssignmentsByTable();
+
+    final HashMap<ServerName, List<HRegionInfo>> ensemble =
+      new HashMap<ServerName, List<HRegionInfo>>(serverMap.size());
+    for (ServerStateNode serverNode: serverMap.values()) {
+      ensemble.put(serverNode.getServerName(), serverNode.getRegionInfoList());
+    }
+
+    // TODO: can we use Collections.singletonMap(HConstants.ENSEMBLE_TABLE_NAME, ensemble)?
+    final Map<TableName, Map<ServerName, List<HRegionInfo>>> result =
+      new HashMap<TableName, Map<ServerName, List<HRegionInfo>>>(1);
+    result.put(HConstants.ENSEMBLE_TABLE_NAME, ensemble);
+    return result;
+  }
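
On the TODO above: Collections.singletonMap(HConstants.ENSEMBLE_TABLE_NAME, ensemble) would express the single-entry result more directly, but the returned map is immutable, so it is only a drop-in replacement if no caller ever adds to or removes from the outer map. A small sketch of the trade-off (hypothetical names, not part of this patch):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class SingletonMapSketch {
      public static void main(String[] args) {
        Map<String, Integer> single = Collections.singletonMap("ensemble", 42);
        System.out.println(single.get("ensemble")); // reads behave like any other Map

        try {
          single.put("other", 7); // but the map is immutable...
        } catch (UnsupportedOperationException e) {
          System.out.println("...so it only fits if callers never modify the outer map");
        }

        // The patch's HashMap keeps mutation open, at the cost of a tiny allocation.
        Map<String, Integer> mutable = new HashMap<String, Integer>(1);
        mutable.put("ensemble", 42);
      }
    }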
+
+  public Map<TableName, Map<ServerName, List<HRegionInfo>>> getAssignmentsByTable() {
+    final Map<TableName, Map<ServerName, List<HRegionInfo>>> result = new HashMap<>();
+    for (RegionStateNode node: regionsMap.values()) {
+      Map<ServerName, List<HRegionInfo>> tableResult = result.get(node.getTable());
+      if (tableResult == null) {
+        tableResult = new HashMap<ServerName, List<HRegionInfo>>();
+        result.put(node.getTable(), tableResult);
+      }
+
+      final ServerName serverName = node.getRegionLocation();
+      if (serverName == null) {
+        LOG.info("Skipping, no server for " + node);
+        continue;
+      }
+      List<HRegionInfo> serverResult = tableResult.get(serverName);
+      if (serverResult == null) {
+        serverResult = new ArrayList<HRegionInfo>();
+        tableResult.put(serverName, serverResult);
+      }
+
+      serverResult.add(node.getRegionInfo());
+    }
+    return result;
+  }
+
+  // ==========================================================================
+  //  Region in transition helpers
+  // ==========================================================================
+  protected boolean addRegionInTransition(final RegionStateNode regionNode,
+      final RegionTransitionProcedure procedure) {
+    if (procedure != null && !regionNode.setProcedure(procedure)) return false;
+
+    regionInTransition.put(regionNode.getRegionInfo(), regionNode);
+    return true;
+  }
+
+  protected void removeRegionInTransition(final RegionStateNode regionNode,
+      final RegionTransitionProcedure procedure) {
+    regionInTransition.remove(regionNode.getRegionInfo());
+    regionNode.unsetProcedure(procedure);
+  }
+
+  public boolean hasRegionsInTransition() {
+    return !regionInTransition.isEmpty();
+  }
+
+  public boolean isRegionInTransition(final HRegionInfo regionInfo) {
+    final RegionStateNode node = regionInTransition.get(regionInfo);
+    return node != null ? node.isInTransition() : false;
+  }
+
+  /**
+   * @return The procedure-in-transition for <code>hri</code>, if there is one, else null.
+   */
+  public RegionTransitionProcedure getRegionTransitionProcedure(final HRegionInfo hri) {
+    RegionStateNode node = regionInTransition.get(hri);
+    if (node == null) return null;
+    return node.getProcedure();
+  }
+
+  public RegionState getRegionTransitionState(final HRegionInfo hri) {
+    RegionStateNode node = regionInTransition.get(hri);
+    if (node == null) return null;
+
+    synchronized (node) {
+      return node.isInTransition() ? createRegionState(node) : null;
+    }
+  }
+
+  public List<RegionStateNode> getRegionsInTransition() {
+    return new ArrayList<RegionStateNode>(regionInTransition.values());
+  }
+
+  /**
+   * Get the number of regions in transition.
+   */
+  public int getRegionsInTransitionCount() {
+    return regionInTransition.size();
+  }
+
+  public List<RegionState> getRegionsStateInTransition() {
+    final List<RegionState> rit = new ArrayList<RegionState>(regionInTransition.size());
+    for (RegionStateNode node: regionInTransition.values()) {
+      rit.add(createRegionState(node));
+    }
+    return rit;
+  }
+
+  public SortedSet<RegionState> getRegionsInTransitionOrderedByTimestamp() {
+    final SortedSet<RegionState> rit = new TreeSet<RegionState>(REGION_STATE_STAMP_COMPARATOR);
+    for (RegionStateNode node: regionInTransition.values()) {
+      rit.add(createRegionState(node));
+    }
+    return rit;
+  }
+
+  // ==========================================================================
+  //  Region offline helpers
+  // ==========================================================================
+  // TODO: Populated when we read meta but regions never make it out of here.
+  public void addToOfflineRegions(final RegionStateNode regionNode) {
+    LOG.info("Added to offline, CURRENTLY NEVER CLEARED!!! " + regionNode);
+    regionOffline.put(regionNode.getRegionInfo(), regionNode);
+  }
+
+  // TODO: Unused.
+  public void removeFromOfflineRegions(final HRegionInfo regionInfo) {
+    regionOffline.remove(regionInfo);
+  }
+
+  // ==========================================================================
+  //  Region FAIL_OPEN helpers
+  // ==========================================================================
+  public static final class RegionFailedOpen {
+    private final RegionStateNode regionNode;
+
+    private volatile Exception exception = null;
+    private volatile int retries = 0;
+
+    public RegionFailedOpen(final RegionStateNode regionNode) {
+      this.regionNode = regionNode;
+    }
+
+    public RegionStateNode getRegionNode() {
+      return regionNode;
+    }
+
+    public HRegionInfo getRegionInfo() {
+      return regionNode.getRegionInfo();
+    }
+
+    public int incrementAndGetRetries() {
+      return ++this.retries;
+    }
+
+    public int getRetries() {
+      return retries;
+    }
+
+    public void setException(final Exception exception) {
+      this.exception = exception;
+    }
+
+    public Exception getException() {
+      return this.exception;
+    }
+  }
+
+  public RegionFailedOpen addToFailedOpen(final RegionStateNode regionNode) {
+    final byte[] key = regionNode.getRegionInfo().getRegionName();
+    RegionFailedOpen node = regionFailedOpen.get(key);
+    if (node == null) {
+      RegionFailedOpen newNode = new RegionFailedOpen(regionNode);
+      RegionFailedOpen oldNode = regionFailedOpen.putIfAbsent(key, newNode);
+      node = oldNode != null ? oldNode : newNode;
+    }
+    return node;
+  }
+
+  public RegionFailedOpen getFailedOpen(final HRegionInfo regionInfo) {
+    return regionFailedOpen.get(regionInfo.getRegionName());
+  }
+
+  public void removeFromFailedOpen(final HRegionInfo regionInfo) {
+    regionFailedOpen.remove(regionInfo.getRegionName());
+  }
+
+  public List<RegionState> getRegionFailedOpen() {
+    if (regionFailedOpen.isEmpty()) return Collections.emptyList();
+
+    ArrayList<RegionState> regions = new ArrayList<RegionState>(regionFailedOpen.size());
+    for (RegionFailedOpen r: regionFailedOpen.values()) {
+      regions.add(createRegionState(r.getRegionNode()));
+    }
+    return regions;
+  }
+
+  // ==========================================================================
+  //  Servers
+  // ==========================================================================
+  public ServerStateNode getOrCreateServer(final ServerName serverName) {
+    ServerStateNode node = serverMap.get(serverName);
+    if (node == null) {
+      node = new ServerStateNode(serverName);
+      ServerStateNode oldNode = serverMap.putIfAbsent(serverName, node);
+      node = oldNode != null ? oldNode : node;
+    }
+    return node;
+  }
+
+  public void removeServer(final ServerName serverName) {
+    serverMap.remove(serverName);
+  }
+
+  protected ServerStateNode getServerNode(final ServerName serverName) {
+    return serverMap.get(serverName);
+  }
+
+  public double getAverageLoad() {
+    int numServers = 0;
+    int totalLoad = 0;
+    for (ServerStateNode node: serverMap.values()) {
+      totalLoad += node.getRegionCount();
+      numServers++;
+    }
+    return numServers == 0 ? 0.0: (double)totalLoad / (double)numServers;
+  }
+
+  public ServerStateNode addRegionToServer(final ServerName serverName,
+      final RegionStateNode regionNode) {
+    ServerStateNode serverNode = getOrCreateServer(serverName);
+    serverNode.addRegion(regionNode);
+    return serverNode;
+  }
+
+  public ServerStateNode removeRegionFromServer(final ServerName serverName,
+      final RegionStateNode regionNode) {
+    ServerStateNode serverNode = getOrCreateServer(serverName);
+    serverNode.removeRegion(regionNode);
+    return serverNode;
+  }
+
+  // ==========================================================================
+  //  ToString helpers
+  // ==========================================================================
+  public static String regionNamesToString(final Collection<byte[]> regions) {
+    final StringBuilder sb = new StringBuilder();
+    final Iterator<byte[]> it = regions.iterator();
+    sb.append("[");
+    if (it.hasNext()) {
+      sb.append(Bytes.toStringBinary(it.next()));
+      while (it.hasNext()) {
+        sb.append(", ");
+        sb.append(Bytes.toStringBinary(it.next()));
+      }
+    }
+    sb.append("]");
+    return sb.toString();
+  }
+}
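
createRegionNode(), addToFailedOpen() and getOrCreateServer() above all use the same race-safe get-or-create idiom on a concurrent map: look up, build a candidate on a miss, putIfAbsent it, and keep whichever instance won. A minimal standalone sketch of the idiom (hypothetical names, not part of this patch):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class GetOrCreateDemo {
      static class Node {
        final String key;
        Node(String key) { this.key = key; }
      }

      private final ConcurrentMap<String, Node> nodes = new ConcurrentHashMap<String, Node>();

      // Race-safe get-or-create: if two threads miss at the same time, both build a Node,
      // but putIfAbsent guarantees they end up sharing the single winning instance.
      public Node getOrCreate(String key) {
        Node node = nodes.get(key);
        if (node == null) {
          Node newNode = new Node(key);
          Node oldNode = nodes.putIfAbsent(key, newNode);
          node = (oldNode != null) ? oldNode : newNode;
        }
        return node;
      }
    }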

http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
new file mode 100644
index 0000000..49124ea
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
@@ -0,0 +1,381 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
+import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
+
+/**
+ * Base class for the Assign and Unassign Procedure.
+ * There can only be one RegionTransitionProcedure per region running at a time
+ * since each procedure takes a lock on the region (see MasterProcedureScheduler).
+ *
+ * <p>This procedure is asynchronous and responds to external events.
+ * The AssignmentManager will notify this procedure when the RS completes
+ * the operation and reports the transitioned state
+ * (see the Assign and Unassign class for more detail).
+ * <p>Procedures move from the REGION_TRANSITION_QUEUE state when they are
+ * first submitted, to the REGION_TRANSITION_DISPATCH state when the request
+ * to remote server is sent and the Procedure is suspended waiting on external
+ * event to be woken again. Once the external event is triggered, Procedure
+ * moves to the REGION_TRANSITION_FINISH state.
+ */
+@InterfaceAudience.Private
+public abstract class RegionTransitionProcedure
+    extends Procedure<MasterProcedureEnv>
+    implements TableProcedureInterface,
+      RemoteProcedure<MasterProcedureEnv, ServerName> {
+  private static final Log LOG = LogFactory.getLog(RegionTransitionProcedure.class);
+
+  protected final AtomicBoolean aborted = new AtomicBoolean(false);
+
+  private RegionTransitionState transitionState =
+      RegionTransitionState.REGION_TRANSITION_QUEUE;
+  private HRegionInfo regionInfo;
+  private volatile boolean lock = false;
+
+  public RegionTransitionProcedure() {
+    // Required by the Procedure framework to create the procedure on replay
+    super();
+  }
+
+  public RegionTransitionProcedure(final HRegionInfo regionInfo) {
+    this.regionInfo = regionInfo;
+  }
+
+  public HRegionInfo getRegionInfo() {
+    return regionInfo;
+  }
+
+  protected void setRegionInfo(final HRegionInfo regionInfo) {
+    // Setter is for deserialization.
+    this.regionInfo = regionInfo;
+  }
+
+  @Override
+  public TableName getTableName() {
+    HRegionInfo hri = getRegionInfo();
+    return hri != null? hri.getTable(): null;
+  }
+
+  public boolean isMeta() {
+    return TableName.isMetaTableName(getTableName());
+  }
+
+  @Override
+  public void toStringClassDetails(final StringBuilder sb) {
+    sb.append(getClass().getSimpleName());
+    sb.append(" table=");
+    sb.append(getTableName());
+    sb.append(", region=");
+    sb.append(getRegionInfo() == null? null: getRegionInfo().getEncodedName());
+  }
+
+  public RegionStateNode getRegionState(final MasterProcedureEnv env) {
+    return env.getAssignmentManager().getRegionStates().
+        getOrCreateRegionNode(getRegionInfo());
+  }
+
+  protected void setTransitionState(final RegionTransitionState state) {
+    this.transitionState = state;
+  }
+
+  protected RegionTransitionState getTransitionState() {
+    return transitionState;
+  }
+
+  protected abstract boolean startTransition(MasterProcedureEnv env, RegionStateNode regionNode)
+    throws IOException, ProcedureSuspendedException;
+
+  /**
+   * Called when the Procedure is in the REGION_TRANSITION_DISPATCH state.
+   * In here we do the RPC call to OPEN/CLOSE the region. The suspending of
+   * the thread so it sleeps until it gets an update that the OPEN/CLOSE has
+   * succeeded is complicated. Read the implementations to learn more.
+   */
+  protected abstract boolean updateTransition(MasterProcedureEnv env, RegionStateNode regionNode)
+    throws IOException, ProcedureSuspendedException;
+
+  protected abstract void finishTransition(MasterProcedureEnv env, RegionStateNode regionNode)
+    throws IOException, ProcedureSuspendedException;
+
+  protected abstract void reportTransition(MasterProcedureEnv env,
+      RegionStateNode regionNode, TransitionCode code, long seqId) throws UnexpectedStateException;
+
+  public abstract RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName serverName);
+  protected abstract void remoteCallFailed(MasterProcedureEnv env,
+      RegionStateNode regionNode, IOException exception);
+
+  @Override
+  public void remoteCallCompleted(final MasterProcedureEnv env,
+      final ServerName serverName, final RemoteOperation response) {
+    // Ignore the response? reportTransition() is the one that counts?
+  }
+
+  @Override
+  public void remoteCallFailed(final MasterProcedureEnv env,
+      final ServerName serverName, final IOException exception) {
+    final RegionStateNode regionNode = getRegionState(env);
+    assert serverName.equals(regionNode.getRegionLocation());
+    String msg = exception.getMessage() == null? exception.getClass().getSimpleName():
+      exception.getMessage();
+    LOG.warn("Failed " + this + "; " + regionNode.toShortString() + "; exception=" + msg);
+    remoteCallFailed(env, regionNode, exception);
+    // NOTE: This call to wakeEvent puts this Procedure back on the scheduler.
+    // Thereafter, another Worker can be in here so DO NOT MESS WITH STATE beyond
+    // this method. Just get out of this current processing quickly.
+    env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent());
+  }
+
+  /**
+   * Be careful! At the end of this method, the procedure has either succeeded
+   * and this procedure has been set into a suspended state OR, we failed and
+   * this procedure has been put back on the scheduler ready for another worker
+   * to pick it up. In both cases, we need to exit the current Worker processing
+   * immediately!
+   * @return True if we successfully dispatched the call and false if we failed;
+   * if failed, we need to roll back any setup done for the dispatch.
+   */
+  protected boolean addToRemoteDispatcher(final MasterProcedureEnv env,
+      final ServerName targetServer) {
+    assert targetServer.equals(getRegionState(env).getRegionLocation()) :
+      "targetServer=" + targetServer + " getRegionLocation=" +
+        getRegionState(env).getRegionLocation(); // TODO
+
+    LOG.info("Dispatch " + this + "; " + getRegionState(env).toShortString());
+
+    // Put this procedure into suspended mode to wait on report of state change
+    // from remote regionserver. This means the Procedure's associated ProcedureEvent
+    // is marked not 'ready'.
+    env.getProcedureScheduler().suspendEvent(getRegionState(env).getProcedureEvent());
+
+    // Tricky because this can fail. If it fails we need to backtrack on stuff like
+    // the 'suspend' done above -- tricky as the 'wake' requeues us -- and ditto
+    // up in the caller; it needs to undo state changes.
+    if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) {
+      remoteCallFailed(env, targetServer,
+          new FailedRemoteDispatchException(this + " to " + targetServer));
+      return false;
+    }
+    return true;
+  }
+
+  protected void reportTransition(final MasterProcedureEnv env, final ServerName serverName,
+      final TransitionCode code, final long seqId) throws UnexpectedStateException {
+    final RegionStateNode regionNode = getRegionState(env);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Received report " + code + " seqId=" + seqId + ", " +
+            this + "; " + regionNode.toShortString());
+    }
+    if (!serverName.equals(regionNode.getRegionLocation())) {
+      if (isMeta() && regionNode.getRegionLocation() == null) {
+        regionNode.setRegionLocation(serverName);
+      } else {
+        throw new UnexpectedStateException(String.format(
+          "Unexpected state=%s from server=%s; expected server=%s; %s; %s",
+          code, serverName, regionNode.getRegionLocation(),
+          this, regionNode.toShortString()));
+      }
+    }
+
+    reportTransition(env, regionNode, code, seqId);
+
+    // NOTE: This call adds this procedure back on the scheduler.
+    // This makes it so this procedure can run again. Another worker will take
+    // processing to the next stage. At an extreme, the other worker may run in
+    // parallel so DO NOT CHANGE any state hereafter! This should be the last thing
+    // done in this processing step.
+    env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent());
+  }
+
+  protected boolean isServerOnline(final MasterProcedureEnv env, final RegionStateNode regionNode) {
+    return isServerOnline(env, regionNode.getRegionLocation());
+  }
+
+  protected boolean isServerOnline(final MasterProcedureEnv env, final ServerName serverName) {
+    return env.getMasterServices().getServerManager().isServerOnline(serverName);
+  }
+
+  @Override
+  protected void toStringState(StringBuilder builder) {
+    super.toStringState(builder);
+    RegionTransitionState ts = this.transitionState;
+    if (!isFinished() && ts != null) {
+      builder.append(":").append(ts);
+    }
+  }
+
+  @Override
+  protected Procedure[] execute(final MasterProcedureEnv env) throws ProcedureSuspendedException {
+    final AssignmentManager am = env.getAssignmentManager();
+    final RegionStateNode regionNode = getRegionState(env);
+    if (!am.addRegionInTransition(regionNode, this)) {
+      String msg = String.format(
+        "There is already another procedure running on this region this=%s 
owner=%s",
+        this, regionNode.getProcedure());
+      LOG.warn(msg + " " + this + "; " + regionNode.toShortString());
+      setAbortFailure(getClass().getSimpleName(), msg);
+      return null;
+    }
+    try {
+      boolean retry;
+      do {
+        retry = false;
+        switch (transitionState) {
+          case REGION_TRANSITION_QUEUE:
+            // 1. push into the AM queue for balancer policy
+            if (!startTransition(env, regionNode)) {
+              // The operation figured it is done or it aborted; check getException()
+              am.removeRegionInTransition(getRegionState(env), this);
+              return null;
+            }
+            transitionState = RegionTransitionState.REGION_TRANSITION_DISPATCH;
+            if (env.getProcedureScheduler().waitEvent(regionNode.getProcedureEvent(), this)) {
+              // Why this suspend? Because we want to ensure Store happens before proceed?
+              throw new ProcedureSuspendedException();
+            }
+            break;
+
+          case REGION_TRANSITION_DISPATCH:
+            // 2. send the request to the target server
+            if (!updateTransition(env, regionNode)) {
+              // The operation figured it is done or it aborted; check getException()
+              am.removeRegionInTransition(regionNode, this);
+              return null;
+            }
+            if (transitionState != RegionTransitionState.REGION_TRANSITION_DISPATCH) {
+              retry = true;
+              break;
+            }
+            if (env.getProcedureScheduler().waitEvent(regionNode.getProcedureEvent(), this)) {
+              throw new ProcedureSuspendedException();
+            }
+            break;
+
+          case REGION_TRANSITION_FINISH:
+            // 3. wait assignment response. completion/failure
+            finishTransition(env, regionNode);
+            am.removeRegionInTransition(regionNode, this);
+            return null;
+        }
+      } while (retry);
+    } catch (IOException e) {
+      LOG.warn("Retryable error trying to transition: " +
+          this + "; " + regionNode.toShortString(), e);
+    }
+
+    return new Procedure[] {this};
+  }
+
+  @Override
+  protected void rollback(final MasterProcedureEnv env) {
+    if (isRollbackSupported(transitionState)) {
+      // Nothing done up to this point. abort safely.
+      // This should happen when something like disableTable() is triggered.
+      env.getAssignmentManager().removeRegionInTransition(getRegionState(env), this);
+      return;
+    }
+
+    // There is no rollback for assignment unless we cancel the operation by
+    // dropping/disabling the table.
+    throw new UnsupportedOperationException("Unhandled state " + transitionState +
+        "; there is no rollback for assignment unless we cancel the operation by " +
+        "dropping/disabling the table");
+  }
+
+  protected abstract boolean isRollbackSupported(final RegionTransitionState state);
+
+  @Override
+  protected boolean abort(final MasterProcedureEnv env) {
+    if (isRollbackSupported(transitionState)) {
+      aborted.set(true);
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  protected LockState acquireLock(final MasterProcedureEnv env) {
+    // Unless we are assigning meta, wait for meta to be available and loaded.
+    if (!isMeta() && (env.waitFailoverCleanup(this) ||
+        env.getAssignmentManager().waitMetaInitialized(this, getRegionInfo()))) {
+      return LockState.LOCK_EVENT_WAIT;
+    }
+
+    // TODO: Revisit this and move it to the executor
+    if (env.getProcedureScheduler().waitRegion(this, getRegionInfo())) {
+      try {
+        LOG.debug(LockState.LOCK_EVENT_WAIT + " pid=" + getProcId() + " " +
+          env.getProcedureScheduler().dumpLocks());
+      } catch (IOException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+      return LockState.LOCK_EVENT_WAIT;
+    }
+    this.lock = true;
+    return LockState.LOCK_ACQUIRED;
+  }
+
+  @Override
+  protected void releaseLock(final MasterProcedureEnv env) {
+    env.getProcedureScheduler().wakeRegion(this, getRegionInfo());
+    lock = false;
+  }
+
+  @Override
+  protected boolean holdLock(final MasterProcedureEnv env) {
+    return true;
+  }
+
+  @Override
+  protected boolean hasLock(final MasterProcedureEnv env) {
+    return lock;
+  }
+
+  @Override
+  protected boolean shouldWaitClientAck(MasterProcedureEnv env) {
+    // The operation is triggered internally on the server
+    // the client does not know about this procedure.
+    return false;
+  }
+
+  /**
+   * Used by ServerCrashProcedure to see if this Assign/Unassign needs processing.
+   * @return ServerName the Assign or Unassign is going against.
+   */
+  public abstract ServerName getServer(final MasterProcedureEnv env);
+}
\ No newline at end of file
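
The class javadoc above describes the REGION_TRANSITION_QUEUE to REGION_TRANSITION_DISPATCH to REGION_TRANSITION_FINISH flow, with the procedure suspending on its ProcedureEvent after the dispatch and being woken by the regionserver's report. A rough standalone analogy of that handshake, with a CountDownLatch standing in for the ProcedureEvent (hypothetical names, not HBase APIs; a real procedure releases its worker thread rather than blocking):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    public class TransitionFlowDemo {
      enum Phase { QUEUE, DISPATCH, FINISH }

      public static void main(String[] args) throws InterruptedException {
        // Stand-in for the ProcedureEvent the procedure suspends on after dispatching.
        final CountDownLatch remoteReport = new CountDownLatch(1);

        Phase phase = Phase.QUEUE;
        System.out.println("phase=" + phase + ": queued; pick a target server");

        phase = Phase.DISPATCH;
        System.out.println("phase=" + phase + ": request sent; suspend until the report arrives");

        // A "regionserver" thread reports the transition later and wakes the waiter,
        // the analogue of wakeEvent(regionNode.getProcedureEvent()).
        new Thread(new Runnable() {
          @Override public void run() {
            try { Thread.sleep(100); } catch (InterruptedException ignored) { }
            remoteReport.countDown();
          }
        }).start();

        if (!remoteReport.await(5, TimeUnit.SECONDS)) {
          throw new IllegalStateException("no report; a real procedure would retry or fail");
        }

        phase = Phase.FINISH;
        System.out.println("phase=" + phase + ": report received; finish the transition");
      }
    }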

http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
new file mode 100644
index 0000000..a893783
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
@@ -0,0 +1,733 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
+import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineRegionProcedure;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SplitTableRegionState;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The procedure to split a region in a table.
+ * Takes lock on the parent region.
+ * It holds the lock for the life of the procedure.
+ */
+@InterfaceAudience.Private
+public class SplitTableRegionProcedure
+    extends AbstractStateMachineRegionProcedure<SplitTableRegionState> {
+  private static final Log LOG = LogFactory.getLog(SplitTableRegionProcedure.class);
+  private Boolean traceEnabled = null;
+  private HRegionInfo daughter_1_HRI;
+  private HRegionInfo daughter_2_HRI;
+
+  public SplitTableRegionProcedure() {
+    // Required by the Procedure framework to create the procedure on replay
+  }
+
+  public SplitTableRegionProcedure(final MasterProcedureEnv env,
+      final HRegionInfo regionToSplit, final byte[] splitRow) throws IOException {
+    super(env, regionToSplit);
+
+    checkSplitRow(regionToSplit, splitRow);
+
+    final TableName table = regionToSplit.getTable();
+    final long rid = getDaughterRegionIdTimestamp(regionToSplit);
+    this.daughter_1_HRI = new HRegionInfo(table, regionToSplit.getStartKey(), splitRow, false, rid);
+    this.daughter_2_HRI = new HRegionInfo(table, splitRow, regionToSplit.getEndKey(), false, rid);
+  }
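+  // Illustrative sketch only (hypothetical caller, not part of this patch): the master would
+  // typically submit this procedure through its ProcedureExecutor, along the lines of
+  //   env.getMasterServices().getMasterProcedureExecutor()
+  //       .submitProcedure(new SplitTableRegionProcedure(env, parentHRI, splitRow));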
+
+  private static void checkSplitRow(final HRegionInfo regionToSplit, final byte[] splitRow)
+      throws IOException {
+    if (splitRow == null || splitRow.length == 0) {
+      throw new DoNotRetryIOException("Split row cannot be null");
+    }
+
+    if (Bytes.equals(regionToSplit.getStartKey(), splitRow)) {
+      throw new DoNotRetryIOException(
+        "Split row is equal to startkey: " + Bytes.toStringBinary(splitRow));
+    }
+
+    if (!regionToSplit.containsRow(splitRow)) {
+      throw new DoNotRetryIOException(
+        "Split row is not inside region key range splitKey:" + 
Bytes.toStringBinary(splitRow) +
+        " region: " + regionToSplit);
+    }
+  }
+
+  /**
+   * Calculate daughter regionid to use.
+   * @param hri Parent {@link HRegionInfo}
+   * @return Daughter region id (timestamp) to use.
+   */
+  private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
+    long rid = EnvironmentEdgeManager.currentTime();
+    // Regionid is timestamp.  Can't be less than that of parent else will insert
+    // at wrong location in hbase:meta (See HBASE-710).
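+    // For example (illustrative numbers): if the parent's regionId is 1500000000500 but the local
+    // clock reads 1500000000000, the daughters get regionId 1500000000501 instead of the clock value.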
+    if (rid < hri.getRegionId()) {
+      LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() +
+        " but current time here is " + rid);
+      rid = hri.getRegionId() + 1;
+    }
+    return rid;
+  }
+
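+  // Happy-path state flow driven by executeFromState() below:
+  //   PREPARE -> PRE_OPERATION -> CLOSE_PARENT_REGION -> CREATE_DAUGHTER_REGIONS
+  //     -> PRE_OPERATION_BEFORE_PONR -> UPDATE_META (the point of no return)
+  //     -> PRE_OPERATION_AFTER_PONR -> OPEN_CHILD_REGIONS -> POST_OPERATION
+  // From UPDATE_META onwards the procedure cannot be rolled back (see isRollbackSupported()).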
+  @Override
+  protected Flow executeFromState(final MasterProcedureEnv env, final SplitTableRegionState state)
+      throws InterruptedException {
+    if (isTraceEnabled()) {
+      LOG.trace(this + " execute state=" + state);
+    }
+
+    try {
+      switch (state) {
+      case SPLIT_TABLE_REGION_PREPARE:
+        if (prepareSplitRegion(env)) {
+          setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION);
+          break;
+        } else {
+          assert isFailed() : "split region should have an exception here";
+          return Flow.NO_MORE_STATE;
+        }
+      case SPLIT_TABLE_REGION_PRE_OPERATION:
+        preSplitRegion(env);
+        setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_CLOSE_PARENT_REGION);
+        break;
+      case SPLIT_TABLE_REGION_CLOSE_PARENT_REGION:
+        addChildProcedure(createUnassignProcedures(env, getRegionReplication(env)));
+        setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS);
+        break;
+      case SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS:
+        createDaughterRegions(env);
+        setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_PONR);
+        break;
+      case SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_PONR:
+        preSplitRegionBeforePONR(env);
+        setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_UPDATE_META);
+        break;
+      case SPLIT_TABLE_REGION_UPDATE_META:
+        updateMetaForDaughterRegions(env);
+        setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_PONR);
+        break;
+      case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_PONR:
+        preSplitRegionAfterPONR(env);
+        setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS);
+        break;
+      case SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS:
+        addChildProcedure(createAssignProcedures(env, getRegionReplication(env)));
+        setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_POST_OPERATION);
+        break;
+      case SPLIT_TABLE_REGION_POST_OPERATION:
+        postSplitRegion(env);
+        return Flow.NO_MORE_STATE;
+      default:
+        throw new UnsupportedOperationException(this + " unhandled state=" + state);
+      }
+    } catch (IOException e) {
+      String msg = "Error trying to split region " + getParentRegion().getEncodedName() + " in the table "
+          + getTableName() + " (in state=" + state + ")";
+      if (!isRollbackSupported(state)) {
+        // We have reached a state that cannot be rolled back; just keep retrying.
+        LOG.warn(msg, e);
+      } else {
+        LOG.error(msg, e);
+        setFailure(e);
+      }
+    }
+    return Flow.HAS_MORE_STATE;
+  }
+
+  @Override
+  protected void rollbackState(final MasterProcedureEnv env, final SplitTableRegionState state)
+      throws IOException, InterruptedException {
+    if (isTraceEnabled()) {
+      LOG.trace(this + " rollback state=" + state);
+    }
+
+    try {
+      switch (state) {
+      case SPLIT_TABLE_REGION_POST_OPERATION:
+      case SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS:
+      case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_PONR:
+      case SPLIT_TABLE_REGION_UPDATE_META:
+        // PONR
+        throw new UnsupportedOperationException(this + " unhandled state=" + state);
+      case SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_PONR:
+        break;
+      case SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS:
+        // Do nothing; re-opening the parent region will clean up the daughter region directories.
+        break;
+      case SPLIT_TABLE_REGION_CLOSE_PARENT_REGION:
+        openParentRegion(env);
+        break;
+      case SPLIT_TABLE_REGION_PRE_OPERATION:
+        postRollBackSplitRegion(env);
+        break;
+      case SPLIT_TABLE_REGION_PREPARE:
+        break; // nothing to do
+      default:
+        throw new UnsupportedOperationException(this + " unhandled state=" + state);
+      }
+    } catch (IOException e) {
+      // This will be retried. Unless there is a bug in the code,
+      // this should be just a "temporary error" (e.g. network down)
+      LOG.warn("pid=" + getProcId() + " failed rollback attempt step " + state 
+
+          " for splitting the region "
+        + getParentRegion().getEncodedName() + " in table " + getTableName(), 
e);
+      throw e;
+    }
+  }
+
+  /*
+   * Check whether we are in a state that can be rolled back.
+   */
+  @Override
+  protected boolean isRollbackSupported(final SplitTableRegionState state) {
+    switch (state) {
+      case SPLIT_TABLE_REGION_POST_OPERATION:
+      case SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS:
+      case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_PONR:
+      case SPLIT_TABLE_REGION_UPDATE_META:
+        // It is not safe to roll back once we have reached these states.
+        return false;
+      default:
+        break;
+    }
+    return true;
+  }
+
+  @Override
+  protected SplitTableRegionState getState(final int stateId) {
+    return SplitTableRegionState.forNumber(stateId);
+  }
+
+  @Override
+  protected int getStateId(final SplitTableRegionState state) {
+    return state.getNumber();
+  }
+
+  @Override
+  protected SplitTableRegionState getInitialState() {
+    return SplitTableRegionState.SPLIT_TABLE_REGION_PREPARE;
+  }
+
+  @Override
+  public void serializeStateData(final OutputStream stream) throws IOException {
+    super.serializeStateData(stream);
+
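+    // Persist the parent region, both daughter regions, and the requesting user so the split can
+    // resume correctly after a master restart or failover.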
+    final MasterProcedureProtos.SplitTableRegionStateData.Builder splitTableRegionMsg =
+        MasterProcedureProtos.SplitTableRegionStateData.newBuilder()
+        .setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
+        .setParentRegionInfo(HRegionInfo.convert(getRegion()))
+        .addChildRegionInfo(HRegionInfo.convert(daughter_1_HRI))
+        .addChildRegionInfo(HRegionInfo.convert(daughter_2_HRI));
+    splitTableRegionMsg.build().writeDelimitedTo(stream);
+  }
+
+  @Override
+  public void deserializeStateData(final InputStream stream) throws IOException {
+    super.deserializeStateData(stream);
+
+    final MasterProcedureProtos.SplitTableRegionStateData splitTableRegionsMsg =
+        MasterProcedureProtos.SplitTableRegionStateData.parseDelimitedFrom(stream);
+    setUser(MasterProcedureUtil.toUserInfo(splitTableRegionsMsg.getUserInfo()));
+    setRegion(HRegionInfo.convert(splitTableRegionsMsg.getParentRegionInfo()));
+    assert(splitTableRegionsMsg.getChildRegionInfoCount() == 2);
+    daughter_1_HRI = HRegionInfo.convert(splitTableRegionsMsg.getChildRegionInfo(0));
+    daughter_2_HRI = HRegionInfo.convert(splitTableRegionsMsg.getChildRegionInfo(1));
+  }
+
+  @Override
+  public void toStringClassDetails(StringBuilder sb) {
+    sb.append(getClass().getSimpleName());
+    sb.append(" table=");
+    sb.append(getTableName());
+    sb.append(", parent=");
+    sb.append(getParentRegion().getShortNameToLog());
+    sb.append(", daughterA=");
+    sb.append(daughter_1_HRI.getShortNameToLog());
+    sb.append(", daughterB=");
+    sb.append(daughter_2_HRI.getShortNameToLog());
+  }
+
+  private HRegionInfo getParentRegion() {
+    return getRegion();
+  }
+
+  @Override
+  public TableOperationType getTableOperationType() {
+    return TableOperationType.REGION_SPLIT;
+  }
+
+  private byte[] getSplitRow() {
+    return daughter_2_HRI.getStartKey();
+  }
+
+  private static State [] EXPECTED_SPLIT_STATES = new State [] {State.OPEN, State.CLOSED};
+  /**
+   * Prepare to split the region.
+   * @param env MasterProcedureEnv
+   * @return true if the region is splittable and the split can proceed, false otherwise
+   * @throws IOException
+   */
+  @VisibleForTesting
+  public boolean prepareSplitRegion(final MasterProcedureEnv env) throws IOException {
+    // Check whether the region is splittable
+    RegionStateNode node = env.getAssignmentManager().getRegionStates().getRegionNode(getParentRegion());
+    HRegionInfo parentHRI = null;
+    if (node != null) {
+      parentHRI = node.getRegionInfo();
+
+      // Lookup the parent HRI state from the AM, which has the latest updated info.
+      // Protect against the case where concurrent SPLIT requests came in and succeeded
+      // just before us.
+      if (node.isInState(State.SPLIT)) {
+        LOG.info("Split of " + parentHRI + " skipped; state is already SPLIT");
+        return false;
+      }
+      if (parentHRI.isSplit() || parentHRI.isOffline()) {
+        LOG.info("Split of " + parentHRI + " skipped because offline/split.");
+        return false;
+      }
+
+      // expected parent to be online or closed
+      if (!node.isInState(EXPECTED_SPLIT_STATES)) {
+        // We may have SPLIT already?
+        setFailure(new IOException("Split " + 
parentHRI.getRegionNameAsString() +
+            " FAILED because state=" + node.getState() + "; expected " +
+            Arrays.toString(EXPECTED_SPLIT_STATES)));
+        return false;
+      }
+
+      // Ask the remote regionserver if this region is splittable. If we get an IOE, report it
+      // along w/ the failure so we can see why we are not splittable at this time.
+      IOException splittableCheckIOE = null;
+      boolean splittable = false;
+      try {
+        GetRegionInfoResponse response =
+            Util.getRegionInfoResponse(env, node.getRegionLocation(), node.getRegionInfo());
+        splittable = response.hasSplittable() && response.getSplittable();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Splittable=" + splittable + " " + this + " " + 
node.toShortString());
+        }
+      } catch (IOException e) {
+        splittableCheckIOE = e;
+      }
+      if (!splittable) {
+        IOException e = new IOException(parentHRI.getShortNameToLog() + " NOT splittable");
+        if (splittableCheckIOE != null) e.initCause(splittableCheckIOE);
+        setFailure(e);
+        return false;
+      }
+    }
+
+    // Since we have the lock and the master is coordinating the operation
+    // we are always able to split the region
+    if (!env.getMasterServices().isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) {
+      LOG.warn("pid=" + getProcId() + " split switch is off! skip split of " + parentHRI);
+      setFailure(new IOException("Split region " +
+          (parentHRI == null? "null": parentHRI.getRegionNameAsString()) +
+          " failed due to split switch off"));
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Action before splitting region in a table.
+   * @param env MasterProcedureEnv
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private void preSplitRegion(final MasterProcedureEnv env)
+      throws IOException, InterruptedException {
+    final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+    if (cpHost != null) {
+      cpHost.preSplitRegionAction(getTableName(), getSplitRow(), getUser());
+    }
+  }
+
+  /**
+   * Action after rollback a split table region action.
+   * @param env MasterProcedureEnv
+   * @throws IOException
+   */
+  private void postRollBackSplitRegion(final MasterProcedureEnv env) throws IOException {
+    final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+    if (cpHost != null) {
+      cpHost.postRollBackSplitRegionAction(getUser());
+    }
+  }
+
+  /**
+   * Rollback of the close-parent-region step: re-open the parent region.
+   * @param env MasterProcedureEnv
+   **/
+  private void openParentRegion(final MasterProcedureEnv env) throws IOException {
+    // Check whether the region is closed; if so, open it in the same server
+    final int regionReplication = getRegionReplication(env);
+    final ServerName serverName = getParentRegionServerName(env);
+
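+    // Re-assign the parent region (and each of its replicas) to the server recorded for it in the
+    // assignment manager, i.e. where it was running before the split closed it.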
+    final AssignProcedure[] procs = new AssignProcedure[regionReplication];
+    for (int i = 0; i < regionReplication; ++i) {
+      final HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(getParentRegion(), i);
+      procs[i] = env.getAssignmentManager().createAssignProcedure(hri, serverName);
+    }
+    env.getMasterServices().getMasterProcedureExecutor().submitProcedures(procs);
+  }
+
+  /**
+   * Create daughter regions
+   * @param env MasterProcedureEnv
+   * @throws IOException
+   */
+  @VisibleForTesting
+  public void createDaughterRegions(final MasterProcedureEnv env) throws IOException {
+    final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
+    final Path tabledir = FSUtils.getTableDir(mfs.getRootDir(), getTableName());
+    final FileSystem fs = mfs.getFileSystem();
+    HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
+      env.getMasterConfiguration(), fs, tabledir, getParentRegion(), false);
+    regionFs.createSplitsDir();
+
+    Pair<Integer, Integer> expectedReferences = splitStoreFiles(env, regionFs);
+
+    assertReferenceFileCount(fs, expectedReferences.getFirst(),
+      regionFs.getSplitsDir(daughter_1_HRI));
+    // Move the files from the temporary .splits to the final /table/region directory
+    regionFs.commitDaughterRegion(daughter_1_HRI);
+    assertReferenceFileCount(fs, expectedReferences.getFirst(),
+      new Path(tabledir, daughter_1_HRI.getEncodedName()));
+
+    assertReferenceFileCount(fs, expectedReferences.getSecond(),
+      regionFs.getSplitsDir(daughter_2_HRI));
+    regionFs.commitDaughterRegion(daughter_2_HRI);
+    assertReferenceFileCount(fs, expectedReferences.getSecond(),
+      new Path(tabledir, daughter_2_HRI.getEncodedName()));
+  }
+
+  /**
+   * Create split references for the parent region's store files.
+   * @param env MasterProcedureEnv
+   * @param regionFs the parent region's file system
+   * @return pair of reference file counts created for daughter A (first) and daughter B (second)
+   * @throws IOException
+   */
+  private Pair<Integer, Integer> splitStoreFiles(
+      final MasterProcedureEnv env,
+      final HRegionFileSystem regionFs) throws IOException {
+    final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
+    final Configuration conf = env.getMasterConfiguration();
+
+    // The following code sets up a thread pool executor with as many slots as
+    // there's files to split. It then fires up everything, waits for
+    // completion and finally checks for any exception
+    //
+    // Note: splitStoreFiles creates daughter region dirs under the parent splits dir
+    // Nothing to unroll here if failure -- re-run createSplitsDir will
+    // clean this up.
+    int nbFiles = 0;
+    for (String family: regionFs.getFamilies()) {
+      final Collection<StoreFileInfo> storeFiles = 
regionFs.getStoreFiles(family);
+      if (storeFiles != null) {
+        nbFiles += storeFiles.size();
+      }
+    }
+    if (nbFiles == 0) {
+      // no files need to be split
+      return new Pair<Integer, Integer>(0, 0);
+    }
+    // Max #threads is the smaller of the number of storefiles and the configured max thread count.
+    int maxThreads = Math.min(
+      conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX,
+        conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT)),
+      nbFiles);
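+    // e.g. (illustrative numbers) with 12 store files and a configured cap of 8 threads, the pool
+    // below runs 8 StoreFileSplitter tasks at a time.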
+    LOG.info("pid=" + getProcId() + " preparing to split " + nbFiles + " 
storefiles for region " +
+      getParentRegion().getShortNameToLog() + " using " + maxThreads + " 
threads");
+    final ExecutorService threadPool = Executors.newFixedThreadPool(
+      maxThreads, Threads.getNamedThreadFactory("StoreFileSplitter-%1$d"));
+    final List<Future<Pair<Path,Path>>> futures = new 
ArrayList<Future<Pair<Path,Path>>>(nbFiles);
+
+    // Split each store file.
+    final TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
+    for (String family: regionFs.getFamilies()) {
+      final HColumnDescriptor hcd = htd.getFamily(family.getBytes());
+      final Collection<StoreFileInfo> storeFiles = 
regionFs.getStoreFiles(family);
+      if (storeFiles != null && storeFiles.size() > 0) {
+        final CacheConfig cacheConf = new CacheConfig(conf, hcd);
+        for (StoreFileInfo storeFileInfo: storeFiles) {
+          StoreFileSplitter sfs = new StoreFileSplitter(
+            regionFs,
+            family.getBytes(),
+            new StoreFile(
+              mfs.getFileSystem(), storeFileInfo, conf, cacheConf, hcd.getBloomFilterType()));
+          futures.add(threadPool.submit(sfs));
+        }
+      }
+    }
+    // Shutdown the pool
+    threadPool.shutdown();
+
+    // Wait for all the tasks to finish
+    long fileSplitTimeout = conf.getLong("hbase.master.fileSplitTimeout", 30000);
+    try {
+      boolean stillRunning = !threadPool.awaitTermination(fileSplitTimeout, TimeUnit.MILLISECONDS);
+      if (stillRunning) {
+        threadPool.shutdownNow();
+        // wait for the thread to shutdown completely.
+        while (!threadPool.isTerminated()) {
+          Thread.sleep(50);
+        }
+        throw new IOException("Took too long to split the" +
+            " files and create the references, aborting split");
+      }
+    } catch (InterruptedException e) {
+      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+    }
+
+    int daughterA = 0;
+    int daughterB = 0;
+    // Look for any exception
+    for (Future<Pair<Path, Path>> future : futures) {
+      try {
+        Pair<Path, Path> p = future.get();
+        daughterA += p.getFirst() != null ? 1 : 0;
+        daughterB += p.getSecond() != null ? 1 : 0;
+      } catch (InterruptedException e) {
+        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("pid=" + getProcId() + " split storefiles for region " + 
getParentRegion().getShortNameToLog() +
+          " Daughter A: " + daughterA + " storefiles, Daughter B: " + 
daughterB + " storefiles.");
+    }
+    return new Pair<Integer, Integer>(daughterA, daughterB);
+  }
+
+  private void assertReferenceFileCount(final FileSystem fs, final int expectedReferenceFileCount,
+      final Path dir) throws IOException {
+    if (expectedReferenceFileCount != 0 &&
+        expectedReferenceFileCount != FSUtils.getRegionReferenceFileCount(fs, dir)) {
+      throw new IOException("Failing split. Expected reference file count isn't equal.");
+    }
+  }
+
+  private Pair<Path, Path> splitStoreFile(final HRegionFileSystem regionFs,
+      final byte[] family, final StoreFile sf) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("pid=" + getProcId() + " splitting started for store file: " +
+          sf.getPath() + " for region: " + getParentRegion());
+    }
+
+    final byte[] splitRow = getSplitRow();
+    final String familyName = Bytes.toString(family);
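+    // daughter_1_HRI covers [startKey, splitRow) and daughter_2_HRI covers [splitRow, endKey);
+    // each daughter gets a reference to its half of the parent store file.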
+    final Path path_first =
+        regionFs.splitStoreFile(this.daughter_1_HRI, familyName, sf, splitRow, false, null);
+    final Path path_second =
+        regionFs.splitStoreFile(this.daughter_2_HRI, familyName, sf, splitRow, true, null);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("pid=" + getProcId() + " splitting complete for store file: " +
+          sf.getPath() + " for region: " + getParentRegion().getShortNameToLog());
+    }
+    return new Pair<Path,Path>(path_first, path_second);
+  }
+
+  /**
+   * Utility class used to do the file splitting / reference writing
+   * in parallel instead of sequentially.
+   */
+  private class StoreFileSplitter implements Callable<Pair<Path,Path>> {
+    private final HRegionFileSystem regionFs;
+    private final byte[] family;
+    private final StoreFile sf;
+
+    /**
+     * Constructor that takes what it needs to split
+     * @param regionFs the file system
+     * @param family Family that contains the store file
+     * @param sf which file
+     */
+    public StoreFileSplitter(final HRegionFileSystem regionFs, final byte[] family,
+        final StoreFile sf) {
+      this.regionFs = regionFs;
+      this.sf = sf;
+      this.family = family;
+    }
+
+    public Pair<Path,Path> call() throws IOException {
+      return splitStoreFile(regionFs, family, sf);
+    }
+  }
+
+  /**
+   * Pre split region actions before the Point-of-No-Return step
+   * @param env MasterProcedureEnv
+   **/
+  private void preSplitRegionBeforePONR(final MasterProcedureEnv env)
+      throws IOException, InterruptedException {
+    final List<Mutation> metaEntries = new ArrayList<Mutation>();
+    final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+    if (cpHost != null) {
+      if (cpHost.preSplitBeforePONRAction(getSplitRow(), metaEntries, getUser())) {
+        throw new IOException("Coprocessor bypassing region " +
+            getParentRegion().getRegionNameAsString() + " split.");
+      }
+      try {
+        for (Mutation p : metaEntries) {
+          HRegionInfo.parseRegionName(p.getRow());
+        }
+      } catch (IOException e) {
+        LOG.error("pid=" + getProcId() + " row key of mutation from 
coprocessor not parsable as "
+            + "region name."
+            + "Mutations from coprocessor should only for hbase:meta table.");
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Add the daughter regions to hbase:meta and mark the parent as SPLIT. This is the point of no return.
+   * @param env MasterProcedureEnv
+   * @throws IOException
+   */
+  private void updateMetaForDaughterRegions(final MasterProcedureEnv env) throws IOException {
+    env.getAssignmentManager().markRegionAsSplit(getParentRegion(), getParentRegionServerName(env),
+      daughter_1_HRI, daughter_2_HRI);
+  }
+
+  /**
+   * Pre split region actions after the Point-of-No-Return step
+   * @param env MasterProcedureEnv
+   **/
+  private void preSplitRegionAfterPONR(final MasterProcedureEnv env)
+      throws IOException, InterruptedException {
+    final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+    if (cpHost != null) {
+      cpHost.preSplitAfterPONRAction(getUser());
+    }
+  }
+
+  /**
+   * Post split region actions
+   * @param env MasterProcedureEnv
+   **/
+  private void postSplitRegion(final MasterProcedureEnv env) throws IOException {
+    final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+    if (cpHost != null) {
+      cpHost.postCompletedSplitRegionAction(daughter_1_HRI, daughter_2_HRI, getUser());
+    }
+  }
+
+  private ServerName getParentRegionServerName(final MasterProcedureEnv env) {
+    return env.getMasterServices().getAssignmentManager()
+      .getRegionStates().getRegionServerOfRegion(getParentRegion());
+  }
+
+  private UnassignProcedure[] createUnassignProcedures(final MasterProcedureEnv env,
+      final int regionReplication) {
+    final UnassignProcedure[] procs = new UnassignProcedure[regionReplication];
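+    // One unassign (close) per replica of the parent region.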
+    for (int i = 0; i < procs.length; ++i) {
+      final HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(getParentRegion(), i);
+      procs[i] = env.getAssignmentManager().createUnassignProcedure(hri, null, true);
+    }
+    return procs;
+  }
+
+  private AssignProcedure[] createAssignProcedures(final MasterProcedureEnv env,
+      final int regionReplication) {
+    final ServerName targetServer = getParentRegionServerName(env);
+    final AssignProcedure[] procs = new AssignProcedure[regionReplication * 2];
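+    // Two daughters, each with regionReplication replicas, all assigned to the server that hosted
+    // the parent region.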
+    int procsIdx = 0;
+    for (int i = 0; i < regionReplication; ++i) {
+      final HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(daughter_1_HRI, i);
+      procs[procsIdx++] = env.getAssignmentManager().createAssignProcedure(hri, targetServer);
+    }
+    for (int i = 0; i < regionReplication; ++i) {
+      final HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(daughter_2_HRI, i);
+      procs[procsIdx++] = env.getAssignmentManager().createAssignProcedure(hri, targetServer);
+    }
+    return procs;
+  }
+
+  private int getRegionReplication(final MasterProcedureEnv env) throws IOException {
+    final TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
+    return htd.getRegionReplication();
+  }
+
+  /**
+   * The procedure could be restarted from a different machine. If the variable is null, we need to
+   * retrieve it.
+   * @return traceEnabled
+   */
+  private boolean isTraceEnabled() {
+    if (traceEnabled == null) {
+      traceEnabled = LOG.isTraceEnabled();
+    }
+    return traceEnabled;
+  }
+}
