This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
     new 86bc640  HBASE-26273 Force ReadType.STREAM when the user does not explicitly set a ReadType on the Scan for a Snapshot-based Job
86bc640 is described below

commit 86bc640c17c71f16926251ba0bc4326caf8bbed0
Author: Josh Elser <els...@apache.org>
AuthorDate: Fri Sep 10 16:24:13 2021 -0400

    HBASE-26273 Force ReadType.STREAM when the user does not explicitly set a ReadType on the Scan for a Snapshot-based Job

    HBase 2 moved Scans over to use PREAD by default instead of STREAM, as
    HBase 1 did. In the context of a MapReduce job, we can generally expect
    that clients using the InputFormat (batch job) would be reading most of
    the data for a job. Cater to them, but still give users who want PREAD
    the ability to do so.

    Signed-off-by: Duo Zhang <zhang...@apache.org>
    Signed-off-by: Tak Lon (Stephen) Wu <tak...@apache.org>
---
 .../mapreduce/TableSnapshotInputFormatImpl.java    | 18 ++++++++++++
 .../mapreduce/TestTableSnapshotInputFormat.java    | 33 ++++++++++++++++++++++
 2 files changed, 51 insertions(+)

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
index 22c19be..c467a3c 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -129,6 +130,14 @@ public class TableSnapshotInputFormatImpl {
   public static final boolean SNAPSHOT_INPUTFORMAT_SCAN_METRICS_ENABLED_DEFAULT = true;
 
   /**
+   * The {@link ReadType} which should be set on the {@link Scan} to read the HBase Snapshot,
+   * default STREAM.
+   */
+  public static final String SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE =
+      "hbase.TableSnapshotInputFormat.scanner.readtype";
+  public static final ReadType SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE_DEFAULT = ReadType.STREAM;
+
+  /**
    * Implementation class for InputSplit logic common between mapred and mapreduce.
    */
   public static class InputSplit implements Writable {
@@ -382,6 +391,15 @@ public class TableSnapshotInputFormatImpl {
     } else {
       throw new IllegalArgumentException("Unable to create scan");
     }
+
+    if (scan.getReadType() == ReadType.DEFAULT) {
+      LOG.info("Provided Scan has DEFAULT ReadType,"
+          + " updating STREAM for Snapshot-based InputFormat");
+      // Update the "DEFAULT" ReadType to be "STREAM" to try to improve the default case.
+      scan.setReadType(conf.getEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE,
+          SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE_DEFAULT));
+    }
+
     return scan;
   }
 
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
index 34e6b27..f4e9f7d 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
@@ -22,6 +22,8 @@ import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNA
 import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT;
 import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION;
 import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT;
+import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -39,6 +41,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TestTableSnapshotScanner;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -407,6 +410,36 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
     }
   }
 
+  @Test
+  public void testScannerReadTypeConfiguration() throws IOException {
+    Configuration conf = new Configuration(false);
+    // Explicitly set ReadTypes should persist
+    for (ReadType readType : Arrays.asList(ReadType.PREAD, ReadType.STREAM)) {
+      Scan scanWithReadType = new Scan();
+      scanWithReadType.setReadType(readType);
+      assertEquals(scanWithReadType.getReadType(),
+          serializeAndReturn(conf, scanWithReadType).getReadType());
+    }
+    // We should only see the DEFAULT ReadType getting updated to STREAM.
+    Scan scanWithoutReadType = new Scan();
+    assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType());
+    assertEquals(ReadType.STREAM, serializeAndReturn(conf, scanWithoutReadType).getReadType());
+
+    // We should still be able to force a certain ReadType when DEFAULT is given.
+    conf.setEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE, ReadType.PREAD);
+    assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType());
+    assertEquals(ReadType.PREAD, serializeAndReturn(conf, scanWithoutReadType).getReadType());
+  }
+
+  /**
+   * Serializes and deserializes the given scan in the same manner that
+   * TableSnapshotInputFormat does.
+   */
+  private Scan serializeAndReturn(Configuration conf, Scan s) throws IOException {
+    conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(s));
+    return TableSnapshotInputFormatImpl.extractScanFromConf(conf);
+  }
+
   private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
       byte[] startRow, byte[] stopRow)
       throws IOException, InterruptedException {
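
For reference, a job that still prefers positional reads can opt back into PREAD either
through the new property or by setting the ReadType on the Scan explicitly (a non-DEFAULT
ReadType is never rewritten by the InputFormat). Below is a minimal sketch, assuming an
existing MapReduce job Configuration; the example class and method names are illustrative
and not part of this commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;

public class SnapshotReadTypeExample {
  // Illustrative helper: make a snapshot-based job use PREAD instead of the
  // new STREAM default introduced by this change.
  static Scan preadScan(Configuration conf) {
    // Option 1: override the fallback that is applied only when the Scan
    // arrives with ReadType.DEFAULT. The constant resolves to
    // "hbase.TableSnapshotInputFormat.scanner.readtype".
    conf.setEnum(TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE,
        ReadType.PREAD);
    // Option 2: pin the ReadType on the Scan itself; extractScanFromConf leaves
    // an explicitly chosen ReadType untouched.
    return new Scan().setReadType(ReadType.PREAD);
  }
}

Because the property is consulted only for DEFAULT-ReadType Scans, an explicit ReadType set
on the Scan always wins, which is the behavior the new test above exercises.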