PHOENIX-4997 Phoenix MR on snapshots can produce duplicate rows
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7f13f87c Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7f13f87c Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7f13f87c Branch: refs/heads/4.x-cdh5.15 Commit: 7f13f87c52f0abac983041eb66ed302a5f9d9338 Parents: 9bf6fc6 Author: Karan Mehta <karanmeht...@gmail.com> Authored: Fri Nov 2 00:15:26 2018 +0000 Committer: pboado <pedro.bo...@gmail.com> Committed: Sun Nov 25 22:09:33 2018 +0000 ---------------------------------------------------------------------- .../end2end/TableSnapshotReadsMapReduceIT.java | 122 +++++++++++-------- .../iterate/MapReduceParallelScanGrouper.java | 32 ++++- .../iterate/TableSnapshotResultIterator.java | 28 +++-- .../java/org/apache/phoenix/query/BaseTest.java | 14 +-- 4 files changed, 122 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f13f87c/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java index cae91a3..e35e159 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java @@ -36,6 +36,7 @@ import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -49,12 +50,18 @@ import org.apache.phoenix.mapreduce.index.PhoenixIndexDBWritable; import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { + + private static final Logger logger = LoggerFactory.getLogger(TableSnapshotReadsMapReduceIT.class); + private final static String SNAPSHOT_NAME = "FOO"; private static final String FIELD1 = "FIELD1"; private static final String FIELD2 = "FIELD2"; @@ -66,6 +73,9 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { private static List<List<Object>> result; private long timestamp; private String tableName; + private Job job; + private Path tmpDir; + private Configuration conf; @BeforeClass public static void doSetup() throws Exception { @@ -73,8 +83,8 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } - @Test - public void testMapReduceSnapshots() throws Exception { + @Before + public void before() throws SQLException, IOException { // create table Connection conn = DriverManager.getConnection(getUrl()); tableName = generateUniqueName(); @@ -82,58 +92,43 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { conn.commit(); // configure Phoenix M/R job to read snapshot - final Configuration conf = getUtility().getConfiguration(); - Job job = Job.getInstance(conf); - Path tmpDir = getUtility().getRandomDir(); + conf = getUtility().getConfiguration(); + job = Job.getInstance(conf); + tmpDir = getUtility().getRandomDir(); + } - PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir, null, FIELD1, FIELD2, FIELD3); + @Test + public void testMapReduceSnapshots() throws Exception { + PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class, + SNAPSHOT_NAME, tableName, tmpDir, null, FIELD1, FIELD2, FIELD3); + configureJob(job, tableName, null, null, false); + } - // configure and test job - configureJob(job, tableName, null, null); + @Test + public void testMapReduceSnapshotsMultiRegion() throws Exception { + PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class, + SNAPSHOT_NAME, tableName, tmpDir, null, FIELD1, FIELD2, FIELD3); + configureJob(job, tableName, null, null, true); } @Test public void testMapReduceSnapshotsWithCondition() throws Exception { - // create table - Connection conn = DriverManager.getConnection(getUrl()); - tableName = generateUniqueName(); - conn.createStatement().execute(String.format(CREATE_TABLE, tableName)); - conn.commit(); - - // configure Phoenix M/R job to read snapshot - final Configuration conf = getUtility().getConfiguration(); - Job job = Job.getInstance(conf); - Path tmpDir = getUtility().getRandomDir(); - PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir, FIELD3 + " > 0001", FIELD1, FIELD2, FIELD3); - - // configure and test job - configureJob(job, tableName, null, "FIELD3 > 0001"); - + PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class, + SNAPSHOT_NAME, tableName, tmpDir, FIELD3 + " > 0001", FIELD1, FIELD2, FIELD3); + configureJob(job, tableName, null, "FIELD3 > 0001", false); } @Test public void testMapReduceSnapshotWithLimit() throws Exception { - // create table - Connection conn = DriverManager.getConnection(getUrl()); - tableName = generateUniqueName(); - conn.createStatement().execute(String.format(CREATE_TABLE, tableName)); - conn.commit(); - - // configure Phoenix M/R job to read snapshot - final Configuration conf = getUtility().getConfiguration(); - Job job = Job.getInstance(conf); - Path tmpDir = getUtility().getRandomDir(); - // Running limit with order by on non pk column String inputQuery = "SELECT * FROM " + tableName + " ORDER BY FIELD2 LIMIT 1"; - PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir,inputQuery); - - // configure and test job - configureJob(job, tableName, inputQuery, null); + PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class, + SNAPSHOT_NAME, tableName, tmpDir, inputQuery); + configureJob(job, tableName, inputQuery, null, false); } - private void configureJob(Job job, String tableName, String inputQuery, String condition) throws Exception { + private void configureJob(Job job, String tableName, String inputQuery, String condition, boolean shouldSplit) throws Exception { try { - upsertAndSnapshot(tableName); + upsertAndSnapshot(tableName, shouldSplit); result = new ArrayList<>(); job.setMapperClass(TableSnapshotMapper.class); @@ -151,6 +146,7 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { if (condition != null) { selectQuery.append(" WHERE " + condition); } + if (inputQuery == null) inputQuery = selectQuery.toString(); @@ -176,12 +172,13 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { private void upsertData(String tableName) throws SQLException { Connection conn = DriverManager.getConnection(getUrl()); PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName)); - upsertData(stmt, "CCCC", "SSDD", 0001); - upsertData(stmt, "CCCC", "HDHG", 0005); - upsertData(stmt, "BBBB", "JSHJ", 0002); - upsertData(stmt, "AAAA", "JHHD", 0003); + upsertData(stmt, "AAAA", "JHHD", 37); + upsertData(stmt, "BBBB", "JSHJ", 224); + upsertData(stmt, "CCCC", "SSDD", 15); + upsertData(stmt, "PPPP", "AJDG", 53); + upsertData(stmt, "SSSS", "HSDG", 59); + upsertData(stmt, "XXXX", "HDPP", 22); conn.commit(); - timestamp = System.currentTimeMillis(); } private void upsertData(PreparedStatement stmt, String field1, String field2, int field3) throws SQLException { @@ -191,31 +188,52 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { stmt.execute(); } - public void upsertAndSnapshot(String tableName) throws Exception { + private void upsertAndSnapshot(String tableName, boolean shouldSplit) throws Exception { upsertData(tableName); + TableName hbaseTableName = TableName.valueOf(tableName); Connection conn = DriverManager.getConnection(getUrl()); HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); - admin.snapshot(SNAPSHOT_NAME, TableName.valueOf(tableName)); - // call flush to create new files in the region - admin.flush(tableName); + + if (shouldSplit) { + splitTableSync(admin, hbaseTableName, "BBBB".getBytes(), 2); + } + + admin.snapshot(SNAPSHOT_NAME, hbaseTableName); List<HBaseProtos.SnapshotDescription> snapshots = admin.listSnapshots(); Assert.assertEquals(tableName, snapshots.get(0).getTable()); + // Capture the snapshot timestamp to use as SCN while reading the table later + // Assigning the timestamp value here will make tests less flaky + timestamp = System.currentTimeMillis(); + // upsert data after snapshot PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName)); - upsertData(stmt, "DDDD", "SNFB", 0004); + upsertData(stmt, "DDDD", "SNFB", 45); conn.commit(); } - public void deleteSnapshot(String tableName) throws Exception { + private void splitTableSync(HBaseAdmin admin, TableName hbaseTableName, + byte[] splitPoint , int expectedRegions) throws IOException, InterruptedException { + admin.split(hbaseTableName, splitPoint); + for (int i = 0; i < 100; i++) { + List<HRegionInfo> hRegionInfoList = admin.getTableRegions(hbaseTableName); + if (hRegionInfoList.size() >= expectedRegions) { + break; + } + logger.info("Sleeping for 1000 ms while waiting for " + hbaseTableName.getNameAsString() + " to split"); + Thread.sleep(1000); + } + } + + private void deleteSnapshot(String tableName) throws Exception { try (Connection conn = DriverManager.getConnection(getUrl()); HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();) { admin.deleteSnapshot(SNAPSHOT_NAME); } - } + } public static class TableSnapshotMapper extends Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, NullWritable> { http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f13f87c/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java index 593608f..b4f81ae 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java @@ -18,6 +18,7 @@ package org.apache.phoenix.iterate; import java.sql.SQLException; +import java.util.Collections; import java.util.List; import com.google.common.base.Preconditions; @@ -45,7 +46,7 @@ public class MapReduceParallelScanGrouper implements ParallelScanGrouper { private static final MapReduceParallelScanGrouper INSTANCE = new MapReduceParallelScanGrouper(); - public static MapReduceParallelScanGrouper getInstance() { + public static MapReduceParallelScanGrouper getInstance() { return INSTANCE; } @@ -79,18 +80,39 @@ public class MapReduceParallelScanGrouper implements ParallelScanGrouper { } } + /** + * Get list of region locations from SnapshotManifest + * BaseResultIterators assume that regions are sorted using RegionInfo.COMPARATOR + */ private List<HRegionLocation> getRegionLocationsFromManifest(SnapshotManifest manifest) { List<SnapshotProtos.SnapshotRegionManifest> regionManifests = manifest.getRegionManifests(); Preconditions.checkNotNull(regionManifests); - List<HRegionLocation> regionLocations = Lists.newArrayListWithCapacity(regionManifests.size()); + List<HRegionInfo> regionInfos = Lists.newArrayListWithCapacity(regionManifests.size()); + List<HRegionLocation> hRegionLocations = Lists.newArrayListWithCapacity(regionManifests.size()); for (SnapshotProtos.SnapshotRegionManifest regionManifest : regionManifests) { - regionLocations.add(new HRegionLocation( - HRegionInfo.convert(regionManifest.getRegionInfo()), null)); + HRegionInfo regionInfo = HRegionInfo.convert(regionManifest.getRegionInfo()); + if (isValidRegion(regionInfo)) { + regionInfos.add(regionInfo); + } + } + + Collections.sort(regionInfos); + + for (HRegionInfo regionInfo : regionInfos) { + hRegionLocations.add(new HRegionLocation(regionInfo, null)); } - return regionLocations; + return hRegionLocations; + } + + // Exclude offline split parent regions + private boolean isValidRegion(HRegionInfo hri) { + if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) { + return false; + } + return true; } private String getSnapshotName(Configuration conf) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f13f87c/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java index 016d3be..c3d75f7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java @@ -37,7 +37,6 @@ import java.io.IOException; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.UUID; @@ -79,22 +78,31 @@ public class TableSnapshotResultIterator implements ResultIterator { RestoreSnapshotHelper.RestoreMetaChanges meta = RestoreSnapshotHelper.copySnapshotForScanner(this.configuration, this.fs, this.rootDir, this.restoreDir, this.snapshotName); - List restoredRegions = meta.getRegionsToAdd(); + List<HRegionInfo> restoredRegions = meta.getRegionsToAdd(); this.htd = meta.getTableDescriptor(); - this.regions = new ArrayList(restoredRegions.size()); - Iterator i$ = restoredRegions.iterator(); - - while(i$.hasNext()) { - HRegionInfo hri = (HRegionInfo)i$.next(); - if(CellUtil.overlappingKeys(this.scan.getStartRow(), this.scan.getStopRow(), - hri.getStartKey(), hri.getEndKey())) { - this.regions.add(hri); + this.regions = new ArrayList<>(restoredRegions.size()); + + for (HRegionInfo restoredRegion : restoredRegions) { + if (isValidRegion(restoredRegion)) { + this.regions.add(restoredRegion); } } Collections.sort(this.regions); } + /** + * Exclude offline split parent regions and + * regions that don't intersect with provided scan + */ + private boolean isValidRegion(HRegionInfo hri) { + if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) { + return false; + } + return CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), + hri.getStartKey(), hri.getEndKey()); + } + public boolean initSnapshotScanner() throws SQLException { if (closed) { return true; http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f13f87c/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index e6e3936..42fa085 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -1782,15 +1782,15 @@ public abstract class BaseTest { /** - * Split SYSTEM.CATALOG at the given split point + * Synchronously split table at the given split point */ - protected static void splitRegion(byte[] splitPoint) throws SQLException, IOException, InterruptedException { - HBaseAdmin admin = + protected static void splitRegion(TableName fullTableName, byte[] splitPoint) throws SQLException, IOException, InterruptedException { + HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin(); - admin.split(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME, splitPoint); + admin.split(fullTableName, splitPoint); // make sure the split finishes (there's no synchronous splitting before HBase 2.x) - admin.disableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME); - admin.enableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME); + admin.disableTable(fullTableName); + admin.enableTable(fullTableName); } /** @@ -1819,7 +1819,7 @@ public abstract class BaseTest { AssignmentManager am = master.getAssignmentManager(); // No need to split on the first splitPoint since the end key of region boundaries are exclusive for (int i=1; i<splitPoints.size(); ++i) { - splitRegion(splitPoints.get(i)); + splitRegion(fullTableName, splitPoints.get(i)); } HashMap<ServerName, List<HRegionInfo>> serverToRegionsList = Maps.newHashMapWithExpectedSize(NUM_SLAVES_BASE); Deque<ServerName> availableRegionServers = new ArrayDeque<ServerName>(NUM_SLAVES_BASE);