HBASE-16941: FavoredNodes - Split/Merge code paths Signed-off-by: Francis Liu <tof...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c1293cc9 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c1293cc9 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c1293cc9 Branch: refs/heads/master Commit: c1293cc91e29d09c04297f126a76e84edb1af8fd Parents: 75567f8 Author: Thiruvel Thirumoolan <thiru...@gmail.com> Authored: Mon Dec 5 16:03:00 2016 -0800 Committer: Francis Liu <tof...@apache.org> Committed: Wed Dec 7 16:38:48 2016 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/ServerName.java | 2 +- .../favored/FavoredNodeAssignmentHelper.java | 798 +++++++++++++++++++ .../hbase/favored/FavoredNodeLoadBalancer.java | 443 ++++++++++ .../hbase/favored/FavoredNodesManager.java | 185 +++++ .../hadoop/hbase/favored/FavoredNodesPlan.java | 144 ++++ .../hbase/favored/FavoredNodesPromoter.java | 35 + .../favored/StartcodeAgnosticServerName.java | 66 ++ .../hadoop/hbase/master/AssignmentManager.java | 77 +- .../master/AssignmentVerificationReport.java | 4 +- .../org/apache/hadoop/hbase/master/HMaster.java | 22 +- .../hadoop/hbase/master/MasterServices.java | 6 + .../hbase/master/RegionPlacementMaintainer.java | 4 +- .../hadoop/hbase/master/ServerManager.java | 24 + .../SnapshotOfRegionAssignmentFromMeta.java | 81 +- .../balancer/FavoredNodeAssignmentHelper.java | 606 -------------- .../balancer/FavoredNodeLoadBalancer.java | 356 --------- .../hbase/master/balancer/FavoredNodesPlan.java | 135 ---- .../hbase/client/TestTableFavoredNodes.java | 297 +++++++ .../TestFavoredNodeAssignmentHelper.java | 650 +++++++++++++++ .../TestStartcodeAgnosticServerName.java | 50 ++ .../hbase/master/MockNoopMasterServices.java | 6 + .../hbase/master/TestRegionPlacement.java | 8 +- .../hbase/master/TestRegionPlacement2.java | 19 +- .../TestFavoredNodeAssignmentHelper.java | 364 --------- 24 files changed, 2861 insertions(+), 1521 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/c1293cc9/hbase-common/src/main/java/org/apache/hadoop/hbase/ServerName.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ServerName.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ServerName.java index 8d18db0..4ae500a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ServerName.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ServerName.java @@ -98,7 +98,7 @@ import com.google.common.net.InetAddresses; private byte [] bytes; public static final List<ServerName> EMPTY_SERVER_LIST = new ArrayList<ServerName>(0); - private ServerName(final String hostname, final int port, final long startcode) { + protected ServerName(final String hostname, final int port, final long startcode) { // Drop the domain is there is one; no need of it in a local cluster. With it, we get long // unwieldy names. this.hostnameOnly = hostname; http://git-wip-us.apache.org/repos/asf/hbase/blob/c1293cc9/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeAssignmentHelper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeAssignmentHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeAssignmentHelper.java new file mode 100644 index 0000000..98e058d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeAssignmentHelper.java @@ -0,0 +1,798 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.favored; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.RackManager; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FavoredNodes; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.protobuf.InvalidProtocolBufferException; + +/** + * Helper class for {@link FavoredNodeLoadBalancer} that has all the intelligence for racks, + * meta scans, etc. Instantiated by the {@link FavoredNodeLoadBalancer} when needed (from + * within calls like {@link FavoredNodeLoadBalancer#randomAssignment(HRegionInfo, List)}). + * All updates to favored nodes should only be done from {@link FavoredNodesManager} and not + * through this helper class (except for tests). + */ +@InterfaceAudience.Private +public class FavoredNodeAssignmentHelper { + private static final Log LOG = LogFactory.getLog(FavoredNodeAssignmentHelper.class); + private RackManager rackManager; + private Map<String, List<ServerName>> rackToRegionServerMap; + private List<String> uniqueRackList; + // This map serves as a cache for rack to sn lookups. The num of + // region server entries might not match with that is in servers. + private Map<String, String> regionServerToRackMap; + private Random random; + private List<ServerName> servers; + public static final byte [] FAVOREDNODES_QUALIFIER = Bytes.toBytes("fn"); + public final static short FAVORED_NODES_NUM = 3; + public final static short MAX_ATTEMPTS_FN_GENERATION = 10; + + public FavoredNodeAssignmentHelper(final List<ServerName> servers, Configuration conf) { + this(servers, new RackManager(conf)); + } + + public FavoredNodeAssignmentHelper(final List<ServerName> servers, + final RackManager rackManager) { + this.servers = servers; + this.rackManager = rackManager; + this.rackToRegionServerMap = new HashMap<String, List<ServerName>>(); + this.regionServerToRackMap = new HashMap<String, String>(); + this.uniqueRackList = new ArrayList<String>(); + this.random = new Random(); + } + + // Always initialize() when FavoredNodeAssignmentHelper is constructed. + public void initialize() { + for (ServerName sn : this.servers) { + String rackName = getRackOfServer(sn); + List<ServerName> serverList = this.rackToRegionServerMap.get(rackName); + if (serverList == null) { + serverList = Lists.newArrayList(); + // Add the current rack to the unique rack list + this.uniqueRackList.add(rackName); + this.rackToRegionServerMap.put(rackName, serverList); + } + for (ServerName serverName : serverList) { + if (ServerName.isSameHostnameAndPort(sn, serverName)) { + // The server is already present, ignore. + break; + } + } + serverList.add((sn)); + this.regionServerToRackMap.put(sn.getHostname(), rackName); + } + } + + /** + * Update meta table with favored nodes info + * @param regionToFavoredNodes map of HRegionInfo's to their favored nodes + * @param connection connection to be used + * @throws IOException + */ + public static void updateMetaWithFavoredNodesInfo( + Map<HRegionInfo, List<ServerName>> regionToFavoredNodes, + Connection connection) throws IOException { + List<Put> puts = new ArrayList<Put>(); + for (Map.Entry<HRegionInfo, List<ServerName>> entry : regionToFavoredNodes.entrySet()) { + Put put = makePutFromRegionInfo(entry.getKey(), entry.getValue()); + if (put != null) { + puts.add(put); + } + } + MetaTableAccessor.putsToMetaTable(connection, puts); + LOG.info("Added " + puts.size() + " regions in META"); + } + + /** + * Update meta table with favored nodes info + * @param regionToFavoredNodes + * @param conf + * @throws IOException + */ + public static void updateMetaWithFavoredNodesInfo( + Map<HRegionInfo, List<ServerName>> regionToFavoredNodes, + Configuration conf) throws IOException { + List<Put> puts = new ArrayList<Put>(); + for (Map.Entry<HRegionInfo, List<ServerName>> entry : regionToFavoredNodes.entrySet()) { + Put put = makePutFromRegionInfo(entry.getKey(), entry.getValue()); + if (put != null) { + puts.add(put); + } + } + // Write the region assignments to the meta table. + // TODO: See above overrides take a Connection rather than a Configuration only the + // Connection is a short circuit connection. That is not going to good in all cases, when + // master and meta are not colocated. Fix when this favored nodes feature is actually used + // someday. + try (Connection connection = ConnectionFactory.createConnection(conf)) { + try (Table metaTable = connection.getTable(TableName.META_TABLE_NAME)) { + metaTable.put(puts); + } + } + LOG.info("Added " + puts.size() + " regions in META"); + } + + /** + * Generates and returns a Put containing the region info for the catalog table + * and the servers + * @param regionInfo + * @param favoredNodeList + * @return Put object + */ + static Put makePutFromRegionInfo(HRegionInfo regionInfo, List<ServerName>favoredNodeList) + throws IOException { + Put put = null; + if (favoredNodeList != null) { + put = MetaTableAccessor.makePutFromRegionInfo(regionInfo); + byte[] favoredNodes = getFavoredNodes(favoredNodeList); + put.addImmutable(HConstants.CATALOG_FAMILY, FAVOREDNODES_QUALIFIER, + EnvironmentEdgeManager.currentTime(), favoredNodes); + LOG.debug("Create the region " + regionInfo.getRegionNameAsString() + + " with favored nodes " + favoredNodeList); + } + return put; + } + + /** + * @param favoredNodes The PB'ed bytes of favored nodes + * @return the array of {@link ServerName} for the byte array of favored nodes. + * @throws IOException + */ + public static ServerName[] getFavoredNodesList(byte[] favoredNodes) throws IOException { + FavoredNodes f = FavoredNodes.parseFrom(favoredNodes); + List<HBaseProtos.ServerName> protoNodes = f.getFavoredNodeList(); + ServerName[] servers = new ServerName[protoNodes.size()]; + int i = 0; + for (HBaseProtos.ServerName node : protoNodes) { + servers[i++] = ProtobufUtil.toServerName(node); + } + return servers; + } + + /** + * @param serverAddrList + * @return PB'ed bytes of {@link FavoredNodes} generated by the server list. + */ + public static byte[] getFavoredNodes(List<ServerName> serverAddrList) { + FavoredNodes.Builder f = FavoredNodes.newBuilder(); + for (ServerName s : serverAddrList) { + HBaseProtos.ServerName.Builder b = HBaseProtos.ServerName.newBuilder(); + b.setHostName(s.getHostname()); + b.setPort(s.getPort()); + b.setStartCode(ServerName.NON_STARTCODE); + f.addFavoredNode(b.build()); + } + return f.build().toByteArray(); + } + + // Place the regions round-robin across the racks picking one server from each + // rack at a time. Start with a random rack, and a random server from every rack. + // If a rack doesn't have enough servers it will go to the next rack and so on. + // for choosing a primary. + // For example, if 4 racks (r1 .. r4) with 8 servers (s1..s8) each, one possible + // placement could be r2:s5, r3:s5, r4:s5, r1:s5, r2:s6, r3:s6.. + // If there were fewer servers in one rack, say r3, which had 3 servers, one possible + // placement could be r2:s5, <skip-r3>, r4:s5, r1:s5, r2:s6, <skip-r3> ... + // The regions should be distributed proportionately to the racksizes + void placePrimaryRSAsRoundRobin(Map<ServerName, List<HRegionInfo>> assignmentMap, + Map<HRegionInfo, ServerName> primaryRSMap, List<HRegionInfo> regions) { + List<String> rackList = new ArrayList<String>(rackToRegionServerMap.size()); + rackList.addAll(rackToRegionServerMap.keySet()); + int rackIndex = random.nextInt(rackList.size()); + int maxRackSize = 0; + for (Map.Entry<String,List<ServerName>> r : rackToRegionServerMap.entrySet()) { + if (r.getValue().size() > maxRackSize) { + maxRackSize = r.getValue().size(); + } + } + int numIterations = 0; + int firstServerIndex = random.nextInt(maxRackSize); + // Initialize the current processing host index. + int serverIndex = firstServerIndex; + for (HRegionInfo regionInfo : regions) { + List<ServerName> currentServerList; + String rackName; + while (true) { + rackName = rackList.get(rackIndex); + numIterations++; + // Get the server list for the current rack + currentServerList = rackToRegionServerMap.get(rackName); + + if (serverIndex >= currentServerList.size()) { //not enough machines in this rack + if (numIterations % rackList.size() == 0) { + if (++serverIndex >= maxRackSize) serverIndex = 0; + } + if ((++rackIndex) >= rackList.size()) { + rackIndex = 0; // reset the rack index to 0 + } + } else break; + } + + // Get the current process region server + ServerName currentServer = currentServerList.get(serverIndex); + + // Place the current region with the current primary region server + primaryRSMap.put(regionInfo, currentServer); + if (assignmentMap != null) { + List<HRegionInfo> regionsForServer = assignmentMap.get(currentServer); + if (regionsForServer == null) { + regionsForServer = new ArrayList<HRegionInfo>(); + assignmentMap.put(currentServer, regionsForServer); + } + regionsForServer.add(regionInfo); + } + + // Set the next processing index + if (numIterations % rackList.size() == 0) { + ++serverIndex; + } + if ((++rackIndex) >= rackList.size()) { + rackIndex = 0; // reset the rack index to 0 + } + } + } + + Map<HRegionInfo, ServerName[]> placeSecondaryAndTertiaryRS( + Map<HRegionInfo, ServerName> primaryRSMap) { + Map<HRegionInfo, ServerName[]> secondaryAndTertiaryMap = + new HashMap<HRegionInfo, ServerName[]>(); + for (Map.Entry<HRegionInfo, ServerName> entry : primaryRSMap.entrySet()) { + // Get the target region and its primary region server rack + HRegionInfo regionInfo = entry.getKey(); + ServerName primaryRS = entry.getValue(); + try { + // Create the secondary and tertiary region server pair object. + ServerName[] favoredNodes; + // Get the rack for the primary region server + String primaryRack = getRackOfServer(primaryRS); + + if (getTotalNumberOfRacks() == 1) { + favoredNodes = singleRackCase(regionInfo, primaryRS, primaryRack); + } else { + favoredNodes = multiRackCase(regionInfo, primaryRS, primaryRack); + } + if (favoredNodes != null) { + secondaryAndTertiaryMap.put(regionInfo, favoredNodes); + LOG.debug("Place the secondary and tertiary region server for region " + + regionInfo.getRegionNameAsString()); + } + } catch (Exception e) { + LOG.warn("Cannot place the favored nodes for region " + + regionInfo.getRegionNameAsString() + " because " + e, e); + continue; + } + } + return secondaryAndTertiaryMap; + } + + private Map<ServerName, Set<HRegionInfo>> mapRSToPrimaries( + Map<HRegionInfo, ServerName> primaryRSMap) { + Map<ServerName, Set<HRegionInfo>> primaryServerMap = + new HashMap<ServerName, Set<HRegionInfo>>(); + for (Entry<HRegionInfo, ServerName> e : primaryRSMap.entrySet()) { + Set<HRegionInfo> currentSet = primaryServerMap.get(e.getValue()); + if (currentSet == null) { + currentSet = new HashSet<HRegionInfo>(); + } + currentSet.add(e.getKey()); + primaryServerMap.put(e.getValue(), currentSet); + } + return primaryServerMap; + } + + /** + * For regions that share the primary, avoid placing the secondary and tertiary + * on a same RS. Used for generating new assignments for the + * primary/secondary/tertiary RegionServers + * @param primaryRSMap + * @return the map of regions to the servers the region-files should be hosted on + */ + public Map<HRegionInfo, ServerName[]> placeSecondaryAndTertiaryWithRestrictions( + Map<HRegionInfo, ServerName> primaryRSMap) { + Map<ServerName, Set<HRegionInfo>> serverToPrimaries = + mapRSToPrimaries(primaryRSMap); + Map<HRegionInfo, ServerName[]> secondaryAndTertiaryMap = + new HashMap<HRegionInfo, ServerName[]>(); + + for (Entry<HRegionInfo, ServerName> entry : primaryRSMap.entrySet()) { + // Get the target region and its primary region server rack + HRegionInfo regionInfo = entry.getKey(); + ServerName primaryRS = entry.getValue(); + try { + // Get the rack for the primary region server + String primaryRack = getRackOfServer(primaryRS); + ServerName[] favoredNodes = null; + if (getTotalNumberOfRacks() == 1) { + // Single rack case: have to pick the secondary and tertiary + // from the same rack + favoredNodes = singleRackCase(regionInfo, primaryRS, primaryRack); + } else { + favoredNodes = multiRackCaseWithRestrictions(serverToPrimaries, + secondaryAndTertiaryMap, primaryRack, primaryRS, regionInfo); + } + if (favoredNodes != null) { + secondaryAndTertiaryMap.put(regionInfo, favoredNodes); + LOG.debug("Place the secondary and tertiary region server for region " + + regionInfo.getRegionNameAsString()); + } + } catch (Exception e) { + LOG.warn("Cannot place the favored nodes for region " + + regionInfo.getRegionNameAsString() + " because " + e, e); + continue; + } + } + return secondaryAndTertiaryMap; + } + + private ServerName[] multiRackCaseWithRestrictions( + Map<ServerName, Set<HRegionInfo>> serverToPrimaries, + Map<HRegionInfo, ServerName[]> secondaryAndTertiaryMap, + String primaryRack, ServerName primaryRS, HRegionInfo regionInfo) throws IOException { + // Random to choose the secondary and tertiary region server + // from another rack to place the secondary and tertiary + // Random to choose one rack except for the current rack + Set<String> rackSkipSet = new HashSet<String>(); + rackSkipSet.add(primaryRack); + String secondaryRack = getOneRandomRack(rackSkipSet); + List<ServerName> serverList = getServersFromRack(secondaryRack); + Set<ServerName> serverSet = new HashSet<ServerName>(); + serverSet.addAll(serverList); + ServerName[] favoredNodes; + if (serverList.size() >= 2) { + // Randomly pick up two servers from this secondary rack + // Skip the secondary for the tertiary placement + // skip the servers which share the primary already + Set<HRegionInfo> primaries = serverToPrimaries.get(primaryRS); + Set<ServerName> skipServerSet = new HashSet<ServerName>(); + while (true) { + ServerName[] secondaryAndTertiary = null; + if (primaries.size() > 1) { + // check where his tertiary and secondary are + for (HRegionInfo primary : primaries) { + secondaryAndTertiary = secondaryAndTertiaryMap.get(primary); + if (secondaryAndTertiary != null) { + if (getRackOfServer(secondaryAndTertiary[0]).equals(secondaryRack)) { + skipServerSet.add(secondaryAndTertiary[0]); + } + if (getRackOfServer(secondaryAndTertiary[1]).equals(secondaryRack)) { + skipServerSet.add(secondaryAndTertiary[1]); + } + } + } + } + if (skipServerSet.size() + 2 <= serverSet.size()) + break; + skipServerSet.clear(); + rackSkipSet.add(secondaryRack); + // we used all racks + if (rackSkipSet.size() == getTotalNumberOfRacks()) { + // remove the last two added and break + skipServerSet.remove(secondaryAndTertiary[0]); + skipServerSet.remove(secondaryAndTertiary[1]); + break; + } + secondaryRack = getOneRandomRack(rackSkipSet); + serverList = getServersFromRack(secondaryRack); + serverSet = new HashSet<ServerName>(); + serverSet.addAll(serverList); + } + + // Place the secondary RS + ServerName secondaryRS = getOneRandomServer(secondaryRack, skipServerSet); + skipServerSet.add(secondaryRS); + // Place the tertiary RS + ServerName tertiaryRS = getOneRandomServer(secondaryRack, skipServerSet); + + if (secondaryRS == null || tertiaryRS == null) { + LOG.error("Cannot place the secondary and tertiary" + + " region server for region " + + regionInfo.getRegionNameAsString()); + } + // Create the secondary and tertiary pair + favoredNodes = new ServerName[2]; + favoredNodes[0] = secondaryRS; + favoredNodes[1] = tertiaryRS; + } else { + // Pick the secondary rs from this secondary rack + // and pick the tertiary from another random rack + favoredNodes = new ServerName[2]; + ServerName secondary = getOneRandomServer(secondaryRack); + favoredNodes[0] = secondary; + + // Pick the tertiary + if (getTotalNumberOfRacks() == 2) { + // Pick the tertiary from the same rack of the primary RS + Set<ServerName> serverSkipSet = new HashSet<ServerName>(); + serverSkipSet.add(primaryRS); + favoredNodes[1] = getOneRandomServer(primaryRack, serverSkipSet); + } else { + // Pick the tertiary from another rack + rackSkipSet.add(secondaryRack); + String tertiaryRandomRack = getOneRandomRack(rackSkipSet); + favoredNodes[1] = getOneRandomServer(tertiaryRandomRack); + } + } + return favoredNodes; + } + + private ServerName[] singleRackCase(HRegionInfo regionInfo, + ServerName primaryRS, + String primaryRack) throws IOException { + // Single rack case: have to pick the secondary and tertiary + // from the same rack + List<ServerName> serverList = getServersFromRack(primaryRack); + if ((serverList == null) || (serverList.size() <= 2)) { + // Single region server case: cannot not place the favored nodes + // on any server; + return null; + } else { + // Randomly select two region servers from the server list and make sure + // they are not overlap with the primary region server; + Set<ServerName> serverSkipSet = new HashSet<ServerName>(); + serverSkipSet.add(primaryRS); + + // Place the secondary RS + ServerName secondaryRS = getOneRandomServer(primaryRack, serverSkipSet); + // Skip the secondary for the tertiary placement + serverSkipSet.add(secondaryRS); + ServerName tertiaryRS = getOneRandomServer(primaryRack, serverSkipSet); + + if (secondaryRS == null || tertiaryRS == null) { + LOG.error("Cannot place the secondary, tertiary favored node for region " + + regionInfo.getRegionNameAsString()); + } + // Create the secondary and tertiary pair + ServerName[] favoredNodes = new ServerName[2]; + favoredNodes[0] = secondaryRS; + favoredNodes[1] = tertiaryRS; + return favoredNodes; + } + } + + /** + * Place secondary and tertiary nodes in a multi rack case. + * If there are only two racks, then we try the place the secondary + * and tertiary on different rack than primary. But if the other rack has + * only one region server, then we place primary and tertiary on one rack + * and secondary on another. The aim is two distribute the three favored nodes + * on >= 2 racks. + * TODO: see how we can use generateMissingFavoredNodeMultiRack API here + * @param regionInfo Region for which we are trying to generate FN + * @param primaryRS The primary favored node. + * @param primaryRack The rack of the primary favored node. + * @return Array containing secondary and tertiary favored nodes. + * @throws IOException Signals that an I/O exception has occurred. + */ + private ServerName[] multiRackCase(HRegionInfo regionInfo, ServerName primaryRS, + String primaryRack) throws IOException { + + List<ServerName>favoredNodes = Lists.newArrayList(primaryRS); + // Create the secondary and tertiary pair + ServerName secondaryRS = generateMissingFavoredNodeMultiRack(favoredNodes); + favoredNodes.add(secondaryRS); + String secondaryRack = getRackOfServer(secondaryRS); + + ServerName tertiaryRS; + if (primaryRack.equals(secondaryRack)) { + tertiaryRS = generateMissingFavoredNode(favoredNodes); + } else { + // Try to place tertiary in secondary RS rack else place on primary rack. + tertiaryRS = getOneRandomServer(secondaryRack, Sets.newHashSet(secondaryRS)); + if (tertiaryRS == null) { + tertiaryRS = getOneRandomServer(primaryRack, Sets.newHashSet(primaryRS)); + } + // We couldn't find anything in secondary rack, get any FN + if (tertiaryRS == null) { + tertiaryRS = generateMissingFavoredNode(Lists.newArrayList(primaryRS, secondaryRS)); + } + } + return new ServerName[]{ secondaryRS, tertiaryRS }; + } + + boolean canPlaceFavoredNodes() { + return (this.servers.size() >= FAVORED_NODES_NUM); + } + + private int getTotalNumberOfRacks() { + return this.uniqueRackList.size(); + } + + private List<ServerName> getServersFromRack(String rack) { + return this.rackToRegionServerMap.get(rack); + } + + /** + * Gets a random server from the specified rack and skips anything specified. + + * @param rack rack from a server is needed + * @param skipServerSet the server shouldn't belong to this set + */ + protected ServerName getOneRandomServer(String rack, Set<ServerName> skipServerSet) + throws IOException { + + // Is the rack valid? Do we recognize it? + if (rack == null || getServersFromRack(rack) == null || + getServersFromRack(rack).size() == 0) { + return null; + } + + // Lets use a set so we can eliminate duplicates + Set<StartcodeAgnosticServerName> serversToChooseFrom = Sets.newHashSet(); + for (ServerName sn : getServersFromRack(rack)) { + serversToChooseFrom.add(StartcodeAgnosticServerName.valueOf(sn)); + } + + if (skipServerSet != null && skipServerSet.size() > 0) { + for (ServerName sn : skipServerSet) { + serversToChooseFrom.remove(StartcodeAgnosticServerName.valueOf(sn)); + } + // Do we have any servers left to choose from? + if (serversToChooseFrom.size() == 0) { + return null; + } + } + + ServerName randomServer = null; + int randomIndex = random.nextInt(serversToChooseFrom.size()); + int j = 0; + for (StartcodeAgnosticServerName sn : serversToChooseFrom) { + if (j == randomIndex) { + randomServer = sn; + break; + } + j++; + } + + if (randomServer != null) { + return ServerName.valueOf(randomServer.getHostAndPort(), randomServer.getStartcode()); + } else { + return null; + } + } + + private ServerName getOneRandomServer(String rack) throws IOException { + return this.getOneRandomServer(rack, null); + } + + protected String getOneRandomRack(Set<String> skipRackSet) throws IOException { + if (skipRackSet == null || uniqueRackList.size() <= skipRackSet.size()) { + throw new IOException("Cannot randomly pick another random server"); + } + + String randomRack; + do { + int randomIndex = random.nextInt(this.uniqueRackList.size()); + randomRack = this.uniqueRackList.get(randomIndex); + } while (skipRackSet.contains(randomRack)); + + return randomRack; + } + + public static String getFavoredNodesAsString(List<ServerName> nodes) { + StringBuffer strBuf = new StringBuffer(); + int i = 0; + for (ServerName node : nodes) { + strBuf.append(node.getHostAndPort()); + if (++i != nodes.size()) strBuf.append(";"); + } + return strBuf.toString(); + } + + /* + * Generates a missing favored node based on the input favored nodes. This helps to generate + * new FN when there is already 2 FN and we need a third one. For eg, while generating new FN + * for split daughters after inheriting 2 FN from the parent. If the cluster has only one rack + * it generates from the same rack. If the cluster has multiple racks, then it ensures the new + * FN respects the rack constraints similar to HDFS. For eg: if there are 3 FN, they will be + * spread across 2 racks. + */ + public ServerName generateMissingFavoredNode(List<ServerName> favoredNodes) throws IOException { + if (this.uniqueRackList.size() == 1) { + return generateMissingFavoredNodeSingleRack(favoredNodes, null); + } else { + return generateMissingFavoredNodeMultiRack(favoredNodes, null); + } + } + + public ServerName generateMissingFavoredNode(List<ServerName> favoredNodes, + List<ServerName> excludeNodes) throws IOException { + if (this.uniqueRackList.size() == 1) { + return generateMissingFavoredNodeSingleRack(favoredNodes, excludeNodes); + } else { + return generateMissingFavoredNodeMultiRack(favoredNodes, excludeNodes); + } + } + + /* + * Generate FN for a single rack scenario, don't generate from one of the excluded nodes. Helps + * when we would like to find a replacement node. + */ + private ServerName generateMissingFavoredNodeSingleRack(List<ServerName> favoredNodes, + List<ServerName> excludeNodes) throws IOException { + ServerName newServer = null; + Set<ServerName> excludeFNSet = Sets.newHashSet(favoredNodes); + if (excludeNodes != null && excludeNodes.size() > 0) { + excludeFNSet.addAll(excludeNodes); + } + if (favoredNodes.size() < FAVORED_NODES_NUM) { + newServer = this.getOneRandomServer(this.uniqueRackList.get(0), excludeFNSet); + } + return newServer; + } + + private ServerName generateMissingFavoredNodeMultiRack(List<ServerName> favoredNodes) + throws IOException { + return generateMissingFavoredNodeMultiRack(favoredNodes, null); + } + + /* + * Generates a missing FN based on the input favoredNodes and also the nodes to be skipped. + * + * Get the current layout of favored nodes arrangement and nodes to be excluded and get a + * random node that goes with HDFS block placement. Eg: If the existing nodes are on one rack, + * generate one from another rack. We exclude as much as possible so the random selection + * has more chance to generate a node within a few iterations, ideally 1. + */ + private ServerName generateMissingFavoredNodeMultiRack(List<ServerName> favoredNodes, + List<ServerName> excludeNodes) throws IOException { + + Set<String> racks = Sets.newHashSet(); + Map<String, Set<ServerName>> rackToFNMapping = new HashMap<>(); + + // Lets understand the current rack distribution of the FN + for (ServerName sn : favoredNodes) { + String rack = getRackOfServer(sn); + racks.add(rack); + + Set<ServerName> serversInRack = rackToFNMapping.get(rack); + if (serversInRack == null) { + serversInRack = Sets.newHashSet(); + rackToFNMapping.put(rack, serversInRack); + } + serversInRack.add(sn); + } + + // What racks should be skipped while getting a FN? + Set<String> skipRackSet = Sets.newHashSet(); + + /* + * If both the FN are from the same rack, then we don't want to generate another FN on the + * same rack. If that rack fails, the region would be unavailable. + */ + if (racks.size() == 1 && favoredNodes.size() > 1) { + skipRackSet.add(racks.iterator().next()); + } + + /* + * If there are no free nodes on the existing racks, we should skip those racks too. We can + * reduce the number of iterations for FN selection. + */ + for (String rack : racks) { + if (getServersFromRack(rack) != null && + rackToFNMapping.get(rack).size() == getServersFromRack(rack).size()) { + skipRackSet.add(rack); + } + } + + Set<ServerName> favoredNodeSet = Sets.newHashSet(favoredNodes); + if (excludeNodes != null && excludeNodes.size() > 0) { + favoredNodeSet.addAll(excludeNodes); + } + + /* + * Lets get a random rack by excluding skipRackSet and generate a random FN from that rack. + */ + int i = 0; + Set<String> randomRacks = Sets.newHashSet(); + ServerName newServer = null; + do { + String randomRack = this.getOneRandomRack(skipRackSet); + newServer = this.getOneRandomServer(randomRack, favoredNodeSet); + randomRacks.add(randomRack); + i++; + } while ((i < MAX_ATTEMPTS_FN_GENERATION) && (newServer == null)); + + if (newServer == null) { + if (LOG.isTraceEnabled()) { + LOG.trace(String.format("Unable to generate additional favored nodes for %s after " + + "considering racks %s and skip rack %s with a unique rack list of %s and rack " + + "to RS map of %s and RS to rack map of %s", + StringUtils.join(favoredNodes, ","), randomRacks, skipRackSet, uniqueRackList, + rackToRegionServerMap, regionServerToRackMap)); + } + throw new IOException(" Unable to generate additional favored nodes for " + + StringUtils.join(favoredNodes, ",")); + } + return newServer; + } + + /* + * Generate favored nodes for a region. + * + * Choose a random server as primary and then choose secondary and tertiary FN so its spread + * across two racks. + */ + List<ServerName> generateFavoredNodes(HRegionInfo hri) throws IOException { + + List<ServerName> favoredNodesForRegion = new ArrayList<>(FAVORED_NODES_NUM); + ServerName primary = servers.get(random.nextInt(servers.size())); + favoredNodesForRegion.add(ServerName.valueOf(primary.getHostAndPort(), ServerName.NON_STARTCODE)); + + Map<HRegionInfo, ServerName> primaryRSMap = new HashMap<>(1); + primaryRSMap.put(hri, primary); + Map<HRegionInfo, ServerName[]> secondaryAndTertiaryRSMap = + placeSecondaryAndTertiaryRS(primaryRSMap); + ServerName[] secondaryAndTertiaryNodes = secondaryAndTertiaryRSMap.get(hri); + if (secondaryAndTertiaryNodes != null && secondaryAndTertiaryNodes.length == 2) { + for (ServerName sn : secondaryAndTertiaryNodes) { + favoredNodesForRegion.add(ServerName.valueOf(sn.getHostAndPort(), ServerName.NON_STARTCODE)); + } + return favoredNodesForRegion; + } else { + throw new HBaseIOException("Unable to generate secondary and tertiary favored nodes."); + } + } + + /* + * Get the rack of server from local mapping when present, saves lookup by the RackManager. + */ + private String getRackOfServer(ServerName sn) { + if (this.regionServerToRackMap.containsKey(sn.getHostname())) { + return this.regionServerToRackMap.get(sn.getHostname()); + } else { + String rack = this.rackManager.getRack(sn); + this.regionServerToRackMap.put(sn.getHostname(), rack); + return rack; + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c1293cc9/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeLoadBalancer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeLoadBalancer.java new file mode 100644 index 0000000..99aeede --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodeLoadBalancer.java @@ -0,0 +1,443 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.favored; + +import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.PRIMARY; +import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.SECONDARY; +import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.TERTIARY; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerLoad; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.*; +import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; +import org.apache.hadoop.hbase.util.Pair; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +/** + * An implementation of the {@link org.apache.hadoop.hbase.master.LoadBalancer} that + * assigns favored nodes for each region. There is a Primary RegionServer that hosts + * the region, and then there is Secondary and Tertiary RegionServers. Currently, the + * favored nodes information is used in creating HDFS files - the Primary RegionServer + * passes the primary, secondary, tertiary node addresses as hints to the + * DistributedFileSystem API for creating files on the filesystem. These nodes are + * treated as hints by the HDFS to place the blocks of the file. This alleviates the + * problem to do with reading from remote nodes (since we can make the Secondary + * RegionServer as the new Primary RegionServer) after a region is recovered. This + * should help provide consistent read latencies for the regions even when their + * primary region servers die. + * + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements FavoredNodesPromoter { + private static final Log LOG = LogFactory.getLog(FavoredNodeLoadBalancer.class); + + private RackManager rackManager; + private Configuration conf; + private FavoredNodesManager fnm; + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public synchronized void initialize() throws HBaseIOException { + super.initialize(); + super.setConf(conf); + this.fnm = services.getFavoredNodesManager(); + this.rackManager = new RackManager(conf); + super.setConf(conf); + } + + @Override + public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState) { + //TODO. Look at is whether Stochastic loadbalancer can be integrated with this + List<RegionPlan> plans = new ArrayList<RegionPlan>(); + //perform a scan of the meta to get the latest updates (if any) + SnapshotOfRegionAssignmentFromMeta snaphotOfRegionAssignment = + new SnapshotOfRegionAssignmentFromMeta(super.services.getConnection()); + try { + snaphotOfRegionAssignment.initialize(); + } catch (IOException ie) { + LOG.warn("Not running balancer since exception was thrown " + ie); + return plans; + } + Map<ServerName, ServerName> serverNameToServerNameWithoutCode = + new HashMap<ServerName, ServerName>(); + Map<ServerName, ServerName> serverNameWithoutCodeToServerName = + new HashMap<ServerName, ServerName>(); + ServerManager serverMgr = super.services.getServerManager(); + for (ServerName sn: serverMgr.getOnlineServersList()) { + ServerName s = ServerName.valueOf(sn.getHostname(), sn.getPort(), ServerName.NON_STARTCODE); + serverNameToServerNameWithoutCode.put(sn, s); + serverNameWithoutCodeToServerName.put(s, sn); + } + for (Map.Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) { + ServerName currentServer = entry.getKey(); + //get a server without the startcode for the currentServer + ServerName currentServerWithoutStartCode = ServerName.valueOf(currentServer.getHostname(), + currentServer.getPort(), ServerName.NON_STARTCODE); + List<HRegionInfo> list = entry.getValue(); + for (HRegionInfo region : list) { + if(region.getTable().isSystemTable()) { + continue; + } + List<ServerName> favoredNodes = fnm.getFavoredNodes(region); + if (favoredNodes == null || favoredNodes.get(0).equals(currentServerWithoutStartCode)) { + continue; //either favorednodes does not exist or we are already on the primary node + } + ServerName destination = null; + //check whether the primary is available + destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(0)); + if (destination == null) { + //check whether the region is on secondary/tertiary + if (currentServerWithoutStartCode.equals(favoredNodes.get(1)) || + currentServerWithoutStartCode.equals(favoredNodes.get(2))) { + continue; + } + //the region is currently on none of the favored nodes + //get it on one of them if possible + ServerLoad l1 = super.services.getServerManager().getLoad( + serverNameWithoutCodeToServerName.get(favoredNodes.get(1))); + ServerLoad l2 = super.services.getServerManager().getLoad( + serverNameWithoutCodeToServerName.get(favoredNodes.get(2))); + if (l1 != null && l2 != null) { + if (l1.getLoad() > l2.getLoad()) { + destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2)); + } else { + destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(1)); + } + } else if (l1 != null) { + destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(1)); + } else if (l2 != null) { + destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2)); + } + } + + if (destination != null) { + RegionPlan plan = new RegionPlan(region, currentServer, destination); + plans.add(plan); + } + } + } + return plans; + } + + @Override + public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(List<HRegionInfo> regions, + List<ServerName> servers) { + Map<ServerName, List<HRegionInfo>> assignmentMap; + try { + FavoredNodeAssignmentHelper assignmentHelper = + new FavoredNodeAssignmentHelper(servers, rackManager); + assignmentHelper.initialize(); + if (!assignmentHelper.canPlaceFavoredNodes()) { + return super.roundRobinAssignment(regions, servers); + } + // Segregate the regions into two types: + // 1. The regions that have favored node assignment, and where at least + // one of the favored node is still alive. In this case, try to adhere + // to the current favored nodes assignment as much as possible - i.e., + // if the current primary is gone, then make the secondary or tertiary + // as the new host for the region (based on their current load). + // Note that we don't change the favored + // node assignments here (even though one or more favored node is currently + // down). It is up to the balanceCluster to do this hard work. The HDFS + // can handle the fact that some nodes in the favored nodes hint is down + // It'd allocate some other DNs. In combination with stale settings for HDFS, + // we should be just fine. + // 2. The regions that currently don't have favored node assignment. We will + // need to come up with favored nodes assignments for them. The corner case + // in (1) above is that all the nodes are unavailable and in that case, we + // will note that this region doesn't have favored nodes. + Pair<Map<ServerName,List<HRegionInfo>>, List<HRegionInfo>> segregatedRegions = + segregateRegionsAndAssignRegionsWithFavoredNodes(regions, servers); + Map<ServerName,List<HRegionInfo>> regionsWithFavoredNodesMap = segregatedRegions.getFirst(); + List<HRegionInfo> regionsWithNoFavoredNodes = segregatedRegions.getSecond(); + assignmentMap = new HashMap<ServerName, List<HRegionInfo>>(); + roundRobinAssignmentImpl(assignmentHelper, assignmentMap, regionsWithNoFavoredNodes, + servers); + // merge the assignment maps + assignmentMap.putAll(regionsWithFavoredNodesMap); + } catch (Exception ex) { + LOG.warn("Encountered exception while doing favored-nodes assignment " + ex + + " Falling back to regular assignment"); + assignmentMap = super.roundRobinAssignment(regions, servers); + } + return assignmentMap; + } + + @Override + public ServerName randomAssignment(HRegionInfo regionInfo, List<ServerName> servers) { + try { + FavoredNodeAssignmentHelper assignmentHelper = + new FavoredNodeAssignmentHelper(servers, rackManager); + assignmentHelper.initialize(); + ServerName primary = super.randomAssignment(regionInfo, servers); + if (!assignmentHelper.canPlaceFavoredNodes()) { + return primary; + } + List<ServerName> favoredNodes = fnm.getFavoredNodes(regionInfo); + // check if we have a favored nodes mapping for this region and if so, return + // a server from the favored nodes list if the passed 'servers' contains this + // server as well (available servers, that is) + if (favoredNodes != null) { + for (ServerName s : favoredNodes) { + ServerName serverWithLegitStartCode = availableServersContains(servers, s); + if (serverWithLegitStartCode != null) { + return serverWithLegitStartCode; + } + } + } + List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1); + regions.add(regionInfo); + Map<HRegionInfo, ServerName> primaryRSMap = new HashMap<HRegionInfo, ServerName>(1); + primaryRSMap.put(regionInfo, primary); + assignSecondaryAndTertiaryNodesForRegion(assignmentHelper, regions, primaryRSMap); + return primary; + } catch (Exception ex) { + LOG.warn("Encountered exception while doing favored-nodes (random)assignment " + ex + + " Falling back to regular assignment"); + return super.randomAssignment(regionInfo, servers); + } + } + + private Pair<Map<ServerName, List<HRegionInfo>>, List<HRegionInfo>> + segregateRegionsAndAssignRegionsWithFavoredNodes(List<HRegionInfo> regions, + List<ServerName> availableServers) { + Map<ServerName, List<HRegionInfo>> assignmentMapForFavoredNodes = + new HashMap<ServerName, List<HRegionInfo>>(regions.size() / 2); + List<HRegionInfo> regionsWithNoFavoredNodes = new ArrayList<HRegionInfo>(regions.size()/2); + for (HRegionInfo region : regions) { + List<ServerName> favoredNodes = fnm.getFavoredNodes(region); + ServerName primaryHost = null; + ServerName secondaryHost = null; + ServerName tertiaryHost = null; + if (favoredNodes != null) { + for (ServerName s : favoredNodes) { + ServerName serverWithLegitStartCode = availableServersContains(availableServers, s); + if (serverWithLegitStartCode != null) { + FavoredNodesPlan.Position position = + FavoredNodesPlan.getFavoredServerPosition(favoredNodes, s); + if (Position.PRIMARY.equals(position)) { + primaryHost = serverWithLegitStartCode; + } else if (Position.SECONDARY.equals(position)) { + secondaryHost = serverWithLegitStartCode; + } else if (Position.TERTIARY.equals(position)) { + tertiaryHost = serverWithLegitStartCode; + } + } + } + assignRegionToAvailableFavoredNode(assignmentMapForFavoredNodes, region, + primaryHost, secondaryHost, tertiaryHost); + } + if (primaryHost == null && secondaryHost == null && tertiaryHost == null) { + //all favored nodes unavailable + regionsWithNoFavoredNodes.add(region); + } + } + return new Pair<Map<ServerName, List<HRegionInfo>>, List<HRegionInfo>>( + assignmentMapForFavoredNodes, regionsWithNoFavoredNodes); + } + + // Do a check of the hostname and port and return the servername from the servers list + // that matched (the favoredNode will have a startcode of -1 but we want the real + // server with the legit startcode + private ServerName availableServersContains(List<ServerName> servers, ServerName favoredNode) { + for (ServerName server : servers) { + if (ServerName.isSameHostnameAndPort(favoredNode, server)) { + return server; + } + } + return null; + } + + private void assignRegionToAvailableFavoredNode(Map<ServerName, + List<HRegionInfo>> assignmentMapForFavoredNodes, HRegionInfo region, ServerName primaryHost, + ServerName secondaryHost, ServerName tertiaryHost) { + if (primaryHost != null) { + addRegionToMap(assignmentMapForFavoredNodes, region, primaryHost); + } else if (secondaryHost != null && tertiaryHost != null) { + // assign the region to the one with a lower load + // (both have the desired hdfs blocks) + ServerName s; + ServerLoad tertiaryLoad = super.services.getServerManager().getLoad(tertiaryHost); + ServerLoad secondaryLoad = super.services.getServerManager().getLoad(secondaryHost); + if (secondaryLoad.getLoad() < tertiaryLoad.getLoad()) { + s = secondaryHost; + } else { + s = tertiaryHost; + } + addRegionToMap(assignmentMapForFavoredNodes, region, s); + } else if (secondaryHost != null) { + addRegionToMap(assignmentMapForFavoredNodes, region, secondaryHost); + } else if (tertiaryHost != null) { + addRegionToMap(assignmentMapForFavoredNodes, region, tertiaryHost); + } + } + + private void addRegionToMap(Map<ServerName, List<HRegionInfo>> assignmentMapForFavoredNodes, + HRegionInfo region, ServerName host) { + List<HRegionInfo> regionsOnServer = null; + if ((regionsOnServer = assignmentMapForFavoredNodes.get(host)) == null) { + regionsOnServer = new ArrayList<HRegionInfo>(); + assignmentMapForFavoredNodes.put(host, regionsOnServer); + } + regionsOnServer.add(region); + } + + public synchronized List<ServerName> getFavoredNodes(HRegionInfo regionInfo) { + return this.fnm.getFavoredNodes(regionInfo); + } + + private void roundRobinAssignmentImpl(FavoredNodeAssignmentHelper assignmentHelper, + Map<ServerName, List<HRegionInfo>> assignmentMap, + List<HRegionInfo> regions, List<ServerName> servers) throws IOException { + Map<HRegionInfo, ServerName> primaryRSMap = new HashMap<HRegionInfo, ServerName>(); + // figure the primary RSs + assignmentHelper.placePrimaryRSAsRoundRobin(assignmentMap, primaryRSMap, regions); + assignSecondaryAndTertiaryNodesForRegion(assignmentHelper, regions, primaryRSMap); + } + + private void assignSecondaryAndTertiaryNodesForRegion( + FavoredNodeAssignmentHelper assignmentHelper, + List<HRegionInfo> regions, Map<HRegionInfo, ServerName> primaryRSMap) throws IOException { + // figure the secondary and tertiary RSs + Map<HRegionInfo, ServerName[]> secondaryAndTertiaryRSMap = + assignmentHelper.placeSecondaryAndTertiaryRS(primaryRSMap); + + Map<HRegionInfo, List<ServerName>> regionFNMap = Maps.newHashMap(); + // now record all the assignments so that we can serve queries later + for (HRegionInfo region : regions) { + // Store the favored nodes without startCode for the ServerName objects + // We don't care about the startcode; but only the hostname really + List<ServerName> favoredNodesForRegion = new ArrayList<ServerName>(3); + ServerName sn = primaryRSMap.get(region); + favoredNodesForRegion.add(ServerName.valueOf(sn.getHostname(), sn.getPort(), + ServerName.NON_STARTCODE)); + ServerName[] secondaryAndTertiaryNodes = secondaryAndTertiaryRSMap.get(region); + if (secondaryAndTertiaryNodes != null) { + favoredNodesForRegion.add(ServerName.valueOf(secondaryAndTertiaryNodes[0].getHostname(), + secondaryAndTertiaryNodes[0].getPort(), ServerName.NON_STARTCODE)); + favoredNodesForRegion.add(ServerName.valueOf(secondaryAndTertiaryNodes[1].getHostname(), + secondaryAndTertiaryNodes[1].getPort(), ServerName.NON_STARTCODE)); + } + regionFNMap.put(region, favoredNodesForRegion); + } + fnm.updateFavoredNodes(regionFNMap); + } + + /* + * Generate Favored Nodes for daughters during region split. + * + * If the parent does not have FN, regenerates them for the daughters. + * + * If the parent has FN, inherit two FN from parent for each daughter and generate the remaining. + * The primary FN for both the daughters should be the same as parent. Inherit the secondary + * FN from the parent but keep it different for each daughter. Choose the remaining FN + * randomly. This would give us better distribution over a period of time after enough splits. + */ + @Override + public void generateFavoredNodesForDaughter(List<ServerName> servers, HRegionInfo parent, + HRegionInfo regionA, HRegionInfo regionB) throws IOException { + + Map<HRegionInfo, List<ServerName>> result = new HashMap<>(); + FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, rackManager); + helper.initialize(); + + List<ServerName> parentFavoredNodes = getFavoredNodes(parent); + if (parentFavoredNodes == null) { + LOG.debug("Unable to find favored nodes for parent, " + parent + + " generating new favored nodes for daughter"); + result.put(regionA, helper.generateFavoredNodes(regionA)); + result.put(regionB, helper.generateFavoredNodes(regionB)); + + } else { + + // Lets get the primary and secondary from parent for regionA + Set<ServerName> regionAFN = + getInheritedFNForDaughter(helper, parentFavoredNodes, PRIMARY, SECONDARY); + result.put(regionA, Lists.newArrayList(regionAFN)); + + // Lets get the primary and tertiary from parent for regionB + Set<ServerName> regionBFN = + getInheritedFNForDaughter(helper, parentFavoredNodes, PRIMARY, TERTIARY); + result.put(regionB, Lists.newArrayList(regionBFN)); + } + + fnm.updateFavoredNodes(result); + } + + private Set<ServerName> getInheritedFNForDaughter(FavoredNodeAssignmentHelper helper, + List<ServerName> parentFavoredNodes, Position primary, Position secondary) + throws IOException { + + Set<ServerName> daughterFN = Sets.newLinkedHashSet(); + if (parentFavoredNodes.size() >= primary.ordinal()) { + daughterFN.add(parentFavoredNodes.get(primary.ordinal())); + } + + if (parentFavoredNodes.size() >= secondary.ordinal()) { + daughterFN.add(parentFavoredNodes.get(secondary.ordinal())); + } + + while (daughterFN.size() < FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) { + ServerName newNode = helper.generateMissingFavoredNode(Lists.newArrayList(daughterFN)); + daughterFN.add(newNode); + } + return daughterFN; + } + + /* + * Generate favored nodes for a region during merge. Choose the FN from one of the sources to + * keep it simple. + */ + @Override + public void generateFavoredNodesForMergedRegion(HRegionInfo merged, HRegionInfo regionA, + HRegionInfo regionB) throws IOException { + Map<HRegionInfo, List<ServerName>> regionFNMap = Maps.newHashMap(); + regionFNMap.put(merged, getFavoredNodes(regionA)); + fnm.updateFavoredNodes(regionFNMap); + } + + @Override + public List<RegionPlan> balanceCluster(TableName tableName, + Map<ServerName, List<HRegionInfo>> clusterState) throws HBaseIOException { + return balanceCluster(clusterState); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c1293cc9/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesManager.java new file mode 100644 index 0000000..cfb9bef --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesManager.java @@ -0,0 +1,185 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.favored; + +import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.PRIMARY; +import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.SECONDARY; +import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.TERTIARY; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.SnapshotOfRegionAssignmentFromMeta; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +/** + * FavoredNodesManager is responsible for maintaining favored nodes info in internal cache and + * META table. Its the centralized store for all favored nodes information. All reads and updates + * should be done through this class. There should only be one instance of + * {@link FavoredNodesManager} in Master. {@link FavoredNodesPlan} and favored node information + * from {@link SnapshotOfRegionAssignmentFromMeta} should not be used outside this class (except + * for may be tools that only read or test cases). All other classes including Favored balancers + * and {@link FavoredNodeAssignmentHelper} should use {@link FavoredNodesManager} for any + * read/write/deletes to favored nodes. + */ +@InterfaceAudience.Private +public class FavoredNodesManager { + + private static final Log LOG = LogFactory.getLog(FavoredNodesManager.class); + + private FavoredNodesPlan globalFavoredNodesAssignmentPlan; + private Map<ServerName, List<HRegionInfo>> primaryRSToRegionMap; + private Map<ServerName, List<HRegionInfo>> secondaryRSToRegionMap; + private Map<ServerName, List<HRegionInfo>> teritiaryRSToRegionMap; + + private MasterServices masterServices; + + public FavoredNodesManager(MasterServices masterServices) { + this.masterServices = masterServices; + this.globalFavoredNodesAssignmentPlan = new FavoredNodesPlan(); + this.primaryRSToRegionMap = new HashMap<>(); + this.secondaryRSToRegionMap = new HashMap<>(); + this.teritiaryRSToRegionMap = new HashMap<>(); + } + + public void initialize(SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment) + throws HBaseIOException { + globalFavoredNodesAssignmentPlan = snapshotOfRegionAssignment.getExistingAssignmentPlan(); + primaryRSToRegionMap = snapshotOfRegionAssignment.getPrimaryToRegionInfoMap(); + secondaryRSToRegionMap = snapshotOfRegionAssignment.getSecondaryToRegionInfoMap(); + teritiaryRSToRegionMap = snapshotOfRegionAssignment.getTertiaryToRegionInfoMap(); + } + + public synchronized List<ServerName> getFavoredNodes(HRegionInfo regionInfo) { + return this.globalFavoredNodesAssignmentPlan.getFavoredNodes(regionInfo); + } + + public synchronized void updateFavoredNodes(Map<HRegionInfo, List<ServerName>> regionFNMap) + throws IOException { + + Map<HRegionInfo, List<ServerName>> regionToFavoredNodes = new HashMap<>(); + for (Map.Entry<HRegionInfo, List<ServerName>> entry : regionFNMap.entrySet()) { + HRegionInfo regionInfo = entry.getKey(); + List<ServerName> servers = entry.getValue(); + + /* + * None of the following error conditions should happen. If it does, there is an issue with + * favored nodes generation or the regions its called on. + */ + if (servers.size() != Sets.newHashSet(servers).size()) { + throw new IOException("Duplicates found: " + servers); + } + + if (regionInfo.isSystemTable()) { + throw new IOException("Can't update FN for system region: " + + regionInfo.getRegionNameAsString() + " with " + servers); + } + + if (servers.size() != FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) { + throw new IOException("At least " + FavoredNodeAssignmentHelper.FAVORED_NODES_NUM + + " favored nodes should be present for region : " + regionInfo.getEncodedName() + + " current FN servers:" + servers); + } + + List<ServerName> serversWithNoStartCodes = Lists.newArrayList(); + for (ServerName sn : servers) { + if (sn.getStartcode() == ServerName.NON_STARTCODE) { + serversWithNoStartCodes.add(sn); + } else { + serversWithNoStartCodes.add(ServerName.valueOf(sn.getHostname(), sn.getPort(), + ServerName.NON_STARTCODE)); + } + } + regionToFavoredNodes.put(regionInfo, serversWithNoStartCodes); + } + + // Lets do a bulk update to meta since that reduces the RPC's + FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo( + regionToFavoredNodes, + masterServices.getConnection()); + deleteFavoredNodesForRegions(regionToFavoredNodes.keySet()); + + for (Map.Entry<HRegionInfo, List<ServerName>> entry : regionToFavoredNodes.entrySet()) { + HRegionInfo regionInfo = entry.getKey(); + List<ServerName> serversWithNoStartCodes = entry.getValue(); + globalFavoredNodesAssignmentPlan.updateFavoredNodesMap(regionInfo, serversWithNoStartCodes); + addToReplicaLoad(regionInfo, serversWithNoStartCodes); + } + } + + private synchronized void addToReplicaLoad(HRegionInfo hri, List<ServerName> servers) { + ServerName serverToUse = ServerName.valueOf(servers.get(PRIMARY.ordinal()).getHostAndPort(), + ServerName.NON_STARTCODE); + List<HRegionInfo> regionList = primaryRSToRegionMap.get(serverToUse); + if (regionList == null) { + regionList = new ArrayList<>(); + } + regionList.add(hri); + primaryRSToRegionMap.put(serverToUse, regionList); + + serverToUse = ServerName + .valueOf(servers.get(SECONDARY.ordinal()).getHostAndPort(), ServerName.NON_STARTCODE); + regionList = secondaryRSToRegionMap.get(serverToUse); + if (regionList == null) { + regionList = new ArrayList<>(); + } + regionList.add(hri); + secondaryRSToRegionMap.put(serverToUse, regionList); + + serverToUse = ServerName.valueOf(servers.get(TERTIARY.ordinal()).getHostAndPort(), + ServerName.NON_STARTCODE); + regionList = teritiaryRSToRegionMap.get(serverToUse); + if (regionList == null) { + regionList = new ArrayList<>(); + } + regionList.add(hri); + teritiaryRSToRegionMap.put(serverToUse, regionList); + } + + private synchronized void deleteFavoredNodesForRegions(Collection<HRegionInfo> regionInfoList) { + for (HRegionInfo hri : regionInfoList) { + List<ServerName> favNodes = getFavoredNodes(hri); + if (favNodes != null) { + if (primaryRSToRegionMap.containsKey(favNodes.get(PRIMARY.ordinal()))) { + primaryRSToRegionMap.get(favNodes.get(PRIMARY.ordinal())).remove(hri); + } + if (secondaryRSToRegionMap.containsKey(favNodes.get(SECONDARY.ordinal()))) { + secondaryRSToRegionMap.get(favNodes.get(SECONDARY.ordinal())).remove(hri); + } + if (teritiaryRSToRegionMap.containsKey(favNodes.get(TERTIARY.ordinal()))) { + teritiaryRSToRegionMap.get(favNodes.get(TERTIARY.ordinal())).remove(hri); + } + globalFavoredNodesAssignmentPlan.removeFavoredNodes(hri); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c1293cc9/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesPlan.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesPlan.java new file mode 100644 index 0000000..5629f35 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesPlan.java @@ -0,0 +1,144 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.favored; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; + +/** + * This class contains the mapping information between each region name and + * its favored region server list. Used by {@link FavoredNodeLoadBalancer} set + * of classes and from unit tests (hence the class is public) + * + * All the access to this class is thread-safe. + */ +@InterfaceAudience.Private +public class FavoredNodesPlan { + + /** the map between each region name and its favored region server list */ + private Map<String, List<ServerName>> favoredNodesMap; + + public static enum Position { + PRIMARY, + SECONDARY, + TERTIARY + } + + public FavoredNodesPlan() { + favoredNodesMap = new ConcurrentHashMap<String, List<ServerName>>(); + } + + /** + * Update an assignment to the plan + * @param region + * @param servers + */ + public void updateFavoredNodesMap(HRegionInfo region, List<ServerName> servers) { + if (region == null || servers == null || servers.size() == 0) { + return; + } + this.favoredNodesMap.put(region.getRegionNameAsString(), servers); + } + + /** + * Remove a favored node assignment + * @param region region + * @return the list of favored region server for this region based on the plan + */ + public List<ServerName> removeFavoredNodes(HRegionInfo region) { + return favoredNodesMap.remove(region.getRegionNameAsString()); + } + + /** + * @param region + * @return the list of favored region server for this region based on the plan + */ + public List<ServerName> getFavoredNodes(HRegionInfo region) { + return favoredNodesMap.get(region.getRegionNameAsString()); + } + + /** + * Return the position of the server in the favoredNodes list. Assumes the + * favoredNodes list is of size 3. + * @param favoredNodes + * @param server + * @return position + */ + public static Position getFavoredServerPosition( + List<ServerName> favoredNodes, ServerName server) { + if (favoredNodes == null || server == null || + favoredNodes.size() != FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) { + return null; + } + for (Position p : Position.values()) { + if (ServerName.isSameHostnameAndPort(favoredNodes.get(p.ordinal()),server)) { + return p; + } + } + return null; + } + + /** + * @return the mapping between each region to its favored region server list + */ + public Map<String, List<ServerName>> getAssignmentMap() { + return favoredNodesMap; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null) { + return false; + } + if (getClass() != o.getClass()) { + return false; + } + // To compare the map from objec o is identical to current assignment map. + Map<String, List<ServerName>> comparedMap = ((FavoredNodesPlan)o).getAssignmentMap(); + + // compare the size + if (comparedMap.size() != this.favoredNodesMap.size()) + return false; + + // compare each element in the assignment map + for (Map.Entry<String, List<ServerName>> entry : + comparedMap.entrySet()) { + List<ServerName> serverList = this.favoredNodesMap.get(entry.getKey()); + if (serverList == null && entry.getValue() != null) { + return false; + } else if (serverList != null && !serverList.equals(entry.getValue())) { + return false; + } + } + return true; + } + + @Override + public int hashCode() { + return favoredNodesMap.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c1293cc9/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesPromoter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesPromoter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesPromoter.java new file mode 100644 index 0000000..90f29db --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesPromoter.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.favored; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; + +@InterfaceAudience.Private +public interface FavoredNodesPromoter { + + void generateFavoredNodesForDaughter(List<ServerName> servers, + HRegionInfo parent, HRegionInfo hriA, HRegionInfo hriB) throws IOException; + + void generateFavoredNodesForMergedRegion(HRegionInfo merged, HRegionInfo hriA, + HRegionInfo hriB) throws IOException; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c1293cc9/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/StartcodeAgnosticServerName.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/StartcodeAgnosticServerName.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/StartcodeAgnosticServerName.java new file mode 100644 index 0000000..095ee29 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/favored/StartcodeAgnosticServerName.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.favored; + +import com.google.common.net.HostAndPort; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.util.Addressing; + +/** + * This class differs from ServerName in that start code is always ignored. This is because + * start code, ServerName.NON_STARTCODE is used to persist favored nodes and keeping this separate + * from {@link ServerName} is much cleaner. This should only be used by Favored node specific + * classes and should not be used outside favored nodes. + */ +@InterfaceAudience.Private +class StartcodeAgnosticServerName extends ServerName { + + public StartcodeAgnosticServerName(final String hostname, final int port, long startcode) { + super(hostname, port, startcode); + } + + public static StartcodeAgnosticServerName valueOf(final ServerName serverName) { + return new StartcodeAgnosticServerName(serverName.getHostname(), serverName.getPort(), + serverName.getStartcode()); + } + + public static StartcodeAgnosticServerName valueOf(final String hostnameAndPort, long startcode) { + return new StartcodeAgnosticServerName(Addressing.parseHostname(hostnameAndPort), + Addressing.parsePort(hostnameAndPort), startcode); + } + + public static StartcodeAgnosticServerName valueOf(final HostAndPort hostnameAndPort, long startcode) { + return new StartcodeAgnosticServerName(hostnameAndPort.getHostText(), + hostnameAndPort.getPort(), startcode); + } + + @Override + public int compareTo(ServerName other) { + int compare = this.getHostname().compareTo(other.getHostname()); + if (compare != 0) return compare; + compare = this.getPort() - other.getPort(); + if (compare != 0) return compare; + return 0; + } + + @Override + public int hashCode() { + return getHostAndPort().hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c1293cc9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index a8061a1..3ab4678 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -69,12 +69,11 @@ import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.executor.ExecutorService; +import org.apache.hadoop.hbase.favored.FavoredNodesPromoter; import org.apache.hadoop.hbase.ipc.FailedServerException; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.RegionState.State; -import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper; -import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer; import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; @@ -229,10 +228,6 @@ public class AssignmentManager { this.regionsToReopen = Collections.synchronizedMap (new HashMap<String, HRegionInfo> ()); Configuration conf = server.getConfiguration(); - // Only read favored nodes if using the favored nodes load balancer. - this.shouldAssignRegionsWithFavoredNodes = conf.getClass( - HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals( - FavoredNodeLoadBalancer.class); this.tableStateManager = tableStateManager; @@ -242,6 +237,8 @@ public class AssignmentManager { this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong( "hbase.meta.assignment.retry.sleeptime", 1000l); this.balancer = balancer; + // Only read favored nodes if using the favored nodes load balancer. + this.shouldAssignRegionsWithFavoredNodes = this.balancer instanceof FavoredNodesPromoter; int maxThreads = conf.getInt("hbase.assignment.threads.max", 30); this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool( @@ -629,23 +626,21 @@ public class AssignmentManager { } } - // TODO: processFavoredNodes might throw an exception, for e.g., if the - // meta could not be contacted/updated. We need to see how seriously to treat - // this problem as. Should we fail the current assignment. We should be able - // to recover from this problem eventually (if the meta couldn't be updated - // things should work normally and eventually get fixed up). - void processFavoredNodes(List<HRegionInfo> regions) throws IOException { - if (!shouldAssignRegionsWithFavoredNodes) return; - // The AM gets the favored nodes info for each region and updates the meta - // table with that info - Map<HRegionInfo, List<ServerName>> regionToFavoredNodes = - new HashMap<HRegionInfo, List<ServerName>>(); - for (HRegionInfo region : regions) { - regionToFavoredNodes.put(region, - ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region)); + void processFavoredNodesForDaughters(HRegionInfo parent, + HRegionInfo regionA, HRegionInfo regionB) throws IOException { + if (shouldAssignRegionsWithFavoredNodes) { + List<ServerName> onlineServers = this.serverManager.getOnlineServersList(); + ((FavoredNodesPromoter) this.balancer). + generateFavoredNodesForDaughter(onlineServers, parent, regionA, regionB); + } + } + + void processFavoredNodesForMerge(HRegionInfo merged, HRegionInfo regionA, HRegionInfo regionB) + throws IOException { + if (shouldAssignRegionsWithFavoredNodes) { + ((FavoredNodesPromoter)this.balancer). + generateFavoredNodesForMergedRegion(merged, regionA, regionB); } - FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes, - this.server.getConnection()); } /** @@ -806,7 +801,7 @@ public class AssignmentManager { region, State.PENDING_OPEN, destination); List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST; if (this.shouldAssignRegionsWithFavoredNodes) { - favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region); + favoredNodes = server.getFavoredNodesManager().getFavoredNodes(region); } regionOpenInfos.add(new Pair<HRegionInfo, List<ServerName>>( region, favoredNodes)); @@ -1114,7 +1109,7 @@ public class AssignmentManager { try { List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST; if (this.shouldAssignRegionsWithFavoredNodes) { - favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region); + favoredNodes = server.getFavoredNodesManager().getFavoredNodes(region); } serverManager.sendRegionOpen(plan.getDestination(), region, favoredNodes); return; // we're done @@ -1299,15 +1294,6 @@ public class AssignmentManager { LOG.warn("Failed to create new plan.",ex); return null; } - if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) { - List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1); - regions.add(region); - try { - processFavoredNodes(regions); - } catch (IOException ie) { - LOG.warn("Ignoring exception in processFavoredNodes " + ie); - } - } this.regionPlans.put(encodedName, randomPlan); } } @@ -1579,7 +1565,6 @@ public class AssignmentManager { processBogusAssignments(bulkPlan); - processFavoredNodes(regions); assign(regions.size(), servers.size(), "round-robin=true", bulkPlan); } @@ -1869,7 +1854,8 @@ public class AssignmentManager { } List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST; if (shouldAssignRegionsWithFavoredNodes) { - favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri); + favoredNodes = + ((MasterServices)server).getFavoredNodesManager().getFavoredNodes(hri); } serverManager.sendRegionOpen(serverName, hri, favoredNodes); return; // we're done @@ -2424,6 +2410,7 @@ public class AssignmentManager { try { regionStates.splitRegion(hri, a, b, serverName); + processFavoredNodesForDaughters(hri, a ,b); } catch (IOException ioe) { LOG.info("Failed to record split region " + hri.getShortNameToLog()); return "Failed to record the splitting in meta"; @@ -2692,6 +2679,26 @@ public class AssignmentManager { regionOffline(b, State.MERGED); regionOnline(hri, serverName, 1); + try { + if (this.shouldAssignRegionsWithFavoredNodes) { + processFavoredNodesForMerge(hri, a, b); + /* + * This can be removed once HBASE-16119 (Procedure v2 Merge) is implemented and AM force + * assigns the merged region on the same region server. FavoredNodes for the region would + * be passed along with OpenRegionRequest and hence the following would become redundant. + */ + List<ServerName> favoredNodes = server.getFavoredNodesManager().getFavoredNodes(hri); + if (favoredNodes != null) { + Map<HRegionInfo, List<ServerName>> regionFNMap = new HashMap<>(1); + regionFNMap.put(hri, favoredNodes); + server.getServerManager().sendFavoredNodes(serverName, regionFNMap); + } + } + } catch (IOException e) { + LOG.error("Error while processing favored nodes after merge.", e); + return StringUtils.stringifyException(e); + } + // User could disable the table before master knows the new region. if (getTableStateManager().isTableState(hri.getTable(), TableState.State.DISABLED, TableState.State.DISABLING)) { http://git-wip-us.apache.org/repos/asf/hbase/blob/c1293cc9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentVerificationReport.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentVerificationReport.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentVerificationReport.java index 1ea57b4..d290f26 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentVerificationReport.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentVerificationReport.java @@ -29,11 +29,11 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper; +import org.apache.hadoop.hbase.favored.FavoredNodesPlan; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper; -import org.apache.hadoop.hbase.master.balancer.FavoredNodesPlan; /** * Helper class that is used by {@link RegionPlacementMaintainer} to print * information for favored nodes