Repository: hbase Updated Branches: refs/heads/branch-1.3 446a21fed -> 693b51d81
HBASE-17060 backport HBASE-16570 (Compute region locality in parallel at startup) to 1.3.1 Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/693b51d8 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/693b51d8 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/693b51d8 Branch: refs/heads/branch-1.3 Commit: 693b51d81af0c446b305af69fe130faee07581a6 Parents: 446a21f Author: Yu Li <l...@apache.org> Authored: Tue Mar 21 15:14:31 2017 +0800 Committer: Yu Li <l...@apache.org> Committed: Tue Mar 21 15:14:31 2017 +0800 ---------------------------------------------------------------------- .../hbase/master/balancer/BaseLoadBalancer.java | 11 +++-- .../master/balancer/RegionLocationFinder.java | 47 ++++++++++++++++++-- .../balancer/TestRegionLocationFinder.java | 21 +++++++++ 3 files changed, 71 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/693b51d8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index c2529a8..2df4fbe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -1231,7 +1231,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { return assignments; } - Cluster cluster = createCluster(servers, regions); + Cluster cluster = createCluster(servers, regions, false); List<HRegionInfo> unassignedRegions = new ArrayList<HRegionInfo>(); roundRobinAssignment(cluster, regions, unassignedRegions, @@ -1278,7 +1278,10 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } protected Cluster createCluster(List<ServerName> servers, - Collection<HRegionInfo> regions) { + Collection<HRegionInfo> regions, boolean forceRefresh) { + if (forceRefresh) { + regionFinder.refreshAndWait(regions); + } // Get the snapshot of the current assignments for the regions in question, and then create // a cluster out of it. Note that we might have replicas already assigned to some servers // earlier. So we want to get the snapshot to see those assignments, but this will only contain @@ -1352,7 +1355,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } List<HRegionInfo> regions = Lists.newArrayList(regionInfo); - Cluster cluster = createCluster(servers, regions); + Cluster cluster = createCluster(servers, regions, false); return randomAssignment(cluster, regionInfo, servers); } @@ -1427,7 +1430,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { int numRandomAssignments = 0; int numRetainedAssigments = 0; - Cluster cluster = createCluster(servers, regions.keySet()); + Cluster cluster = createCluster(servers, regions.keySet(), true); for (Map.Entry<HRegionInfo, ServerName> entry : regions.entrySet()) { HRegionInfo region = entry.getKey(); http://git-wip-us.apache.org/repos/asf/hbase/blob/693b51d8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java index a6724ee..6c5cb19 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java @@ -21,6 +21,7 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -63,11 +64,13 @@ import java.util.concurrent.TimeUnit; class RegionLocationFinder { private static final Log LOG = LogFactory.getLog(RegionLocationFinder.class); private static final long CACHE_TIME = 240 * 60 * 1000; + private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION = new HDFSBlocksDistribution(); private Configuration conf; private volatile ClusterStatus status; private MasterServices services; private final ListeningExecutorService executor; - private long lastFullRefresh = 0; + // Do not scheduleFullRefresh at master startup + private long lastFullRefresh = EnvironmentEdgeManager.currentTime(); private CacheLoader<HRegionInfo, HDFSBlocksDistribution> loader = new CacheLoader<HRegionInfo, HDFSBlocksDistribution>() { @@ -165,8 +168,7 @@ class RegionLocationFinder { } protected List<ServerName> getTopBlockLocations(HRegionInfo region) { - HDFSBlocksDistribution blocksDistribution = getBlockDistribution(region); - List<String> topHosts = blocksDistribution.getTopHosts(); + List<String> topHosts = getBlockDistribution(region).getTopHosts(); return mapHostNameToServerName(topHosts); } @@ -208,7 +210,7 @@ class RegionLocationFinder { + region.getEncodedName(), ioe); } - return new HDFSBlocksDistribution(); + return EMPTY_BLOCK_DISTRIBUTION; } /** @@ -295,4 +297,41 @@ class RegionLocationFinder { return blockDistbn; } } + + private ListenableFuture<HDFSBlocksDistribution> asyncGetBlockDistribution( + HRegionInfo hri) { + try { + return loader.reload(hri, EMPTY_BLOCK_DISTRIBUTION); + } catch (Exception e) { + return Futures.immediateFuture(EMPTY_BLOCK_DISTRIBUTION); + } + } + + public void refreshAndWait(Collection<HRegionInfo> hris) { + ArrayList<ListenableFuture<HDFSBlocksDistribution>> regionLocationFutures = + new ArrayList<ListenableFuture<HDFSBlocksDistribution>>(hris.size()); + for (HRegionInfo hregionInfo : hris) { + regionLocationFutures.add(asyncGetBlockDistribution(hregionInfo)); + } + int index = 0; + for (HRegionInfo hregionInfo : hris) { + ListenableFuture<HDFSBlocksDistribution> future = regionLocationFutures + .get(index); + try { + cache.put(hregionInfo, future.get()); + } catch (InterruptedException ite) { + Thread.currentThread().interrupt(); + } catch (ExecutionException ee) { + LOG.debug( + "ExecutionException during HDFSBlocksDistribution computation. for region = " + + hregionInfo.getEncodedName(), ee); + } + index++; + } + } + + // For test + LoadingCache<HRegionInfo, HDFSBlocksDistribution> getCache() { + return cache; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/693b51d8/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java index bdbdc9f..2585a87 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.master.balancer; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.util.ArrayList; @@ -141,4 +142,24 @@ public class TestRegionLocationFinder { } } } + + @Test + public void testRefreshAndWait() throws Exception { + finder.getCache().invalidateAll(); + for (int i = 0; i < ServerNum; i++) { + HRegionServer server = cluster.getRegionServer(i); + List<Region> regions = server.getOnlineRegions(tableName); + if (regions.size() <= 0) { + continue; + } + List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>(regions.size()); + for (Region region : regions) { + regionInfos.add(region.getRegionInfo()); + } + finder.refreshAndWait(regionInfos); + for (HRegionInfo regionInfo : regionInfos) { + assertNotNull(finder.getCache().getIfPresent(regionInfo)); + } + } + } }