This is an automated email from the ASF dual-hosted git repository. larsh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new dbc98ac Local indexes get out of sync after changes for global consistent indexes. dbc98ac is described below commit dbc98acd1f09d4d8c360a84f9d126b4e03a73fe0 Author: Lars <la...@apache.org> AuthorDate: Sat Aug 22 10:50:53 2020 -0700 Local indexes get out of sync after changes for global consistent indexes. --- .../apache/phoenix/end2end/index/LocalIndexIT.java | 33 ++++++++++ .../phoenix/hbase/index/IndexRegionObserver.java | 70 ++++++++++++---------- 2 files changed, 71 insertions(+), 32 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java index 724da6e..0965ce1 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java @@ -103,6 +103,39 @@ public class LocalIndexIT extends BaseLocalIndexIT { } @Test + public void testLocalIndexConsistency() throws Exception { + if (isNamespaceMapped) { + return; + } + String tableName = schemaName + "." + generateUniqueName(); + String indexName = "IDX_" + generateUniqueName(); + + Connection conn = getConnection(); + conn.setAutoCommit(true); + + conn.createStatement().execute("CREATE TABLE " + tableName + " (pk INTEGER PRIMARY KEY, v1 FLOAT) SPLIT ON (2000)"); + conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1)"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES(rand() * 4000, rand())"); + + ResultSet rs; + for (int i=0; i<15; i++) { + conn.createStatement().execute("UPSERT INTO " + tableName + " SELECT rand() * 4000, rand() FROM " + tableName); + + rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName); + rs.next(); + int indexCount = rs.getInt(1); + rs.close(); + + rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ COUNT(*) FROM " + tableName); + rs.next(); + int tableCount = rs.getInt(1); + rs.close(); + + assertEquals(indexCount, tableCount); + } + } + + @Test public void testUseUncoveredLocalIndexWithPrefix() throws Exception { String tableName = schemaName + "." + generateUniqueName(); String indexName = "IDX_" + generateUniqueName(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index bfeadcb..2d0cf51 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -702,7 +702,7 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor { * unverified status. In phase 2, data table mutations are applied. In phase 3, the status for an index table row is * either set to "verified" or the row is deleted. */ - private void preparePreIndexMutations(ObserverContext<RegionCoprocessorEnvironment> c, + private boolean preparePreIndexMutations(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext context, Collection<? extends Mutation> pendingMutations, @@ -716,13 +716,6 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor { current = NullSpan.INSTANCE; } current.addTimelineAnnotation("Built index updates, doing preStep"); - // Handle local index updates - for (IndexMaintainer indexMaintainer : maintainers) { - if (indexMaintainer.isLocalIndex()) { - handleLocalIndexUpdates(c, miniBatchOp, pendingMutations, indexMetaData); - break; - } - } // The rest of this method is for handling global index updates context.indexUpdates = ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create(); prepareIndexMutations(context, maintainers, now); @@ -730,6 +723,9 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor { context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create(); int updateCount = 0; for (IndexMaintainer indexMaintainer : maintainers) { + if (indexMaintainer.isLocalIndex()) { + continue; + } updateCount++; byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(); byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier(); @@ -751,6 +747,7 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor { } } TracingUtils.addAnnotation(current, "index update count", updateCount); + return updateCount != 0; } } @@ -796,11 +793,22 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor { return true; } - private void preparePostIndexMutations(BatchMutateContext context, long now, PhoenixIndexMetaData indexMetaData, - String tableName) + private void preparePostIndexMutations(ObserverContext<RegionCoprocessorEnvironment> c, + MiniBatchOperationInProgress<Mutation> miniBatchOp, + BatchMutateContext context, + Collection<? extends Mutation> pendingMutations, + long now, + PhoenixIndexMetaData indexMetaData) throws Throwable { context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create(); List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers(); + // Handle local index updates + for (IndexMaintainer indexMaintainer : maintainers) { + if (indexMaintainer.isLocalIndex()) { + handleLocalIndexUpdates(c, miniBatchOp, pendingMutations, indexMetaData); + break; + } + } // Check if we need to skip post index update for any of the rows for (IndexMaintainer indexMaintainer : maintainers) { byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(); @@ -837,6 +845,7 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor { rowLock.release(); } context.rowLocks.clear(); + String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString(); throw new IOException("One of the concurrent mutations does not have all indexed columns. " + "The batch needs to be retried " + tableName); } @@ -927,30 +936,27 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor { return; } long start = EnvironmentEdgeManager.currentTimeMillis(); - preparePreIndexMutations(c, miniBatchOp, context, mutations, now, indexMetaData); + boolean hasGlobalIndex = preparePreIndexMutations(c, miniBatchOp, context, mutations, now, indexMetaData); metricSource.updateIndexPrepareTime(EnvironmentEdgeManager.currentTimeMillis() - start); - // Sleep for one millisecond if we have prepared the index updates in less than 1 ms. The sleep is necessary to - // get different timestamps for concurrent batches that share common rows. It is very rare that the index updates - // can be prepared in less than one millisecond - if (!context.rowLocks.isEmpty() && now == EnvironmentEdgeManager.currentTimeMillis()) { - Thread.sleep(1); - LOG.debug("slept 1ms for " + c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString()); - } - // Release the locks before making RPC calls for index updates - for (RowLock rowLock : context.rowLocks) { - rowLock.release(); - } - // Do the first phase index updates - doPre(c, context, miniBatchOp); - // Acquire the locks again before letting the region proceed with data table updates - List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(context.rowLocks.size()); - for (RowLock rowLock : context.rowLocks) { - rowLocks.add(lockManager.lockRow(rowLock.getRowKey(), rowLockWaitDuration)); + if (hasGlobalIndex) { + // Sleep for one millisecond if we have prepared the index updates in less than 1 ms. The sleep is necessary to + // get different timestamps for concurrent batches that share common rows. It is very rare that the index updates + // can be prepared in less than one millisecond + if (!context.rowLocks.isEmpty() && now == EnvironmentEdgeManager.currentTimeMillis()) { + Thread.sleep(1); + LOG.debug("slept 1ms for " + c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString()); + } + // Release the locks before making RPC calls for index updates + for (RowLock rowLock : context.rowLocks) { + rowLock.release(); + } + // Do the first phase index updates + doPre(c, context, miniBatchOp); + // Acquire the locks again before letting the region proceed with data table updates + context.rowLocks.clear(); + lockRows(context); } - context.rowLocks.clear(); - context.rowLocks = rowLocks; - preparePostIndexMutations(context, now, indexMetaData, - c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString()); + preparePostIndexMutations(c, miniBatchOp, context, mutations, now, indexMetaData); if (failDataTableUpdatesForTesting) { throw new DoNotRetryIOException("Simulating the data table write failure"); }