PHOENIX-4220 Upper bound not being used in partial index rebuilder
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5733c0ec
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5733c0ec
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5733c0ec

Branch: refs/heads/4.x-HBase-1.1
Commit: 5733c0ec1332fcc59deec5ccc79219fa5503fd44
Parents: 83fdf70
Author: James Taylor <jtay...@salesforce.com>
Authored: Thu Sep 21 09:48:13 2017 -0700
Committer: James Taylor <jtay...@salesforce.com>
Committed: Thu Sep 21 11:07:23 2017 -0700

----------------------------------------------------------------------
 .../end2end/index/PartialIndexRebuilderIT.java  | 64 +++++++++++++++++++-
 .../coprocessor/MetaDataRegionObserver.java     | 12 ++--
 2 files changed, 66 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/5733c0ec/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
index 12630f4..dfe5a28 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.end2end.index;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -45,7 +46,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PMetaData;
 import org.apache.phoenix.schema.PTable;
@@ -71,12 +71,13 @@ import com.google.common.collect.Maps;
 public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
     private static final Random RAND = new Random(5);
     private static final int WAIT_AFTER_DISABLED = 5000;
+    private static final int REBUILD_INTERVAL = 2000;
 
     @BeforeClass
     public static void doSetup() throws Exception {
         Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
         serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.TRUE.toString());
-        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "2000");
+        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, Long.toString(REBUILD_INTERVAL));
         serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, "120000"); // give up rebuilding after 2 minutes
         serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, Long.toString(WAIT_AFTER_DISABLED));
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), ReadOnlyProps.EMPTY_PROPS);
@@ -641,7 +642,64 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
 
     private final static CountDownLatch WAIT_FOR_REBUILD_TO_START = new CountDownLatch(1);
     private final static CountDownLatch WAIT_FOR_INDEX_WRITE = new CountDownLatch(1);
-
+    @Test
+    public void testUpperBoundSetOnRebuild() throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        PTableKey key = new PTableKey(null,fullTableName);
+        final MyClock clock = new MyClock(1000);
+        EnvironmentEdgeManager.injectEdge(clock);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache();
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+            clock.time += 100;
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2)");
+            clock.time += 100;
+            HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+            IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, PIndexState.DISABLE);
+            clock.time += 100;
+            long disableTime = clock.currentTime();
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a', '0')");
+            conn.commit();
+            // Set clock forward in time past the "overlap" amount we wait for index maintenance to kick in
+            clock.time += 10 * WAIT_AFTER_DISABLED;
+            assertTrue(hasDisabledIndex(metaCache, key));
+            assertEquals(1,TestUtil.getRowCount(conn, fullTableName));
+            assertEquals(0,TestUtil.getRowCount(conn, fullIndexName));
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('bb','bb','11')");
+            conn.commit();
+            assertEquals(2,TestUtil.getRowCount(conn, fullTableName));
+            assertEquals(0,TestUtil.getRowCount(conn, fullIndexName));
+            // Set clock back in time and start rebuild
+            clock.time = disableTime + 100;
+            IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE);
+            advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
+            clock.time += REBUILD_INTERVAL;
+            waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE, clock, REBUILD_INTERVAL);
+            assertEquals(2,TestUtil.getRowCount(conn, fullTableName));
+            // If an upper bound was set on the rebuilder, we should only have found one row
+            assertEquals(1,TestUtil.getRowCount(conn, fullIndexName));
+        } finally {
+            EnvironmentEdgeManager.injectEdge(null);
+        }
+    }
+
+    private static void waitForIndexRebuild(Connection conn, String fullIndexName, PIndexState expectedIndexState, MyClock clock, long increment) throws InterruptedException, SQLException {
+        int maxTries = 60, nTries = 0;
+        do {
+            Thread.sleep(1000); // sleep 1 sec
+            clock.time += increment;
+            if (TestUtil.checkIndexState(conn, fullIndexName, expectedIndexState, 0L)) {
+                return;
+            }
+        } while (++nTries < maxTries);
+        fail("Ran out of time waiting for index state to become " + expectedIndexState);
+    }
+
+
     @Test
     public void testDisableIndexDuringRebuild() throws Throwable {
         String schemaName = generateUniqueName();


http://git-wip-us.apache.org/repos/asf/phoenix/blob/5733c0ec/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index e42aca2..4d40f1c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -425,19 +425,17 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                             continue;
                         }
                         long scanBeginTime = Math.max(0, earliestDisableTimestamp - backwardOverlapDurationMs);
+                        long scanEndTime = Math.min(latestUpperBoundTimestamp,
+                                getTimestampForBatch(scanBeginTime,batchExecutedPerTableMap.get(dataPTable.getName())));
                         LOG.info("Starting to build " + dataPTable + " indexes "
                                 + indexesToPartiallyRebuild
-                                + " from timestamp=" + scanBeginTime + " until " + latestUpperBoundTimestamp);
+                                + " from timestamp=" + scanBeginTime + " until " + scanEndTime);
                         TableRef tableRef = new TableRef(null, dataPTable, HConstants.LATEST_TIMESTAMP, false);
                         // TODO Need to set high timeout
                         PostDDLCompiler compiler = new PostDDLCompiler(conn);
-                        MutationPlan plan = compiler.compile(Collections.singletonList(tableRef), null, null, null,
-                                HConstants.LATEST_TIMESTAMP);
-                        Scan dataTableScan = IndexManagementUtil.newLocalStateScan(plan.getContext().getScan(),
-                                maintainers);
+                        MutationPlan plan = compiler.compile(Collections.singletonList(tableRef), null, null, null, scanEndTime);
+                        Scan dataTableScan = IndexManagementUtil.newLocalStateScan(plan.getContext().getScan(), maintainers);
 
-                        long scanEndTime = Math.min(latestUpperBoundTimestamp,
-                                getTimestampForBatch(scanBeginTime,batchExecutedPerTableMap.get(dataPTable.getName())));
                         // We can't allow partial results
                         dataTableScan.setTimeRange(scanBeginTime, scanEndTime);
                         dataTableScan.setCacheBlocks(false);