HBASE-20305 Add options to skip deletes/puts on target when running SyncTable

Signed-off-by: tedyu <yuzhih...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f574fd47
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f574fd47
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f574fd47

Branch: refs/heads/HBASE-19064
Commit: f574fd478211f795aa4a3365ae2ba2d368fea54d
Parents: 8bc7234
Author: wellington <wchevre...@cloudera.com>
Authored: Wed Mar 28 22:12:01 2018 +0100
Committer: tedyu <yuzhih...@gmail.com>
Committed: Wed Apr 4 12:38:18 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/mapreduce/SyncTable.java       |  36 ++-
 .../hadoop/hbase/mapreduce/TestSyncTable.java   | 248 ++++++++++++++++++-
 2 files changed, 272 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f574fd47/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
index 9b4625b..32b7561 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
@@ -63,7 +63,9 @@ public class SyncTable extends Configured implements Tool {
   static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name";
   static final String SOURCE_ZK_CLUSTER_CONF_KEY = 
"sync.table.source.zk.cluster";
   static final String TARGET_ZK_CLUSTER_CONF_KEY = 
"sync.table.target.zk.cluster";
-  static final String DRY_RUN_CONF_KEY="sync.table.dry.run";
+  static final String DRY_RUN_CONF_KEY = "sync.table.dry.run";
+  static final String DO_DELETES_CONF_KEY = "sync.table.do.deletes";
+  static final String DO_PUTS_CONF_KEY = "sync.table.do.puts";
 
   Path sourceHashDir;
   String sourceTableName;
@@ -72,6 +74,8 @@ public class SyncTable extends Configured implements Tool {
   String sourceZkCluster;
   String targetZkCluster;
   boolean dryRun;
+  boolean doDeletes = true;
+  boolean doPuts = true;
 
   Counters counters;
 
@@ -128,6 +132,8 @@ public class SyncTable extends Configured implements Tool {
       jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster);
     }
     jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);
+    jobConf.setBoolean(DO_DELETES_CONF_KEY, doDeletes);
+    jobConf.setBoolean(DO_PUTS_CONF_KEY, doPuts);
 
     TableMapReduceUtil.initTableMapperJob(targetTableName, 
tableHash.initScan(),
         SyncMapper.class, null, null, job);
@@ -162,6 +168,8 @@ public class SyncTable extends Configured implements Tool {
     Table sourceTable;
     Table targetTable;
     boolean dryRun;
+    boolean doDeletes = true;
+    boolean doPuts = true;
 
     HashTable.TableHash sourceTableHash;
     HashTable.TableHash.Reader sourceHashReader;
@@ -186,7 +194,9 @@ public class SyncTable extends Configured implements Tool {
           TableOutputFormat.OUTPUT_CONF_PREFIX);
       sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
       targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
-      dryRun = conf.getBoolean(SOURCE_TABLE_CONF_KEY, false);
+      dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
+      doDeletes = conf.getBoolean(DO_DELETES_CONF_KEY, true);
+      doPuts = conf.getBoolean(DO_PUTS_CONF_KEY, true);
 
       sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
       LOG.info("Read source hash manifest: " + sourceTableHash);
@@ -473,7 +483,7 @@ public class SyncTable extends Configured implements Tool {
           context.getCounter(Counter.TARGETMISSINGCELLS).increment(1);
           matchingRow = false;
 
-          if (!dryRun) {
+          if (!dryRun && doPuts) {
             if (put == null) {
               put = new Put(rowKey);
             }
@@ -488,7 +498,7 @@ public class SyncTable extends Configured implements Tool {
           context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1);
           matchingRow = false;
 
-          if (!dryRun) {
+          if (!dryRun && doDeletes) {
             if (delete == null) {
               delete = new Delete(rowKey);
             }
@@ -515,7 +525,7 @@ public class SyncTable extends Configured implements Tool {
             context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1);
             matchingRow = false;
 
-            if (!dryRun) {
+            if (!dryRun && doPuts) {
               // overwrite target cell
               if (put == null) {
                 put = new Put(rowKey);
@@ -696,6 +706,10 @@ public class SyncTable extends Configured implements Tool {
     System.err.println("                  (defaults to cluster in classpath's 
config)");
     System.err.println(" dryrun           if true, output counters but no 
writes");
     System.err.println("                  (defaults to false)");
+    System.err.println(" doDeletes        if false, does not perform deletes");
+    System.err.println("                  (defaults to true)");
+    System.err.println(" doPuts           if false, does not perform puts ");
+    System.err.println("                  (defaults to true)");
     System.err.println();
     System.err.println("Args:");
     System.err.println(" sourcehashdir    path to HashTable output dir for 
source table");
@@ -747,6 +761,18 @@ public class SyncTable extends Configured implements Tool {
           continue;
         }
 
+        final String doDeletesKey = "--doDeletes=";
+        if (cmd.startsWith(doDeletesKey)) {
+          doDeletes = 
Boolean.parseBoolean(cmd.substring(doDeletesKey.length()));
+          continue;
+        }
+
+        final String doPutsKey = "--doPuts=";
+        if (cmd.startsWith(doPutsKey)) {
+          doPuts = Boolean.parseBoolean(cmd.substring(doPutsKey.length()));
+          continue;
+        }
+
         printUsage("Invalid argument '" + cmd + "'");
         return false;
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f574fd47/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java
index 543a169..ad02039 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java
@@ -74,6 +74,7 @@ public class TestSyncTable {
 
   @AfterClass
   public static void afterClass() throws Exception {
+    TEST_UTIL.cleanupDataTestDirOnTestFS();
     TEST_UTIL.shutdownMiniCluster();
   }
 
@@ -105,7 +106,52 @@ public class TestSyncTable {
 
     TEST_UTIL.deleteTable(sourceTableName);
     TEST_UTIL.deleteTable(targetTableName);
-    TEST_UTIL.cleanupDataTestDirOnTestFS();
+  }
+
+  /** Syncs with --doDeletes=false, then checks the target table contents and job counters. */
+  @Test
+  public void testSyncTableDoDeletesFalse() throws Exception {
+    final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source");
+    final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target");
+    Path testDir = TEST_UTIL.getDataTestDirOnTestFS(name.getMethodName());
+
+    writeTestData(sourceTableName, targetTableName);
+    hashSourceTable(sourceTableName, testDir);
+    Counters syncCounters = syncTables(sourceTableName, targetTableName,
+        testDir, "--doDeletes=false");
+    assertTargetDoDeletesFalse(100, sourceTableName, targetTableName);
+
+    assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
+    assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
+    assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
+    assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
+    assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
+    assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
+    TEST_UTIL.deleteTable(sourceTableName);
+    TEST_UTIL.deleteTable(targetTableName);
+  }
+
+  /** Syncs with --doPuts=false, then checks the target table contents and job counters. */
+  @Test
+  public void testSyncTableDoPutsFalse() throws Exception {
+    final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source");
+    final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target");
+    Path testDir = TEST_UTIL.getDataTestDirOnTestFS(name.getMethodName());
+
+    writeTestData(sourceTableName, targetTableName);
+    hashSourceTable(sourceTableName, testDir);
+    Counters syncCounters = syncTables(sourceTableName, targetTableName,
+        testDir, "--doPuts=false");
+    assertTargetDoPutsFalse(70, sourceTableName, targetTableName);
+
+    assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
+    assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
+    assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
+    assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
+    assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
+    assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
+    TEST_UTIL.deleteTable(sourceTableName);
+    TEST_UTIL.deleteTable(targetTableName);
   }
 
   private void assertEqualTables(int expectedRows, TableName sourceTableName,
@@ -184,14 +230,202 @@ public class TestSyncTable {
     targetTable.close();
   }
 
+  /**
+   * Verifies the target table after a sync run with --doDeletes=false.
+   *
+   * <p>Target rows without a matching source row are counted but not compared. Rows 70-80
+   * must end up with more cells on the target than on the source, rows 80-90 are exempt
+   * from the timestamp check, and all other rows must match the source cell for cell.
+   *
+   * @param expectedRows total number of target rows, target-only rows included
+   * @param sourceTableName source table of the sync run
+   * @param targetTableName target table of the sync run
+   */
+  private void assertTargetDoDeletesFalse(int expectedRows, TableName sourceTableName,
+      TableName targetTableName) throws Exception {
+    try (Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
+        Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
+        ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
+        ResultScanner targetScanner = targetTable.getScanner(new Scan())) {
+      Result targetRow = targetScanner.next();
+      Result sourceRow = sourceScanner.next();
+      int rowsCount = 0;
+      while (targetRow != null) {
+        rowsCount++;
+        // Compare only rows present on both sides: target-only rows survive because deletes
+        // are disabled, and a null sourceRow means the source scan is already exhausted.
+        if (sourceRow == null
+            || Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) {
+          targetRow = targetScanner.next();
+          continue;
+        }
+
+        LOG.debug("SOURCE row: " + Bytes.toInt(sourceRow.getRow()) + " cells:" + sourceRow);
+        LOG.debug("TARGET row: " + Bytes.toInt(targetRow.getRow()) + " cells:" + targetRow);
+        Cell[] sourceCells = sourceRow.rawCells();
+        Cell[] targetCells = targetRow.rawCells();
+        int targetRowKey = Bytes.toInt(targetRow.getRow());
+        if (targetRowKey >= 70 && targetRowKey < 80) {
+          // Deletes are disabled, so these rows must keep extra cells on the target side.
+          if (sourceCells.length >= targetCells.length) {
+            LOG.debug("Source cells: " + Arrays.toString(sourceCells));
+            LOG.debug("Target cells: " + Arrays.toString(targetCells));
+            Assert.fail("Row " + targetRowKey + " should have more cells in "
+                + "target than in source");
+          }
+        } else if (sourceCells.length != targetCells.length) {
+          LOG.debug("Source cells: " + Arrays.toString(sourceCells));
+          LOG.debug("Target cells: " + Arrays.toString(targetCells));
+          Assert.fail("Row " + Bytes.toInt(sourceRow.getRow())
+              + " has " + sourceCells.length
+              + " cells in source table but " + targetCells.length
+              + " cells in target table");
+        }
+        // Index-wise walk over the source cells; in rows 70-80 the extra target cells are
+        // assumed to sort after the shared ones.
+        for (int j = 0; j < sourceCells.length; j++) {
+          Cell sourceCell = sourceCells[j];
+          Cell targetCell = targetCells[j];
+          try {
+            if (!CellUtil.matchingRow(sourceCell, targetCell)) {
+              Assert.fail("Rows don't match");
+            }
+            if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
+              Assert.fail("Families don't match");
+            }
+            if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
+              Assert.fail("Qualifiers don't match");
+            }
+            // Rows 80-90 were written with different timestamps on each side and deletes are
+            // disabled, so the target may keep its own versions there; skip only that range.
+            if ((targetRowKey < 80 || targetRowKey >= 90)
+                && !CellUtil.matchingTimestamp(sourceCell, targetCell)) {
+              Assert.fail("Timestamps don't match");
+            }
+            if (!CellUtil.matchingValue(sourceCell, targetCell)) {
+              Assert.fail("Values don't match");
+            }
+          } catch (Throwable t) {
+            LOG.debug("Source cell: " + sourceCell + " target cell: " + targetCell);
+            Throwables.propagate(t);
+          }
+        }
+        targetRow = targetScanner.next();
+        sourceRow = sourceScanner.next();
+      }
+      assertEquals("Target expected rows does not match.", expectedRows, rowsCount);
+    }
+  }
+
+  /**
+   * Verifies the target table after a sync run with --doPuts=false.
+   *
+   * <p>Source-only rows are skipped. The target must have no rows in 40-60 or 80-90, rows
+   * 60-70 must still differ from the source in cell count, rows 90-100 must keep values that
+   * differ from the source, and all other rows must match the source cell for cell.
+   *
+   * @param expectedRows number of rows present in both tables after the sync
+   * @param sourceTableName source table of the sync run
+   * @param targetTableName target table of the sync run
+   */
+  private void assertTargetDoPutsFalse(int expectedRows, TableName sourceTableName,
+      TableName targetTableName) throws Exception {
+    try (Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
+        Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
+        ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
+        ResultScanner targetScanner = targetTable.getScanner(new Scan())) {
+      Result targetRow = targetScanner.next();
+      Result sourceRow = sourceScanner.next();
+      int rowsCount = 0;
+      while (targetRow != null) {
+        int targetRowKey = Bytes.toInt(targetRow.getRow());
+        // Compare only rows present on both sides (source-only rows were not added given
+        // --doPuts=false); every target row must still exist in the source.
+        Assert.assertNotNull("Target row " + targetRowKey + " is missing from source", sourceRow);
+        if (Bytes.toInt(sourceRow.getRow()) != targetRowKey) {
+          sourceRow = sourceScanner.next();
+          continue;
+        }
+        LOG.debug("SOURCE row: " + Bytes.toInt(sourceRow.getRow()) + " cells:" + sourceRow);
+        LOG.debug("TARGET row: " + targetRowKey + " cells:" + targetRow);
+        LOG.debug("rowsCount: " + rowsCount);
+        Cell[] sourceCells = sourceRow.rawCells();
+        Cell[] targetCells = targetRow.rawCells();
+        if (targetRowKey >= 40 && targetRowKey < 60) {
+          LOG.debug("Source cells: " + Arrays.toString(sourceCells));
+          LOG.debug("Target cells: " + Arrays.toString(targetCells));
+          Assert.fail("There shouldn't exist any rows between 40 and 60, since "
+              + "Puts are disabled and Deletes are enabled.");
+        } else if (targetRowKey >= 60 && targetRowKey < 70) {
+          // Puts are disabled, so these rows must still differ from the source in cell count.
+          if (sourceCells.length == targetCells.length) {
+            LOG.debug("Source cells: " + Arrays.toString(sourceCells));
+            LOG.debug("Target cells: " + Arrays.toString(targetCells));
+            Assert.fail("Row " + Bytes.toInt(sourceRow.getRow())
+                + " shouldn't have same number of cells.");
+          }
+        } else if (targetRowKey >= 80 && targetRowKey < 90) {
+          LOG.debug("Source cells: " + Arrays.toString(sourceCells));
+          LOG.debug("Target cells: " + Arrays.toString(targetCells));
+          Assert.fail("There should be no rows between 80 and 90 on target, as "
+              + "these had different timestamps and should had been deleted.");
+        } else {
+          // Remaining rows are compared index by index, which needs equal cell counts.
+          if (sourceCells.length != targetCells.length) {
+            Assert.fail("Row " + targetRowKey + " has " + sourceCells.length
+                + " cells in source table but " + targetCells.length
+                + " cells in target table");
+          }
+          // Puts are disabled, so rows 90-100 must keep their differing target values.
+          boolean expectDifferentValues = targetRowKey >= 90 && targetRowKey < 100;
+          for (int j = 0; j < sourceCells.length; j++) {
+            Cell sourceCell = sourceCells[j];
+            Cell targetCell = targetCells[j];
+            if (expectDifferentValues) {
+              if (CellUtil.matchingValue(sourceCell, targetCell)) {
+                Assert.fail("Cells values should not match for rows between "
+                    + "90 and 100. Target row id: " + targetRowKey);
+              }
+              continue;
+            }
+            try {
+              if (!CellUtil.matchingRow(sourceCell, targetCell)) {
+                Assert.fail("Rows don't match");
+              }
+              if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
+                Assert.fail("Families don't match");
+              }
+              if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
+                Assert.fail("Qualifiers don't match");
+              }
+              if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) {
+                Assert.fail("Timestamps don't match");
+              }
+              if (!CellUtil.matchingValue(sourceCell, targetCell)) {
+                Assert.fail("Values don't match");
+              }
+            } catch (Throwable t) {
+              LOG.debug("Source cell: " + sourceCell + " target cell: " + targetCell);
+              Throwables.propagate(t);
+            }
+          }
+        }
+        rowsCount++;
+        targetRow = targetScanner.next();
+        sourceRow = sourceScanner.next();
+      }
+      assertEquals("Target expected rows does not match.", expectedRows, rowsCount);
+    }
+  }
+
   private Counters syncTables(TableName sourceTableName, TableName 
targetTableName,
-      Path testDir) throws Exception {
+      Path testDir, String... options) throws Exception {
     SyncTable syncTable = new SyncTable(TEST_UTIL.getConfiguration());
-    int code = syncTable.run(new String[] {
-        testDir.toString(),
-        sourceTableName.getNameAsString(),
-        targetTableName.getNameAsString()
-        });
+    String[] args = Arrays.copyOf(options, options.length+3);
+    args[options.length] = testDir.toString();
+    args[options.length+1] = sourceTableName.getNameAsString();
+    args[options.length+2] = targetTableName.getNameAsString();
+    int code = syncTable.run(args);
     assertEquals("sync table job failed", 0, code);
 
     LOG.info("Sync tables completed");

Reply via email to