Repository: hbase

Updated Branches:
  refs/heads/branch-1   2fd69cf93 -> cc8e1cc39
  refs/heads/branch-1.3 673e63337 -> 0b1a4cf39
  refs/heads/branch-1.4 defe97edb -> 382d7ad3a
HBASE-20769 getSplits() has an out of bounds problem in TableSnapshotInputFormatImpl

Signed-off-by: Andrew Purtell <apurt...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cc8e1cc3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cc8e1cc3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cc8e1cc3

Branch: refs/heads/branch-1
Commit: cc8e1cc3931f17c84b18e16de6da4abee3d523ff
Parents: 2fd69cf
Author: jingyuntian <tianjy1...@gmail.com>
Authored: Fri Jun 29 10:49:23 2018 +0800
Committer: Andrew Purtell <apurt...@apache.org>
Committed: Fri Jun 29 15:25:06 2018 -0700

----------------------------------------------------------------------
 .../mapreduce/TableSnapshotInputFormat.java     |  4 ++
 .../mapreduce/TableSnapshotInputFormatImpl.java | 15 +++-
 .../mapreduce/TestTableSnapshotInputFormat.java | 75 +++++++++++++++++++-
 3 files changed, 90 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/cc8e1cc3/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
index dce311d..07cfc76 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
@@ -125,6 +125,10 @@ public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable
     public void readFields(DataInput in) throws IOException {
       delegate.readFields(in);
     }
+
+    TableSnapshotInputFormatImpl.InputSplit getDelegate() {
+      return this.delegate;
+    }
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/hbase/blob/cc8e1cc3/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
index 758b84f..73c5478 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
@@ -373,8 +373,19 @@ public class TableSnapshotInputFormatImpl {
         int len = Math.min(3, hosts.size());
         hosts = hosts.subList(0, len);
         Scan boundedScan = new Scan(scan);
-        boundedScan.setStartRow(sp[i]);
-        boundedScan.setStopRow(sp[i + 1]);
+        if (scan.getStartRow().length == 0) {
+          boundedScan.withStartRow(sp[i]);
+        } else {
+          boundedScan.withStartRow(
+            Bytes.compareTo(scan.getStartRow(), sp[i]) > 0 ? scan.getStartRow() : sp[i]);
+        }
+
+        if (scan.getStopRow().length == 0) {
+          boundedScan.withStopRow(sp[i + 1]);
+        } else {
+          boundedScan.withStopRow(
+            Bytes.compareTo(scan.getStopRow(), sp[i + 1]) < 0 ? scan.getStopRow() : sp[i + 1]);
+        }
         splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));
       }
     }
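The heart of the fix is visible in the hunk above: instead of overwriting the user scan's bounds with the raw split points sp[i] and sp[i + 1], the bounded scan now takes the intersection of the requested row range and each split's range. Below is a minimal standalone sketch of that clamping rule, assuming HBase's convention that an empty byte array means "unbounded"; the class and method names are hypothetical, for illustration only.

import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical helper mirroring the patched logic: clamp a requested
// [scanStart, scanStop) row range to a single split's [splitStart, splitStop).
public final class RangeClamp {

  // Lower bound: the larger of the two start rows. An empty scan start
  // means "unbounded", so the split's own start wins.
  static byte[] boundedStart(byte[] scanStart, byte[] splitStart) {
    if (scanStart.length == 0) {
      return splitStart;
    }
    return Bytes.compareTo(scanStart, splitStart) > 0 ? scanStart : splitStart;
  }

  // Upper bound: the smaller of the two stop rows. An empty scan stop
  // means "unbounded", so the split's own stop wins.
  static byte[] boundedStop(byte[] scanStop, byte[] splitStop) {
    if (scanStop.length == 0) {
      return splitStop;
    }
    return Bytes.compareTo(scanStop, splitStop) < 0 ? scanStop : splitStop;
  }
}

With this rule, a scan bounded to [bbc, yya) can never leak rows from outside its requested range, no matter how each region is subdivided.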
http://git-wip-us.apache.org/repos/asf/hbase/blob/cc8e1cc3/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
index b1ad8a2..775d29d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
@@ -71,6 +71,8 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
 
   private static final byte[] bbb = Bytes.toBytes("bbb");
   private static final byte[] yyy = Bytes.toBytes("yyy");
+  private static final byte[] bbc = Bytes.toBytes("bbc");
+  private static final byte[] yya = Bytes.toBytes("yya");
 
   @Override
   protected byte[] getStartRow() {
@@ -246,6 +248,60 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
   }
 
   @Test
+  public void testWithMockedMapReduceWithSplitsPerRegion() throws Exception {
+    setupCluster();
+    String snapshotName = "testWithMockedMapReduceWithSplitsPerRegion";
+    final TableName tableName = TableName.valueOf(snapshotName);
+    try {
+      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);
+
+      Configuration conf = UTIL.getConfiguration();
+      Job job = new Job(conf);
+      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
+      // test scan with startRow and stopRow
+      Scan scan = new Scan(bbc, yya);
+
+      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
+        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
+        tmpTableDir, new RegionSplitter.UniformSplit(), 5);
+
+      verifyWithMockedMapReduce(job, 10, 40, bbc, yya);
+    } finally {
+      UTIL.getHBaseAdmin().deleteSnapshot(snapshotName);
+      UTIL.deleteTable(tableName);
+      tearDownCluster();
+    }
+  }
+
+  @Test
+  public void testWithMockedMapReduceWithNoStartRowStopRow() throws Exception {
+    setupCluster();
+    String snapshotName = "testWithMockedMapReduceWithNoStartRowStopRow";
+    final TableName tableName = TableName.valueOf(snapshotName);
+    try {
+      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);
+
+      Configuration conf = UTIL.getConfiguration();
+      Job job = new Job(conf);
+      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
+      // test scan without startRow and stopRow
+      Scan scan2 = new Scan();
+
+      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan2,
+        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
+        tmpTableDir, new RegionSplitter.UniformSplit(), 5);
+
+      verifyWithMockedMapReduce(job, 10, 50, HConstants.EMPTY_START_ROW,
+        HConstants.EMPTY_START_ROW);
+
+    } finally {
+      UTIL.getHBaseAdmin().deleteSnapshot(snapshotName);
+      UTIL.deleteTable(tableName);
+      tearDownCluster();
+    }
+  }
+
+  @Test
   public void testNoDuplicateResultsWhenSplitting() throws Exception {
     setupCluster();
     TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
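The two new tests exercise the splits-per-region overload end to end: with explicit bounds [bbc, yya) over 10 regions at 5 splits per region, 40 splits are expected (presumably because sub-splits that do not overlap the scan range are skipped), while the unbounded scan yields the full 10 x 5 = 50. For orientation, here is a minimal caller-side sketch of the same overload; the snapshot name, mapper, and restore path are hypothetical stand-ins.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;

public class SnapshotScanDriver {  // hypothetical driver class

  // Trivial mapper that emits each row key it sees.
  static class RowKeyMapper extends TableMapper<ImmutableBytesWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      context.write(key, NullWritable.get());
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "snapshot-scan");
    // Optional row bounds; with this fix they are honored per sub-split.
    Scan scan = new Scan(Bytes.toBytes("bbc"), Bytes.toBytes("yya"));
    TableMapReduceUtil.initTableSnapshotMapperJob(
        "my-snapshot",                     // hypothetical snapshot name
        scan,
        RowKeyMapper.class,
        ImmutableBytesWritable.class,      // map output key class
        NullWritable.class,                // map output value class
        job,
        true,                              // add dependency jars
        new Path("/tmp/snapshot-restore"), // hypothetical restore dir
        new RegionSplitter.UniformSplit(), // split algorithm within each region
        5);                                // number of splits per region
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}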
@@ -305,13 +361,28 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
 
     Assert.assertEquals(expectedNumSplits, splits.size());
 
-    HBaseTestingUtility.SeenRowTracker rowTracker =
-        new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);
+    HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow,
+        stopRow.length > 0 ? stopRow : Bytes.toBytes("\uffff"));
 
     for (int i = 0; i < splits.size(); i++) {
       // validate input split
       InputSplit split = splits.get(i);
       Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
+      TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split;
+      Scan scan =
+          TableMapReduceUtil.convertStringToScan(snapshotRegionSplit.getDelegate().getScan());
+      if (startRow.length > 0) {
+        Assert.assertTrue(
+          Bytes.toStringBinary(startRow) + " should <= " + Bytes.toStringBinary(scan.getStartRow()),
+          Bytes.compareTo(startRow, scan.getStartRow()) <= 0);
+      }
+      if (stopRow.length > 0) {
+        Assert.assertTrue(
+          Bytes.toStringBinary(stopRow) + " should >= " + Bytes.toStringBinary(scan.getStopRow()),
+          Bytes.compareTo(stopRow, scan.getStopRow()) >= 0);
+      }
+      Assert.assertTrue("startRow should < stopRow",
+        Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) < 0);
 
       // validate record reader
       TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
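One subtlety in the verification above: HBaseTestingUtility.SeenRowTracker needs a concrete upper bound, so when the scan is unbounded the tracker is handed Bytes.toBytes("\uffff") as a stand-in stop row. That works because U+FFFF encodes in UTF-8 as 0xEF 0xBF 0xBF, which sorts after every ASCII row key this test writes, as a quick standalone check shows:

byte[] sentinel = Bytes.toBytes("\uffff"); // 0xEF 0xBF 0xBF in UTF-8
// "yyy", the end row of the test table, still sorts strictly below it
assert Bytes.compareTo(Bytes.toBytes("yyy"), sentinel) < 0;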