HBASE-16646 Enhance LoadIncrementalHFiles API to accept store file paths as input


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/348eb283
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/348eb283
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/348eb283

Branch: refs/heads/hbase-14439
Commit: 348eb2834a2a2d4b12e3dcf435b0fb2c47a85237
Parents: b2eac0d
Author: tedyu <yuzhih...@gmail.com>
Authored: Tue Sep 20 07:42:01 2016 -0700
Committer: tedyu <yuzhih...@gmail.com>
Committed: Tue Sep 20 07:42:01 2016 -0700

----------------------------------------------------------------------
 .../hbase/mapreduce/LoadIncrementalHFiles.java  | 150 ++++++++++++++-----
 .../mapreduce/TestLoadIncrementalHFiles.java    |  60 ++++++--
 2 files changed, 159 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
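For context: the patch adds a doBulkLoad() overload that takes a map of column
family to HFile paths instead of a directory to walk. A minimal caller sketch
follows (not part of the patch; the table name, column family, and file paths
below are made up for illustration):

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;

public class MapBasedBulkLoadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);

    // Family names are byte[], so the map needs a byte-array comparator.
    Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    family2Files.put(Bytes.toBytes("cf"),
        Arrays.asList(new Path("/bulk/cf/hfile_0"), new Path("/bulk/cf/hfile_1")));

    TableName tableName = TableName.valueOf("mytable");
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        Table table = conn.getTable(tableName);
        RegionLocator locator = conn.getRegionLocator(tableName)) {
      // silence=false: fail if an HFile is mapped to a family the table lacks.
      loader.doBulkLoad(family2Files, admin, table, locator, false);
    }
  }
}

As the diff below shows, this overload funnels into the same performBulkLoad()
path as the directory-based variant; only the queue-population step differs.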


http://git-wip-us.apache.org/repos/asf/hbase/blob/348eb283/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 6978e23..f775b82 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -280,12 +280,24 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     }
   }
 
+  /*
+   * Populate the Queue with given HFiles
+   */
+  private void populateLoadQueue(final Deque<LoadQueueItem> ret,
+      Map<byte[], List<Path>> map) throws IOException {
+    for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) {
+      for (Path p : entry.getValue()) {
+        ret.add(new LoadQueueItem(entry.getKey(), p));
+      }
+    }
+  }
+
   /**
    * Walk the given directory for all HFiles, and return a Queue
    * containing all such files.
    */
   private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
-    final boolean validateHFile) throws IOException {
+      final boolean validateHFile) throws IOException {
     fs = hfofDir.getFileSystem(getConf());
     visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<byte[]>() {
       @Override
@@ -325,6 +337,33 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * Perform a bulk load of the given directory into the given
    * pre-existing table.  This method is not threadsafe.
    *
+   * @param map map of family to List of hfiles
+   * @param admin the Admin
+   * @param table the table to load into
+   * @param regionLocator region locator
+   * @param silence true to ignore unmatched column families
+   * @throws TableNotFoundException if table does not yet exist
+   */
+  public void doBulkLoad(Map<byte[], List<Path>> map, final Admin admin, Table table,
+          RegionLocator regionLocator, boolean silence) throws TableNotFoundException, IOException {
+    if (!admin.isTableAvailable(regionLocator.getName())) {
+      throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
+    }
+    // LQI queue does not need to be threadsafe -- all operations on this queue
+    // happen in this thread
+    Deque<LoadQueueItem> queue = new LinkedList<>();
+    prepareHFileQueue(map, table, queue, silence);
+    if (queue.isEmpty()) {
+      LOG.warn("Bulk load operation did not get any files to load");
+      return;
+    }
+    performBulkLoad(admin, table, regionLocator, queue);
+  }
+
+  /**
+   * Perform a bulk load of the given directory into the given
+   * pre-existing table.  This method is not threadsafe.
+   *
    * @param hfofDir the directory that was provided as the output path
    *   of a job using HFileOutputFormat
    * @param admin the Admin
@@ -335,41 +374,44 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    */
   public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
       RegionLocator regionLocator, boolean silence) throws TableNotFoundException, IOException {
-
     if (!admin.isTableAvailable(regionLocator.getName())) {
       throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
     }
 
-    ExecutorService pool = createExecutorService();
-
+    /*
+     * Checking hfile format is a time-consuming operation, we should have an option to skip
+     * this step when bulkloading millions of HFiles. See HBASE-13985.
+     */
+    boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
+    if (!validateHFile) {
+      LOG.warn("You are skipping HFiles validation, it might cause some data 
loss if files " +
+          "are not correct. If you fail to read data from your table after 
using this " +
+          "option, consider removing the files and bulkload again without this 
option. " +
+          "See HBASE-13985");
+    }
     // LQI queue does not need to be threadsafe -- all operations on this queue
     // happen in this thread
     Deque<LoadQueueItem> queue = new LinkedList<>();
+    prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);
+
+    if (queue.isEmpty()) {
+      LOG.warn("Bulk load operation did not find any files to load in " +
+          "directory " + hfofDir != null ? hfofDir.toUri() : "" + ".  Does it 
contain files in " +
+          "subdirectories that correspond to column family names?");
+      return;
+    }
+    performBulkLoad(admin, table, regionLocator, queue);
+  }
+
+  void performBulkLoad(final Admin admin, Table table, RegionLocator regionLocator,
+      Deque<LoadQueueItem> queue) throws IOException {
+    ExecutorService pool = createExecutorService();
+
     SecureBulkLoadClient secureClient =  new SecureBulkLoadClient(table.getConfiguration(), table);
 
     try {
-      /*
-       * Checking hfile format is a time-consuming operation, we should have an option to skip
-       * this step when bulkloading millions of HFiles. See HBASE-13985.
-       */
-      boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
-      if(!validateHFile) {
-       LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " +
-           "are not correct. If you fail to read data from your table after using this " +
-           "option, consider removing the files and bulkload again without this option. " +
-           "See HBASE-13985");
-      }
-      prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);
-
       int count = 0;
 
-      if (queue.isEmpty()) {
-        LOG.warn("Bulk load operation did not find any files to load in " +
-            "directory " + hfofDir.toUri() + ".  Does it contain files in " +
-            "subdirectories that correspond to column family names?");
-        return;
-      }
-
       if(isSecureBulkLoadEndpointAvailable()) {
         LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in 
future releases.");
         LOG.warn("Secure bulk load has been integrated into HBase core.");
@@ -421,7 +463,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
         secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken);
       }
       pool.shutdown();
-      if (queue != null && !queue.isEmpty()) {
+      if (!queue.isEmpty()) {
         StringBuilder err = new StringBuilder();
         err.append("-------------------------------------------------\n");
         err.append("Bulk load aborted with some files not yet loaded:\n");
@@ -433,7 +475,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       }
     }
 
-    if (queue != null && !queue.isEmpty()) {
+    if (!queue.isEmpty()) {
       throw new RuntimeException("Bulk load aborted with some files not yet loaded."
         + "Please check log for more details.");
     }
@@ -465,12 +507,28 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * @param silence  true to ignore unmatched column families
    * @throws IOException If any I/O or network error occurred
    */
-  public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
-      boolean validateHFile, boolean silence) throws IOException {
+  public void prepareHFileQueue(Path hfilesDir, Table table,
+      Deque<LoadQueueItem> queue, boolean validateHFile, boolean silence) throws IOException {
     discoverLoadQueue(queue, hfilesDir, validateHFile);
     validateFamiliesInHFiles(table, queue, silence);
   }
 
+  /**
+   * Prepare a collection of {@link LoadQueueItem} from the given map of family to List of
+   * hfiles, and validate whether the prepared queue has all the valid table column families
+   * in it.
+   * @param map map of family to List of hfiles
+   * @param table table to which hfiles should be loaded
+   * @param queue queue which needs to be loaded into the table
+   * @param silence  true to ignore unmatched column families
+   * @throws IOException If any I/O or network error occurred
+   */
+  public void prepareHFileQueue(Map<byte[], List<Path>> map, Table table,
+      Deque<LoadQueueItem> queue, boolean silence) throws IOException {
+    populateLoadQueue(queue, map);
+    validateFamiliesInHFiles(table, queue, silence);
+  }
+
   // Initialize a thread pool
   private ExecutorService createExecutorService() {
     ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
@@ -1073,22 +1131,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     LOG.info("Table "+ tableName +" is available!!");
   }
 
-  @Override
-  public int run(String[] args) throws Exception {
-    if (args.length < 2) {
-      usage();
-      return -1;
-    }
-
+  public int run(String dirPath, Map<byte[], List<Path>> map, TableName tableName) throws Exception {
     initialize();
     try (Connection connection = ConnectionFactory.createConnection(getConf());
         Admin admin = connection.getAdmin()) {
-      String dirPath = args[0];
-      TableName tableName = TableName.valueOf(args[1]);
 
       boolean tableExists = admin.tableExists(tableName);
       if (!tableExists) {
-        if ("yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, 
"yes"))) {
+        if (dirPath != null && 
"yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"))) {
           this.createTable(tableName, dirPath, admin);
         } else {
           String errorMsg = format("Table '%s' does not exist.", tableName);
@@ -1096,19 +1146,37 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
           throw new TableNotFoundException(errorMsg);
         }
       }
-
-      Path hfofDir = new Path(dirPath);
+      Path hfofDir = null;
+      if (dirPath != null) {
+        hfofDir = new Path(dirPath);
+      }
 
       try (Table table = connection.getTable(tableName);
         RegionLocator locator = connection.getRegionLocator(tableName)) {
         boolean silence = "yes".equalsIgnoreCase(getConf().get(SILENCE_CONF_KEY, ""));
-        doBulkLoad(hfofDir, admin, table, locator, silence);
+        if (dirPath != null) {
+          doBulkLoad(hfofDir, admin, table, locator, silence);
+        } else {
+          doBulkLoad(map, admin, table, locator, silence);
+        }
       }
     }
 
     return 0;
   }
 
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      usage();
+      return -1;
+    }
+
+    String dirPath = args[0];
+    TableName tableName = TableName.valueOf(args[1]);
+    return run(dirPath, null, tableName);
+  }
+
   public static void main(String[] args) throws Exception {
     Configuration conf = HBaseConfiguration.create();
     int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(), args);

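Note on the Tool entry point: run(String[]) now just parses its arguments and
delegates to the new run(String, Map, TableName) overload, which accepts either
a directory path or a family-to-files map (pass null for whichever is unused).
A hedged sketch of the map-based invocation, reusing loader, family2Files, and
tableName from the sketch above:

// Alternative to the doBulkLoad() call above: drive the load through the
// Tool-style overload. dirPath is null, so the map is used; the table must
// already exist in this mode, because auto-create needs a directory to
// derive the column families from.
int exitCode = loader.run(null, family2Files, tableName);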
http://git-wip-us.apache.org/repos/asf/hbase/blob/348eb283/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
index 9f2596c..cff8005 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -24,7 +24,10 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
@@ -106,6 +109,15 @@ public class TestLoadIncrementalHFiles {
     util.shutdownMiniCluster();
   }
 
+  @Test(timeout = 120000)
+  public void testSimpleLoadWithMap() throws Exception {
+    runTest("testSimpleLoadWithMap", BloomType.NONE,
+        new byte[][][] {
+          new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+          new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
+    },  true);
+  }
+
   /**
    * Test case that creates some regions and loads
    * HFiles that fit snugly inside those regions
@@ -250,49 +262,77 @@ public class TestLoadIncrementalHFiles {
   }
 
   private void runTest(String testName, BloomType bloomType,
+      byte[][][] hfileRanges, boolean useMap) throws Exception {
+    runTest(testName, bloomType, null, hfileRanges, useMap);
+  }
+
+  private void runTest(String testName, BloomType bloomType,
       byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
+    runTest(testName, bloomType, tableSplitKeys, hfileRanges, false);
+  }
+
+  private void runTest(String testName, BloomType bloomType,
+      byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap) throws Exception {
     final byte[] TABLE_NAME = Bytes.toBytes("mytable_"+testName);
     final boolean preCreateTable = tableSplitKeys != null;
 
     // Run the test bulkloading the table to the default namespace
     final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME);
-    runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges);
+    runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
+        useMap);
 
     // Run the test bulkloading the table to the specified namespace
     final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME);
-    runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges);
+    runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
+        useMap);
   }
 
   private void runTest(String testName, TableName tableName, BloomType bloomType,
-      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
+      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap)
+          throws Exception {
     HTableDescriptor htd = buildHTD(tableName, bloomType);
-    runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges);
+    runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap);
   }
 
   private void runTest(String testName, HTableDescriptor htd, BloomType bloomType,
-      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap)
+          throws Exception {
     Path dir = util.getDataTestDirOnTestFS(testName);
     FileSystem fs = util.getTestFileSystem();
     dir = dir.makeQualified(fs);
     Path familyDir = new Path(dir, Bytes.toString(FAMILY));
 
     int hfileIdx = 0;
+    Map<byte[], List<Path>> map = null;
+    List<Path> list = null;
+    if (useMap) {
+      map = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
+      list = new ArrayList<>();
+      map.put(FAMILY, list);
+    }
     for (byte[][] range : hfileRanges) {
       byte[] from = range[0];
       byte[] to = range[1];
-      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
-          + hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
+      Path path = new Path(familyDir, "hfile_" + hfileIdx++);
+      HFileTestUtil.createHFile(util.getConfiguration(), fs, path, FAMILY, QUALIFIER, from, to, 1000);
+      if (useMap) {
+        list.add(path);
+      }
     }
     int expectedRows = hfileIdx * 1000;
 
-    if (preCreateTable) {
+    if (preCreateTable || map != null) {
       util.getHBaseAdmin().createTable(htd, tableSplitKeys);
     }
 
     final TableName tableName = htd.getTableName();
     LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
     String [] args= {dir.toString(), tableName.toString()};
-    loader.run(args);
+    if (useMap) {
+      loader.run(null, map, tableName);
+    } else {
+      loader.run(args);
+    }
 
     Table table = util.getConnection().getTable(tableName);
     try {
@@ -379,7 +419,7 @@ public class TestLoadIncrementalHFiles {
     htd.addFamily(family);
 
     try {
-      runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges);
+      runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, false);
       assertTrue("Loading into table with non-existent family should have 
failed", false);
     } catch (Exception e) {
       assertTrue("IOException expected", e instanceof IOException);
