PHOENIX-4329 Test IndexScrutinyTool while table is taking writes (Vincent Poon)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e319ff02 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e319ff02 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e319ff02 Branch: refs/heads/4.x-HBase-1.1 Commit: e319ff02e2d135c526b7334a65bfe1628c0dd220 Parents: 7c21a83 Author: James Taylor <jtay...@salesforce.com> Authored: Sun Oct 29 15:20:23 2017 -0700 Committer: James Taylor <jtay...@salesforce.com> Committed: Wed Nov 15 10:46:39 2017 -0800 ---------------------------------------------------------------------- .../phoenix/end2end/IndexScrutinyToolIT.java | 101 ++++++++++++++++++- 1 file changed, 96 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/e319ff02/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java index 10595a7..cbce7b2 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java @@ -36,6 +36,9 @@ import java.util.Properties; import java.util.Random; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import com.google.common.collect.Sets; import org.apache.commons.io.IOUtils; @@ -43,6 +46,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; @@ -103,6 +107,7 @@ public class IndexScrutinyToolIT extends BaseTest { private PreparedStatement indexTableUpsertStmt; private long testTime; + private Properties props; @Parameterized.Parameters public static Collection<Object[]> data() { @@ -120,8 +125,11 @@ public class IndexScrutinyToolIT extends BaseTest { @BeforeClass public static void doSetup() throws Exception { - Map<String, String> props = Maps.newHashMap(); - setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + Map<String, String> serverProps = Maps.newHashMap(); + //disable major compactions + serverProps.put(HConstants.MAJOR_COMPACTION_PERIOD, "0"); + Map<String, String> clientProps = Maps.newHashMap(); + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); } /** @@ -133,7 +141,7 @@ public class IndexScrutinyToolIT extends BaseTest { createTestTable(getUrl(), String.format(dataTableDdl, dataTableFullName)); createTestTable(getUrl(), String.format(indexTableDdl, indexTableName, dataTableFullName)); - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props = PropertiesUtil.deepCopy(TEST_PROPERTIES); conn = DriverManager.getConnection(getUrl(), props); String dataTableUpsert = String.format(UPSERT_SQL, dataTableFullName); dataTableUpsertStmt = conn.prepareStatement(dataTableUpsert); @@ -141,6 +149,7 @@ public class IndexScrutinyToolIT extends BaseTest { indexTableUpsertStmt = conn.prepareStatement(indexTableUpsert); conn.setAutoCommit(false); testTime = EnvironmentEdgeManager.currentTimeMillis() - 1000; + } @After @@ -177,6 +186,77 @@ public class IndexScrutinyToolIT extends BaseTest { } /** + * Tests running a scrutiny while updates and deletes are happening. + * Since CURRENT_SCN is set, the scrutiny shouldn't report any issue. + */ + @Test + public void testScrutinyWhileTakingWrites() throws Exception { + int id = 0; + while (id < 1000) { + int index = 1; + dataTableUpsertStmt.setInt(index++, id); + dataTableUpsertStmt.setString(index++, "name-" + id); + dataTableUpsertStmt.setInt(index++, id); + dataTableUpsertStmt.setTimestamp(index++, new Timestamp(testTime)); + dataTableUpsertStmt.executeUpdate(); + id++; + } + conn.commit(); + + //CURRENT_SCN for scrutiny + long scrutinyTS = EnvironmentEdgeManager.currentTimeMillis(); + + // launch background upserts and deletes + final Random random = new Random(0); + Runnable backgroundUpserts = new Runnable() { + @Override + public void run() { + int idToUpsert = random.nextInt(1000); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + PreparedStatement dataPS = + conn.prepareStatement(String.format(UPSERT_SQL, dataTableFullName)); + upsertRow(dataPS, idToUpsert, "modified-" + idToUpsert, idToUpsert + 1000); + conn.commit(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + }; + Runnable backgroundDeletes = new Runnable() { + @Override + public void run() { + int idToDelete = random.nextInt(1000); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + String deleteSql = + String.format(DELETE_SQL, indexTableFullName) + "WHERE \":ID\"=" + + idToDelete; + conn.createStatement().executeUpdate(deleteSql); + conn.commit(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + }; + ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2); + scheduledThreadPool.scheduleWithFixedDelay(backgroundUpserts, 200, 200, + TimeUnit.MILLISECONDS); + scheduledThreadPool.scheduleWithFixedDelay(backgroundDeletes, 200, 200, + TimeUnit.MILLISECONDS); + + // scrutiny should report everything as ok + List<Job> completedJobs = + runScrutinyCurrentSCN(schemaName, dataTableName, indexTableName, + scrutinyTS); + Job job = completedJobs.get(0); + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(1000, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT)); + scheduledThreadPool.shutdown(); + scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS); + } + + /** * Tests an index with the same # of rows as the data table, but one of the index rows is * incorrect Scrutiny should report the invalid rows. */ @@ -570,6 +650,13 @@ public class IndexScrutinyToolIT extends BaseTest { private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize, SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat, Long maxOutputRows) { + return getArgValues(schemaName, dataTable, indxTable, batchSize, sourceTable, + outputInvalidRows, outputFormat, maxOutputRows, Long.MAX_VALUE); + } + + private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize, + SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat, + Long maxOutputRows, Long scrutinyTs) { final List<String> args = Lists.newArrayList(); if (schemaName != null) { args.add("-s"); @@ -591,7 +678,7 @@ public class IndexScrutinyToolIT extends BaseTest { args.add(outputDir); } args.add("-t"); - args.add(String.valueOf(Long.MAX_VALUE)); + args.add(String.valueOf(scrutinyTs)); args.add("-run-foreground"); if (batchSize != null) { args.add("-b"); @@ -619,6 +706,10 @@ public class IndexScrutinyToolIT extends BaseTest { return args.toArray(new String[0]); } + private List<Job> runScrutinyCurrentSCN(String schemaName, String dataTableName, String indexTableName, Long scrutinyTS) throws Exception { + return runScrutiny(getArgValues(schemaName, dataTableName, indexTableName, null, SourceTable.BOTH, false, null, null, scrutinyTS)); + } + private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName) throws Exception { return runScrutiny(schemaName, dataTableName, indexTableName, null, null); } @@ -632,7 +723,7 @@ public class IndexScrutinyToolIT extends BaseTest { Long batchSize, SourceTable sourceTable) throws Exception { final String[] cmdArgs = getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable, false, - null, null); + null, null, Long.MAX_VALUE); return runScrutiny(cmdArgs); }