Repository: hbase Updated Branches: refs/heads/master 07086036a -> 39d43ab77
HBASE-16821 Enhance LoadIncrementalHFiles API to convey missing hfiles if any Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/39d43ab7 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/39d43ab7 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/39d43ab7 Branch: refs/heads/master Commit: 39d43ab779a90f28273426ee2887daacaa6b1f48 Parents: 0708603 Author: tedyu <yuzhih...@gmail.com> Authored: Fri Oct 14 09:07:38 2016 -0700 Committer: tedyu <yuzhih...@gmail.com> Committed: Fri Oct 14 09:07:38 2016 -0700 ---------------------------------------------------------------------- .../hbase/mapreduce/LoadIncrementalHFiles.java | 83 +++++++++++++------- .../mapreduce/TestLoadIncrementalHFiles.java | 7 +- .../TestLoadIncrementalHFilesSplitRecovery.java | 20 ++--- 3 files changed, 70 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/39d43ab7/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index ccf44da..3647637 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -362,9 +362,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * @param regionLocator region locator * @param silence true to ignore unmatched column families * @param copyFile always copy hfiles if true + * @return List of filenames which were not found * @throws TableNotFoundException if table does not yet exist */ - public void doBulkLoad(Map<byte[], List<Path>> map, final Admin admin, Table table, + public List<String> doBulkLoad(Map<byte[], List<Path>> map, final Admin admin, Table table, RegionLocator regionLocator, boolean silence, boolean copyFile) throws TableNotFoundException, IOException { if (!admin.isTableAvailable(regionLocator.getName())) { @@ -379,7 +380,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { prepareHFileQueue(map, table, queue, silence); if (queue.isEmpty()) { LOG.warn("Bulk load operation did not get any files to load"); - return; + return null; } pool = createExecutorService(); secureClient = new SecureBulkLoadClient(table.getConfiguration(), table); @@ -389,7 +390,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { break; } } - performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile); + return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile); } finally { cleanup(admin, queue, pool, secureClient); } @@ -448,7 +449,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } } - void performBulkLoad(final Admin admin, Table table, RegionLocator regionLocator, + List<String> performBulkLoad(final Admin admin, Table table, RegionLocator regionLocator, Deque<LoadQueueItem> queue, ExecutorService pool, SecureBulkLoadClient secureClient, boolean copyFile) throws IOException { int count = 0; @@ -463,6 +464,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { // fs is the source filesystem fsDelegationToken.acquireDelegationToken(fs); bulkToken = secureClient.prepareBulkLoad(admin.getConnection()); + Pair<Multimap<ByteBuffer, LoadQueueItem>, List<String>> pair = null; // Assumes that region splits can happen while this occurs. while (!queue.isEmpty()) { @@ -482,8 +484,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool { count++; // Using ByteBuffer for byte[] equality semantics - Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table, - pool, queue, startEndKeys); + pair = groupOrSplitPhase(table, pool, queue, startEndKeys); + Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst(); if (!checkHFilesCountPerRegionPerFamily(regionGroups)) { // Error is logged inside checkHFilesCountPerRegionPerFamily. @@ -502,6 +504,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool { throw new RuntimeException("Bulk load aborted with some files not yet loaded." + "Please check log for more details."); } + if (pair == null) return null; + return pair.getSecond(); } /** @@ -625,7 +629,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { try { pool = createExecutorService(); Multimap<ByteBuffer, LoadQueueItem> regionGroups = - groupOrSplitPhase(table, pool, queue, startEndKeys); + groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst(); bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile); } finally { if (pool != null) { @@ -709,25 +713,34 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } /** - * @return A map that groups LQI by likely bulk load region targets. + * @param table the table to load into + * @param pool the ExecutorService + * @param queue the queue for LoadQueueItem + * @param startEndKeys start and end keys + * @return A map that groups LQI by likely bulk load region targets and List of missing hfiles. */ - private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final Table table, - ExecutorService pool, Deque<LoadQueueItem> queue, + private Pair<Multimap<ByteBuffer, LoadQueueItem>, List<String>> groupOrSplitPhase( + final Table table, ExecutorService pool, Deque<LoadQueueItem> queue, final Pair<byte[][], byte[][]> startEndKeys) throws IOException { // <region start key, LQI> need synchronized only within this scope of this // phase because of the puts that happen in futures. Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create(); final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs); + List<String> missingHFiles = new ArrayList<>(); + Pair<Multimap<ByteBuffer, LoadQueueItem>, List<String>> pair = new Pair<>(regionGroups, + missingHFiles); // drain LQIs and figure out bulk load groups - Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<>(); + Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>(); while (!queue.isEmpty()) { final LoadQueueItem item = queue.remove(); - final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() { + final Callable<Pair<List<LoadQueueItem>, String>> call = + new Callable<Pair<List<LoadQueueItem>, String>>() { @Override - public List<LoadQueueItem> call() throws Exception { - List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table, startEndKeys); + public Pair<List<LoadQueueItem>, String> call() throws Exception { + Pair<List<LoadQueueItem>, String> splits = groupOrSplit(regionGroups, item, table, + startEndKeys); return splits; } }; @@ -735,11 +748,15 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } // get all the results. All grouping and splitting must finish before // we can attempt the atomic loads. - for (Future<List<LoadQueueItem>> lqis : splittingFutures) { + for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) { try { - List<LoadQueueItem> splits = lqis.get(); + Pair<List<LoadQueueItem>, String> splits = lqis.get(); if (splits != null) { - queue.addAll(splits); + if (splits.getFirst() != null) { + queue.addAll(splits.getFirst()); + } else { + missingHFiles.add(splits.getSecond()); + } } } catch (ExecutionException e1) { Throwable t = e1.getCause(); @@ -754,7 +771,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { throw (InterruptedIOException)new InterruptedIOException().initCause(e1); } } - return regionGroups; + return pair; } // unique file name for the table @@ -817,17 +834,22 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * protected for testing * @throws IOException if an IO failure is encountered */ - protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups, - final LoadQueueItem item, final Table table, - final Pair<byte[][], byte[][]> startEndKeys) - throws IOException { + protected Pair<List<LoadQueueItem>, String> groupOrSplit( + Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, final Table table, + final Pair<byte[][], byte[][]> startEndKeys) throws IOException { final Path hfilePath = item.hfilePath; // fs is the source filesystem if (fs == null) { fs = hfilePath.getFileSystem(getConf()); } - HFile.Reader hfr = HFile.createReader(fs, hfilePath, - new CacheConfig(getConf()), getConf()); + HFile.Reader hfr = null; + try { + hfr = HFile.createReader(fs, hfilePath, + new CacheConfig(getConf()), getConf()); + } catch (FileNotFoundException fnfe) { + LOG.debug("encountered", fnfe); + return new Pair<>(null, hfilePath.getName()); + } final byte[] first, last; try { hfr.loadFileInfo(); @@ -890,7 +912,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { List<LoadQueueItem> lqis = splitStoreFile(item, table, startEndKeys.getFirst()[indexForCallable], startEndKeys.getSecond()[indexForCallable]); - return lqis; + return new Pair<>(lqis, null); } // group regions. @@ -1171,7 +1193,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { LOG.info("Table "+ tableName +" is available!!"); } - public int run(String dirPath, Map<byte[], List<Path>> map, TableName tableName) throws Exception{ + public List<String> run(String dirPath, Map<byte[], List<Path>> map, TableName tableName) throws Exception{ initialize(); try (Connection connection = ConnectionFactory.createConnection(getConf()); Admin admin = connection.getAdmin()) { @@ -1197,13 +1219,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool { boolean copyFiles = "yes".equalsIgnoreCase(getConf().get(ALWAYS_COPY_FILES, "")); if (dirPath != null) { doBulkLoad(hfofDir, admin, table, locator, silence, copyFiles); + return null; } else { - doBulkLoad(map, admin, table, locator, silence, copyFiles); + return doBulkLoad(map, admin, table, locator, silence, copyFiles); } } } - - return 0; } @Override @@ -1215,7 +1236,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool { String dirPath = args[0]; TableName tableName = TableName.valueOf(args[1]); - return run(dirPath, null, tableName); + List<String> missingHFiles = run(dirPath, null, tableName); + if (missingHFiles == null) return 0; + return -1; } public static void main(String[] args) throws Exception { http://git-wip-us.apache.org/repos/asf/hbase/blob/39d43ab7/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java index 88b9247..fe7abcd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java @@ -323,12 +323,14 @@ public class TestLoadIncrementalHFiles { map = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR); map.put(FAMILY, list); } + Path last = null; for (byte[][] range : hfileRanges) { byte[] from = range[0]; byte[] to = range[1]; Path path = new Path(familyDir, "hfile_" + hfileIdx++); HFileTestUtil.createHFile(util.getConfiguration(), fs, path, FAMILY, QUALIFIER, from, to, 1000); if (useMap) { + last = path; list.add(path); } } @@ -346,7 +348,10 @@ public class TestLoadIncrementalHFiles { LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); String [] args= {dir.toString(), tableName.toString()}; if (useMap) { - loader.run(null, map, tableName); + fs.delete(last); + List<String> missingHFiles = loader.run(null, map, tableName); + expectedRows -= 1000; + assertTrue(missingHFiles.contains(last.getName())); } else { loader.run(args); } http://git-wip-us.apache.org/repos/asf/hbase/blob/39d43ab7/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java index 2060726..a1ed832 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java @@ -404,13 +404,14 @@ public class TestLoadIncrementalHFilesSplitRecovery { LoadIncrementalHFiles lih = new LoadIncrementalHFiles( util.getConfiguration()) { @Override - protected List<LoadQueueItem> groupOrSplit( + protected Pair<List<LoadQueueItem>, String> groupOrSplit( Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException { - List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys); - if (lqis != null) { - countedLqis.addAndGet(lqis.size()); + Pair<List<LoadQueueItem>, String> lqis = super.groupOrSplit(regionGroups, item, htable, + startEndKeys); + if (lqis != null && lqis.getFirst() != null) { + countedLqis.addAndGet(lqis.getFirst().size()); } return lqis; } @@ -479,7 +480,7 @@ public class TestLoadIncrementalHFilesSplitRecovery { int i = 0; @Override - protected List<LoadQueueItem> groupOrSplit( + protected Pair<List<LoadQueueItem>, String> groupOrSplit( Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, final Table table, final Pair<byte[][], byte[][]> startEndKeys) throws IOException { @@ -521,13 +522,14 @@ public class TestLoadIncrementalHFilesSplitRecovery { LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) { @Override - protected List<LoadQueueItem> groupOrSplit( + protected Pair<List<LoadQueueItem>, String> groupOrSplit( Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException { - List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys); - if (lqis != null) { - countedLqis.addAndGet(lqis.size()); + Pair<List<LoadQueueItem>, String> lqis = super.groupOrSplit(regionGroups, item, htable, + startEndKeys); + if (lqis != null && lqis.getFirst() != null) { + countedLqis.addAndGet(lqis.getFirst().size()); } return lqis; }