This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch PHOENIX-5748-4.x-HBase-1.5 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-5748-4.x-HBase-1.5 by this push: new 36d4c7e PHOENIX-5709 Base changes for simplifying index update generation code for consistent global indexes 36d4c7e is described below commit 36d4c7ef92f24625a52360d405176d35855fb6e3 Author: Kadir <kozde...@salesforce.com> AuthorDate: Wed Feb 26 10:19:48 2020 -0800 PHOENIX-5709 Base changes for simplifying index update generation code for consistent global indexes --- .../end2end/ConcurrentMutationsExtendedIT.java | 35 +- .../phoenix/end2end/ConcurrentMutationsIT.java | 3 +- .../org/apache/phoenix/end2end/IndexToolIT.java | 15 +- .../end2end/index/GlobalIndexCheckerIT.java | 70 ++ .../phoenix/compile/ServerBuildIndexCompiler.java | 15 +- .../coprocessor/IndexRebuildRegionScanner.java | 921 +++++++++++++++------ .../UngroupedAggregateRegionObserver.java | 17 +- .../phoenix/hbase/index/IndexRegionObserver.java | 855 ++++++++++++------- .../org/apache/phoenix/index/IndexMaintainer.java | 2 +- 9 files changed, 1350 insertions(+), 583 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java index 571961d..546c1fe 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java @@ -25,7 +25,9 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.util.Bytes; + import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.index.IndexTool; import org.apache.phoenix.util.*; import org.junit.Ignore; import org.junit.Test; @@ -51,6 +53,21 @@ public class ConcurrentMutationsExtendedIT extends 
ParallelStatsDisabledIT { private final Object lock = new Object(); + private long verifyIndexTable(String tableName, String indexName, Connection conn) throws Exception { + // This checks the state of every raw index row without rebuilding any row + IndexToolIT.runIndexTool(true, false, "", tableName, indexName, null, + 0, IndexTool.IndexVerifyType.ONLY); + // This checks the state of an index row after it is repaired + long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName); + // We want to check the index rows again as they may be modified by the read repair + IndexToolIT.runIndexTool(true, false, "", tableName, indexName, null, + 0, IndexTool.IndexVerifyType.ONLY); + // Now we rebuild the entire index table and expect that it is still good after the full rebuild + IndexToolIT.runIndexTool(true, false, "", tableName, indexName, null, + 0, IndexTool.IndexVerifyType.AFTER); + return actualRowCount; + } + @Test public void testSynchronousDeletesAndUpsertValues() throws Exception { final String tableName = generateUniqueName(); @@ -130,7 +147,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT { t2.start(); doneSignal.await(60, TimeUnit.SECONDS); - IndexScrutiny.scrutinizeIndex(conn, tableName, indexName); + verifyIndexTable(tableName, indexName, conn); } @Test @@ -191,7 +208,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT { t2.start(); doneSignal.await(60, TimeUnit.SECONDS); - IndexScrutiny.scrutinizeIndex(conn, tableName, indexName); + verifyIndexTable(tableName, indexName, conn); } @Test @Repeat(5) @@ -204,9 +221,10 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT { final String indexName = generateUniqueName(); Connection conn = DriverManager.getConnection(getUrl()); conn.createStatement().execute("CREATE TABLE " + tableName - + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0, 
VERSIONS=1"); + + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, a.v1 INTEGER, b.v2 INTEGER, c.v3 INTEGER, d.v4 INTEGER," + + "CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0, VERSIONS=1"); TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class); - conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v1)"); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v1) INCLUDE(v2, v3)"); final CountDownLatch doneSignal = new CountDownLatch(nThreads); Runnable[] runnables = new Runnable[nThreads]; for (int i = 0; i < nThreads; i++) { @@ -216,11 +234,12 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT { try { Connection conn = DriverManager.getConnection(getUrl()); for (int i = 0; i < 10000; i++) { - boolean isNull = RAND.nextBoolean(); - int randInt = RAND.nextInt() % nIndexValues; conn.createStatement().execute( "UPSERT INTO " + tableName + " VALUES (" + (i % nRows) + ", 0, " - + (isNull ? null : randInt) + ")"); + + (RAND.nextBoolean() ? null : (RAND.nextInt() % nIndexValues)) + ", " + + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", " + + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", " + + (RAND.nextBoolean() ? 
null : RAND.nextInt()) + ")"); if ((i % batchSize) == 0) { conn.commit(); } @@ -241,7 +260,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT { } assertTrue("Ran out of time", doneSignal.await(120, TimeUnit.SECONDS)); - long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName); + long actualRowCount = verifyIndexTable(tableName, indexName, conn); assertEquals(nRows, actualRowCount); } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java index f312df0..6562af0 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java @@ -274,7 +274,8 @@ public class ConcurrentMutationsIT extends ParallelStatsDisabledIT { EnvironmentEdgeManager.injectEdge(null); } } - + @Ignore ("It is not possible to assign the same timestamp two separately committed mutations in the current model\n" + + " except when the server time goes backward. 
In that case, the behavior is not deterministic") @Test public void testDeleteRowAndUpsertValueAtSameTS1() throws Exception { try { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java index d3ca67d..30fd018 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java @@ -229,8 +229,6 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue()); long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); assertEquals(NROWS, actualRowCount); - // Check after compaction - TestUtil.doMajorCompaction(conn, dataTableFullName); actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); assertEquals(NROWS, actualRowCount); setEveryNthRowWithNull(NROWS, 5, stmt); @@ -241,7 +239,6 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { conn.commit(); actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); assertEquals(NROWS, actualRowCount); - TestUtil.doMajorCompaction(conn, dataTableFullName); actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); assertEquals(NROWS, actualRowCount); indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, @@ -526,7 +523,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { null, 0, IndexTool.IndexVerifyType.AFTER); assertEquals(1, MutationCountingRegionObserver.getMutationCount()); MutationCountingRegionObserver.setMutationCount(0); - // Since all the rows are in the index table, running the index tool with the "-v BEFORE" option should + // Since all the rows are in the index table, running the index tool with the "-v BEFORE" option should 
not // write any index rows runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName, null, 0, IndexTool.IndexVerifyType.BEFORE); @@ -614,16 +611,8 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { // Run the index tool to populate the index while verifying rows runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.AFTER); - // Corrupt one cell by writing directly into the index table - conn.createStatement().execute("upsert into " + indexTableFullName + " values ('Phoenix', 1, 'B')"); - conn.commit(); - // Run the index tool using the only-verify option to detect this mismatch between the data and index table runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, - null, -1, IndexTool.IndexVerifyType.ONLY); - cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName); - expectedValueBytes = Bytes.toBytes("Not matching value for 0:0:CODE E:A A:B"); - assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), - expectedValueBytes, 0, expectedValueBytes.length) == 0); + null, 0, IndexTool.IndexVerifyType.ONLY); dropIndexToolTables(conn); } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java index f9c50fd..35489d0 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java @@ -117,6 +117,76 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT { } @Test + public void testDelete() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 
'bcde') + String dml = "DELETE from " + dataTableName + " WHERE id = 'a'"; + assertEquals(1, conn.createStatement().executeUpdate(dml)); + conn.commit(); + String indexTableName = generateUniqueName(); + conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " + + dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : "")); + if (async) { + // run the index MR job. + IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName); + } + // Count the number of index rows + String query = "SELECT COUNT(*) from " + indexTableName; + // There should be one row in the index table + ResultSet rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + // Add rows and check everything is still okay + verifyTableHealth(conn, dataTableName, indexTableName); + } + } + + @Test + public void testSimulateConcurrentUpdates() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde') + String indexTableName = generateUniqueName(); + conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " + + dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : "")); + if (async) { + // run the index MR job. + IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName); + } + // For the concurrent updates on the same row, the last write phase is ignored. + // Configure IndexRegionObserver to fail the last write phase (i.e., the post index update phase) where the + // verify flag is set to true and/or index rows are deleted and check that this does not impact the + // correctness. 
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(true); + // Do multiple updates on the same data row + conn.createStatement().execute("upsert into " + dataTableName + " (id, val2) values ('a', 'abcc')"); + conn.createStatement().execute("upsert into " + dataTableName + " (id, val1) values ('a', 'aa')"); + conn.commit(); + // The expected state of the index table is {('aa', 'a', 'abcc', 'abcd'), ('bc', 'b', 'bcd', 'bcde')} + // Do more multiple updates on the same data row + conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val3) values ('a', null, null)"); + conn.createStatement().execute("upsert into " + dataTableName + " (id, val1) values ('a', 'ab')"); + conn.createStatement().execute("upsert into " + dataTableName + " (id, val1) values ('b', 'ab')"); + conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val2) values ('b', 'ab', null)"); + conn.commit(); + // Now the expected state of the index table is {('ab', 'a', 'abcc' , null), ('ab', 'b', null, 'bcde')} + ResultSet rs = conn.createStatement().executeQuery("SELECT * from " + indexTableName); + assertTrue(rs.next()); + assertEquals("ab", rs.getString(1)); + assertEquals("a", rs.getString(2)); + assertEquals("abcc", rs.getString(3)); + assertEquals(null, rs.getString(4)); + assertTrue(rs.next()); + assertEquals("ab", rs.getString(1)); + assertEquals("b", rs.getString(2)); + assertEquals(null, rs.getString(3)); + assertEquals("bcde", rs.getString(4)); + assertFalse(rs.next()); + } + } + + @Test public void testFailPostIndexDeleteUpdate() throws Exception { String dataTableName = generateUniqueName(); populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde') diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java index 4392e23..99caa6e 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java @@ -17,27 +17,30 @@ */ package org.apache.phoenix.compile; -import java.sql.SQLException; -import java.util.Collections; - import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.execute.BaseQueryPlan; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; -import org.apache.phoenix.schema.*; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; +import java.sql.SQLException; +import java.util.Collections; + import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; import static org.apache.phoenix.util.IndexUtil.addEmptyColumnToScan; @@ -96,9 +99,9 @@ public class ServerBuildIndexCompiler { throw new IllegalArgumentException( "ServerBuildIndexCompiler does not support global indexes on transactional tables"); } + IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection); // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*). // However, in this case, we need to project all of the data columns that contribute to the index. 
- IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection); for (ColumnReference columnRef : indexMaintainer.getAllColumns()) { if (index.getImmutableStorageScheme() == PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) { scan.addFamily(columnRef.getFamily()); @@ -121,6 +124,8 @@ public class ServerBuildIndexCompiler { scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr)); scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES); ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION); + scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING, TRUE_BYTES); + BaseQueryPlan.serializeViewConstantsIntoScan(scan, dataTable); addEmptyColumnToScan(scan, indexMaintainer.getDataEmptyKeyValueCF(), indexMaintainer.getEmptyKeyValueQualifier()); } if (dataTable.isTransactional()) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java index 598eb5f..a52bddf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java @@ -17,8 +17,8 @@ */ package org.apache.phoenix.coprocessor; +import static org.apache.phoenix.hbase.index.IndexRegionObserver.UNVERIFIED_BYTES; import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES; -import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn; import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY; import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES; import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES; @@ -42,13 +42,15 @@ import static 
org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATT import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; +import java.util.NavigableSet; import java.util.concurrent.ExecutionException; +import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; @@ -70,6 +72,7 @@ import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.filter.SkipScanFilter; @@ -85,6 +88,7 @@ import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner; import org.apache.phoenix.hbase.index.table.HTableFactory; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.hbase.index.util.IndexManagementUtil; import org.apache.phoenix.index.GlobalIndexChecker; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.PhoenixIndexCodec; @@ -93,6 +97,7 @@ import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.schema.types.PVarbinary; +import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.KeyValueUtil; import org.apache.phoenix.util.ServerUtil; import org.slf4j.Logger; @@ -100,7 +105,6 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Throwables; import com.google.common.collect.Maps; 
-import com.google.common.collect.Sets; public class IndexRebuildRegionScanner extends BaseRegionScanner { @@ -309,20 +313,14 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { public boolean isVerificationFailed(IndexTool.IndexVerifyType verifyType) { if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.NONE) { return false; - } - if (verifyType == IndexTool.IndexVerifyType.ONLY) { - if (before.validIndexRowCount + before.expiredIndexRowCount != scannedDataRowCount) { + } else if (verifyType == IndexTool.IndexVerifyType.ONLY) { + if (before.invalidIndexRowCount + before.missingIndexRowCount > 0) { return true; } - } - if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) { + } else if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) { if (after.invalidIndexRowCount + after.missingIndexRowCount > 0) { return true; } - if (before.validIndexRowCount + before.expiredIndexRowCount + - after.expiredIndexRowCount + after.validIndexRowCount != scannedDataRowCount) { - return true; - } } return false; } @@ -360,8 +358,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { private Table resultHTable = null; private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE; private boolean verify = false; - private Map<byte[], Put> indexKeyToDataPutMap; - private Map<byte[], Put> dataKeyToDataPutMap; + private Map<byte[], List<Mutation>> indexKeyToMutationMap; + private Map<byte[], Pair<Put, Delete>> dataKeyToMutationMap; private TaskRunner pool; private TaskBatch<Boolean> tasks; private String exceptionMessage; @@ -371,15 +369,21 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { private int indexTableTTL; private VerificationResult verificationResult; private boolean isBeforeRebuilt = true; - - IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, 
final Scan scan, - final RegionCoprocessorEnvironment env, - UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver) throws IOException { + private boolean partialRebuild = false; + private int singleRowRebuildReturnCode; + private Map<byte[], NavigableSet<byte[]>> familyMap; + private byte[][] viewConstants; + + IndexRebuildRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan, + final RegionCoprocessorEnvironment env, + UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver) throws IOException { super(innerScanner); final Configuration config = env.getConfiguration(); if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) { pageSizeInRows = config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS, QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS); + } else { + partialRebuild = true; } maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); mutations = new UngroupedAggregateRegionObserver.MutationList(maxBatchSize); @@ -392,31 +396,38 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { useProto = false; indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD); } - if (!scan.isRaw()) { - // No need to deserialize index maintainers when the scan is raw. 
Raw scan is used by partial rebuilds - List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true); - indexMaintainer = maintainers.get(0); - } + List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true); + indexMaintainer = maintainers.get(0); this.scan = scan; + familyMap = scan.getFamilyMap(); + if (familyMap.isEmpty()) { + familyMap = null; + } + this.innerScanner = innerScanner; this.region = region; this.env = env; this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver; indexRowKey = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY); + if (indexRowKey != null) { + setReturnCodeForSingleRowRebuild(); + pageSizeInRows = 1; + } byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE); if (valueBytes != null) { verificationResult = new VerificationResult(); verifyType = IndexTool.IndexVerifyType.fromValue(valueBytes); if (verifyType != IndexTool.IndexVerifyType.NONE) { verify = true; + viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan); // Create the following objects only for rebuilds by IndexTool hTableFactory = ServerUtil.getDelegateHTableFactory(env, ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION); indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName())); indexTableTTL = indexHTable.getTableDescriptor().getColumnFamilies()[0].getTimeToLive(); outputHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.OUTPUT_TABLE_NAME_BYTES)); resultHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.RESULT_TABLE_NAME_BYTES)); - indexKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); - dataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + indexKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + dataKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor( new 
ThreadPoolBuilder("IndexVerify", env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY, @@ -428,13 +439,38 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { } } + private void setReturnCodeForSingleRowRebuild() throws IOException { + try (RegionScanner scanner = region.getScanner(scan)) { + List<Cell> row = new ArrayList<>(); + scanner.next(row); + // Check if the data table row we have just scanned matches with the index row key. + // If not, there is no need to build the index row from this data table row, + // and just return zero row count. + if (row.isEmpty()) { + singleRowRebuildReturnCode = GlobalIndexChecker.RebuildReturnCode.NO_DATA_ROW.getValue(); + } else { + Put put = new Put(CellUtil.cloneRow(row.get(0))); + for (Cell cell : row) { + put.add(cell); + } + if (checkIndexRow(indexRowKey, put)) { + singleRowRebuildReturnCode = GlobalIndexChecker.RebuildReturnCode.INDEX_ROW_EXISTS.getValue(); + } else { + singleRowRebuildReturnCode = GlobalIndexChecker.RebuildReturnCode.NO_INDEX_ROW.getValue(); + } + } + } + } + @Override public HRegionInfo getRegionInfo() { return region.getRegionInfo(); } @Override - public boolean isFilterDone() { return false; } + public boolean isFilterDone() { + return false; + } private void logToIndexToolResultTable() throws IOException { long scanMaxTs = scan.getTimeRange().getMax(); @@ -505,38 +541,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { m.setDurability(Durability.SKIP_WAL); } - private Delete generateDeleteMarkers(Put put) { - Set<ColumnReference> allColumns = indexMaintainer.getAllColumns(); - int cellCount = put.size(); - if (cellCount == allColumns.size() + 1) { - // We have all the columns for the index table. 
So, no delete marker is needed - return null; - } - Set<ColumnReference> includedColumns = Sets.newLinkedHashSetWithExpectedSize(cellCount); - long ts = 0; - for (List<Cell> cells : put.getFamilyCellMap().values()) { - if (cells == null) { - break; - } - for (Cell cell : cells) { - includedColumns.add(new ColumnReference(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell))); - if (ts < cell.getTimestamp()) { - ts = cell.getTimestamp(); - } - } - } - Delete del = null; - for (ColumnReference column : allColumns) { - if (!includedColumns.contains(column)) { - if (del == null) { - del = new Delete(put.getRow()); - } - del.addColumns(column.getFamily(), column.getQualifier(), ts); - } - } - return del; - } - private byte[] commitIfReady(byte[] uuidValue, UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException { if (ServerUtil.readyToCommit(mutationList.size(), mutationList.byteSize(), maxBatchSize, maxBatchSizeBytes)) { ungroupedAggregateRegionObserver.checkForRegionClosing(); @@ -547,12 +551,14 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { return uuidValue; } - private class SimpleValueGetter implements ValueGetter { + public static class SimpleValueGetter implements ValueGetter { final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable(); final Put put; - SimpleValueGetter (final Put put) { + + public SimpleValueGetter(final Put put) { this.put = put; } + @Override public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException { List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier()); @@ -595,7 +601,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { } private void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs, - String errorMsg, byte[] expectedValue, byte[] actualValue) throws IOException { + String errorMsg, byte[] expectedValue, byte[] actualValue) throws IOException { final byte[] 
E_VALUE_PREFIX_BYTES = Bytes.toBytes(" E:"); final byte[] A_VALUE_PREFIX_BYTES = Bytes.toBytes(" A:"); final int PREFIX_LENGTH = 3; @@ -643,8 +649,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { length += PREFIX_LENGTH; Bytes.putBytes(errorMessageBytes, length, actualValue, 0, actualValue.length); - } - else { + } else { errorMessageBytes = Bytes.toBytes(errorMsg); } put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.ERROR_MESSAGE_BYTES, scanMaxTs, errorMessageBytes); @@ -656,21 +661,11 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { outputHTable.put(put); } - private long getMaxTimestamp(Result result) { + private static long getMaxTimestamp(Mutation m) { long ts = 0; - for (Cell cell : result.rawCells()) { - if (ts < cell.getTimestamp()) { - ts = cell.getTimestamp(); - } - } - return ts; - } - - private long getMaxTimestamp(Put put) { - long ts = 0; - for (List<Cell> cells : put.getFamilyCellMap().values()) { + for (List<Cell> cells : m.getFamilyCellMap().values()) { if (cells == null) { - break; + continue; } for (Cell cell : cells) { if (ts < cell.getTimestamp()) { @@ -681,132 +676,383 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { return ts; } - private boolean verifySingleIndexRow(Result indexRow, final Put dataRow) throws IOException { - ValueGetter valueGetter = new SimpleValueGetter(dataRow); - long ts = getMaxTimestamp(dataRow); - Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE, - valueGetter, new ImmutableBytesWritable(dataRow.getRow()), ts, null, null); - if (indexPut == null) { - // This means the data row does not have any covered column values - indexPut = new Put(indexRow.getRow()); + private static Cell getCell(Mutation m, byte[] family, byte[] qualifier) { + List<Cell> cellList = m.getFamilyCellMap().get(family); + if (cellList == null) { + return null; } - else { - // Remove the empty column prepared by Index codec as we need to 
change its value - removeEmptyColumn(indexPut, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(), - indexMaintainer.getEmptyKeyValueQualifier()); + for (Cell cell : cellList) { + if (CellUtil.matchingQualifier(cell, qualifier)) { + return cell; + } } - // Add the empty column - indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(), - indexMaintainer.getEmptyKeyValueQualifier(), ts, VERIFIED_BYTES); - int cellCount = 0; - long currentTime = EnvironmentEdgeManager.currentTime(); - for (List<Cell> cells : indexPut.getFamilyCellMap().values()) { + return null; + } + + private boolean isMatchingMutation(Mutation expected, Mutation actual, int version) throws IOException { + if (getTimestamp(expected) != getTimestamp(actual)) { + String errorMsg = "Not matching timestamp"; + byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants); + logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual), + errorMsg, null, null); + return false; + } + for (List<Cell> cells : expected.getFamilyCellMap().values()) { if (cells == null) { - break; + continue; } for (Cell expectedCell : cells) { byte[] family = CellUtil.cloneFamily(expectedCell); byte[] qualifier = CellUtil.cloneQualifier(expectedCell); - Cell actualCell = indexRow.getColumnLatestCell(family, qualifier); - if (actualCell == null) { - // Check if cell expired as per the current server's time and data table ttl - // Index table should have the same ttl as the data table, hence we might not - // get a value back from index if it has already expired between our rebuild and - // verify - // TODO: have a metric to update for these cases - if (isTimestampBeforeTTL(currentTime, expectedCell.getTimestamp())) { - continue; - } - String errorMsg = " Missing cell " + Bytes.toString(family) + ":" + - Bytes.toString(qualifier); - logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, 
getMaxTimestamp(indexRow), errorMsg); + Cell actualCell = getCell(actual, family, qualifier); + if (actualCell == null || + !CellUtil.matchingType(expectedCell, actualCell)) { + byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants); + String errorMsg = "Missing cell (in version " + version + ") " + Bytes.toString(family) + ":" + Bytes.toString(qualifier); + logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual), errorMsg); return false; } - if (actualCell.getTimestamp() < ts) { - // Skip older cells since a Phoenix index row is composed of cells with the same timestamp - continue; - } - // Check all columns if (!CellUtil.matchingValue(actualCell, expectedCell)) { - String errorMsg = "Not matching value for " + Bytes.toString(family) + ":" + - Bytes.toString(qualifier); - logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), + if (Bytes.compareTo(family, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary()) == 0 && + Bytes.compareTo(qualifier, indexMaintainer.getEmptyKeyValueQualifier()) == 0) { + if (Bytes.compareTo(actualCell.getValueArray(), actualCell.getValueOffset(), actualCell.getValueLength(), + UNVERIFIED_BYTES, 0, UNVERIFIED_BYTES.length) == 0) { + // We will not flag this as mismatch but will log it + byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants); + String errorMsg = "Unverified index row (in version " + version + ")"; + logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual), errorMsg); + continue; + } + } + String errorMsg = "Not matching value (in version " + version + ") for " + Bytes.toString(family) + ":" + Bytes.toString(qualifier); + byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants); + logToIndexToolOutputTable(dataKey, 
expected.getRow(), getTimestamp(expected), getTimestamp(actual), errorMsg, CellUtil.cloneValue(expectedCell), CellUtil.cloneValue(actualCell)); return false; - } else if (actualCell.getTimestamp() != ts) { - String errorMsg = "Not matching timestamp for " + Bytes.toString(family) + ":" + - Bytes.toString(qualifier) + " E: " + ts + " A: " + - actualCell.getTimestamp(); - logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), - errorMsg, null, null); + } + } + } + return true; + } + + private boolean isVerified(Put mutation) throws IOException { + List<Cell> cellList = mutation.get(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(), + indexMaintainer.getEmptyKeyValueQualifier()); + Cell cell = (cellList != null && !cellList.isEmpty()) ? cellList.get(0) : null; + if (cell == null) { + throw new DoNotRetryIOException("No empty column cell"); + } + if (Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), + VERIFIED_BYTES, 0, VERIFIED_BYTES.length) == 0) { + return true; + } + return false; + } + + /** + * This is to reorder the mutations in descending order by the tuple of timestamp and mutation type where + * delete comes before put + */ + public static final Comparator<Mutation> MUTATION_TS_DESC_COMPARATOR = new Comparator<Mutation>() { + @Override + public int compare(Mutation o1, Mutation o2) { + long ts1 = getTimestamp(o1); + long ts2 = getTimestamp(o2); + if (ts1 > ts2) { + return -1; + } + if (ts1 < ts2) { + return 1; + } + if (o1 instanceof Delete && o2 instanceof Put) { + return -1; + } + if (o1 instanceof Put && o2 instanceof Delete) { + return 1; + } + return 0; + } + }; + + private boolean isDeleteFamily(Mutation mutation) { + for (List<Cell> cells : mutation.getFamilyCellMap().values()) { + for (Cell cell : cells) { + if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.DeleteFamily) { + return true; + } + } + } + return false; + } + + private boolean 
isDeleteFamilyVersion(Mutation mutation) { + for (List<Cell> cells : mutation.getFamilyCellMap().values()) { + for (Cell cell : cells) { + if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.DeleteFamilyVersion) { + return true; + } + } + } + return false; + } + + private static List<Mutation> prepareActualIndexMutations(Result indexRow) throws IOException { + Put put = null; + Delete del = null; + for (Cell cell : indexRow.rawCells()) { + if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) { + if (put == null) { + put = new Put(CellUtil.cloneRow(cell)); + } + put.add(cell); + } else { + if (del == null) { + del = new Delete(CellUtil.cloneRow(cell)); + } + del.addDeleteMarker(cell); + } + } + return getMutationsWithSameTS(put, del); + } + + /** + * indexRow is the set of all cells of all the row versions of an index row from the index table. These are actual + * cells. We group these cells based on timestamp and type (put vs delete), and form the actual set of + * index mutations. indexKeyToMutationMap is a map from an index row key to a set of mutations that are generated + * using the rebuild process (i.e., by replaying raw data table mutations). These sets are sets of expected + * index mutations, one set for each index row key. Since not all mutations in the index table have both phases + * (i.e., pre and post data phase) mutations, we cannot compare actual index mutations with expected ones one by one + * and expect to find them identical. We need to consider concurrent data mutation effects, data table row write + * failures, and post index write failures. Thus, we need to allow some expected and actual mutations to be skipped + * while comparing actual mutations to expected mutations. + * + * The main idea for the verification algorithm used here is to match every expected verified put with an actual + * put such that these two mutations are the same except that the actual mutation can be unverified. 
+ * + * Some background on why we can skip some of the actual unverified puts and delete markers due to concurrent data + * table updates is as follows: + * + * For each data table mutation, two operations are done on the data table. One is to read the existing row state, + * and the second is to write to the data table. The processing of concurrent data mutations is serialized once + * for reading the existing row states, and then serialized again for updating the data table. In other words, + * they go through locking twice, i.e., [lock, read, unlock] and [lock, write, unlock]. Because of this two phase + * locking, for a pair of concurrent mutations (for the same row), the same row state can be read from the data + * table. This means the same existing index row can be made unverified twice with different timestamps, one for + * each concurrent mutation. These unverified mutations are then repaired from the data table. Since expected + * mutations are used for rebuild (which is also used by the read repair), skipping these unverified put mutations + * that do not match the expected mutations is safe as they will go through the same process during + * read repair and will be skipped and eventually cleaned up by the read repair. We can skip the delete markers + * safely too as they are placed to clean up these unverified mutations. When the data table rows are rebuilt, + * the rebuild process generates the delete family markers. The timestamp of each delete marker is the timestamp of + * the data table mutation for which the delete marker is added. Thus, the timestamp of these delete markers will be + * higher than the timestamp of the index row to be deleted. 
+ */ + private boolean verifySingleIndexRow(Result indexRow, VerificationResult.PhaseResult verificationPhaseResult) + throws IOException { + List<Mutation> expectedMutationList = indexKeyToMutationMap.get(indexRow.getRow()); + if (expectedMutationList == null) { + throw new DoNotRetryIOException("No expected mutation"); + } + List<Mutation> actualMutationList = prepareActualIndexMutations(indexRow); + if (actualMutationList == null || actualMutationList.isEmpty()) { + throw new DoNotRetryIOException("actualMutationList is null or empty"); + } + Collections.sort(expectedMutationList, MUTATION_TS_DESC_COMPARATOR); + Collections.sort(actualMutationList, MUTATION_TS_DESC_COMPARATOR); + long currentTime = EnvironmentEdgeManager.currentTime(); + int actualIndex = 0; + int expectedIndex = 0; + int matchingCount = 0; + int expectedSize = expectedMutationList.size(); + int actualSize = actualMutationList.size(); + Mutation expected = null; + Mutation previousExpected; + Mutation actual; + while (expectedIndex < expectedSize && actualIndex <actualSize) { + previousExpected = expected; + expected = expectedMutationList.get(expectedIndex); + // Check if cell expired as per the current server's time and data table ttl + // Index table should have the same ttl as the data table, hence we might not + // get a value back from index if it has already expired between our rebuild and + // verify + // TODO: have a metric to update for these cases + if (isTimestampBeforeTTL(currentTime, getTimestamp(expected))) { + verificationPhaseResult.expiredIndexRowCount++; + return true; + } + actual = actualMutationList.get(actualIndex); + if (expected instanceof Put) { + if (previousExpected == null || previousExpected instanceof Put) { + // This expected put is either the first mutation or a put just comes after another expected mutation + // on the expected mutation list which is sorted by the mutation timestamps. The cell timestamps + // within each mutation here are the same. 
+ // Go down the list of actual mutations and find the corresponding actual put mutation with the same + // timestamp. Stop if a verified put or delete family mutation is encountered on the way. Skip + // unverified puts or delete family version delete markers. + while (getTimestamp(actual) > getTimestamp(expected) && + ((actual instanceof Put && !isVerified((Put) actual)) || + (actual instanceof Delete && isDeleteFamilyVersion(actual)))) { + actualIndex++; + if (actualIndex == actualSize) { + break; + } + actual = actualMutationList.get(actualIndex); + } + } else { // previousExpected instanceof Delete + // Between an expected delete and put, there cannot be any types of mutation even verified put + while (getTimestamp(actual) > getTimestamp(expected)) { + actualIndex++; + if (actualIndex == actualSize) { + break; + } + actual = actualMutationList.get(actualIndex); + } + } + if (actualIndex == actualSize) { + break; + } + // Now the expected and actual mutations should match + if (isMatchingMutation(expected, actual, expectedIndex)) { + expectedIndex++; + actualIndex++; + matchingCount++; + continue; + } + verificationPhaseResult.invalidIndexRowCount++; + return false; + } else { // expected instanceof Delete + // Between put and delete, delete and delete, or before first delete, there can be other deletes and + // unverified puts. 
Skip all of them if any + while (getTimestamp(actual) > getTimestamp(expected) && + ((actual instanceof Put && !isVerified((Put) actual)) || actual instanceof Delete)) { + actualIndex++; + if (actualIndex == actualSize) { + break; + } + actual = actualMutationList.get(actualIndex); + } + if (actualIndex == actualSize) { + break; + } + // If this is first expected mutation is delete, there should be an actual delete mutation with the + // same timestamp or an unverified put with the same or older timestamp + if (getTimestamp(actual) == getTimestamp(expected) && + (actual instanceof Delete && isDeleteFamily(actual))) { + expectedIndex++; + actualIndex++; + matchingCount++; + continue; + } else if (getTimestamp(actual) <= getTimestamp(expected) && + (actual instanceof Put && !isVerified((Put) actual))) { + expectedIndex++; + if (previousExpected == null) { + matchingCount++; + } + continue; + } + if (previousExpected == null) { + String errorMsg = "First delete check failure"; + byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants); + logToIndexToolOutputTable(dataKey, indexRow.getRow(), + getTimestamp(expected), + getTimestamp(actual), errorMsg); + verificationPhaseResult.invalidIndexRowCount++; return false; } - cellCount++; } } - if (cellCount != indexRow.rawCells().length) { - String errorMsg = "Expected to find " + cellCount + " cells but got " - + indexRow.rawCells().length + " cells"; - logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg); - return false; + if ((expectedIndex != expectedSize) || actualIndex != actualSize) { + if (matchingCount > 0) { + // We do not consider this as a verification issue but log it for further information. 
+ // This may happen due to compaction + byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants); + String errorMsg = "Expected to find " + expectedMutationList.size() + " mutations but got " + + actualMutationList.size(); + logToIndexToolOutputTable(dataKey, indexRow.getRow(), + getTimestamp(expectedMutationList.get(0)), + getTimestamp(actualMutationList.get(0)), errorMsg); + } else { + byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants); + String errorMsg = "Not matching index row"; + logToIndexToolOutputTable(dataKey, indexRow.getRow(), + getTimestamp(expectedMutationList.get(0)), + getTimestamp(actualMutationList.get(0)), errorMsg); + verificationPhaseResult.invalidIndexRowCount++; + return false; + } } + verificationPhaseResult.validIndexRowCount++; return true; } - private void verifyIndexRows(List<KeyRange> keys, Map<byte[], Put> perTaskDataKeyToDataPutMap, + private static long getMaxTimestamp(Pair<Put, Delete> pair) { + Put put = pair.getFirst(); + long ts1 = 0; + if (put != null) { + ts1 = getMaxTimestamp((Mutation)put); + } + Delete del = pair.getSecond(); + long ts2 = 0; + if (del != null) { + ts1 = getMaxTimestamp((Mutation)del); + } + return (ts1 > ts2) ? 
ts1 : ts2; + } + + private void verifyIndexRows(List<KeyRange> keys, VerificationResult.PhaseResult verificationPhaseResult) throws IOException { - int expectedRowCount = keys.size(); + List<KeyRange> invalidKeys = new ArrayList<>(); ScanRanges scanRanges = ScanRanges.createPointLookup(keys); Scan indexScan = new Scan(); indexScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax()); scanRanges.initializeScan(indexScan); SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter(); - indexScan.setFilter(skipScanFilter); - int rowCount = 0; + indexScan.setFilter(new SkipScanFilter(skipScanFilter, true)); + indexScan.setRaw(true); + indexScan.setMaxVersions(); try (ResultScanner resultScanner = indexHTable.getScanner(indexScan)) { for (Result result = resultScanner.next(); (result != null); result = resultScanner.next()) { - Put dataPut = indexKeyToDataPutMap.get(result.getRow()); - if (dataPut == null) { - // This should never happen - String errorMsg = "Missing data row"; - logToIndexToolOutputTable(null, result.getRow(), 0, getMaxTimestamp(result), errorMsg); - exceptionMessage = "Index verify failed - Missing data row - " + indexHTable.getName(); - throw new IOException(exceptionMessage); + KeyRange keyRange = PVarbinary.INSTANCE.getKeyRange(result.getRow()); + if (!keys.contains(keyRange)) { + continue; } - if (verifySingleIndexRow(result, dataPut)) { - verificationPhaseResult.validIndexRowCount++; - perTaskDataKeyToDataPutMap.remove(dataPut.getRow()); - } else { - verificationPhaseResult.invalidIndexRowCount++; + if (!verifySingleIndexRow(result, verificationPhaseResult)) { + invalidKeys.add(keyRange); } - rowCount++; + keys.remove(keyRange); } } catch (Throwable t) { ServerUtil.throwIOException(indexHTable.getName().toString(), t); } // Check if any expected rows from index(which we didn't get) are already expired due to TTL // TODO: metrics for expired rows - if (!perTaskDataKeyToDataPutMap.isEmpty()) { - Iterator<Entry<byte[], Put>> itr 
= perTaskDataKeyToDataPutMap.entrySet().iterator(); + if (!keys.isEmpty()) { + Iterator<KeyRange> itr = keys.iterator(); long currentTime = EnvironmentEdgeManager.currentTime(); while(itr.hasNext()) { - Entry<byte[], Put> entry = itr.next(); - long ts = getMaxTimestamp(entry.getValue()); - if (isTimestampBeforeTTL(currentTime, ts)) { + KeyRange keyRange = itr.next(); + byte[] key = keyRange.getLowerRange(); + List<Mutation> mutationList = indexKeyToMutationMap.get(key); + if (isTimestampBeforeTTL(currentTime, getTimestamp(mutationList.get(mutationList.size() - 1)))) { itr.remove(); - rowCount++; verificationPhaseResult.expiredIndexRowCount++; } } } - if (rowCount != expectedRowCount) { - for (Map.Entry<byte[], Put> entry : perTaskDataKeyToDataPutMap.entrySet()) { + if (keys.size() > 0) { + for (KeyRange keyRange : keys) { String errorMsg = "Missing index row"; - logToIndexToolOutputTable(entry.getKey(), null, getMaxTimestamp(entry.getValue()), - 0, errorMsg); + byte[] key = keyRange.getLowerRange(); + List<Mutation> mutationList = indexKeyToMutationMap.get(key); + byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(keyRange.getLowerRange()), viewConstants); + logToIndexToolOutputTable(dataKey, + keyRange.getLowerRange(), + getMaxTimestamp(dataKeyToMutationMap.get(dataKey)), + getTimestamp(mutationList.get(mutationList.size() - 1)), errorMsg); } - verificationPhaseResult.missingIndexRowCount += expectedRowCount - rowCount; + verificationPhaseResult.missingIndexRowCount += keys.size(); } + keys.addAll(invalidKeys); } private boolean isTimestampBeforeTTL(long currentTime, long tsToCheck) { @@ -816,7 +1062,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { return tsToCheck < (currentTime - (long) indexTableTTL * 1000); } - private void addVerifyTask(final List<KeyRange> keys, final Map<byte[], Put> perTaskDataKeyToDataPutMap, + private void addVerifyTask(final List<KeyRange> keys, final VerificationResult.PhaseResult 
verificationPhaseResult) { tasks.add(new Task<Boolean>() { @Override @@ -826,7 +1072,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { exceptionMessage = "Pool closed, not attempting to verify index rows! " + indexHTable.getName(); throw new IOException(exceptionMessage); } - verifyIndexRows(keys, perTaskDataKeyToDataPutMap, verificationPhaseResult); + verifyIndexRows(keys, verificationPhaseResult); } catch (Exception e) { throw e; } @@ -836,32 +1082,26 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { } private void parallelizeIndexVerify(VerificationResult.PhaseResult verificationPhaseResult) throws IOException { - for (Mutation mutation : mutations) { - indexKeyToDataPutMap.put(getIndexRowKey((Put)mutation), (Put)mutation); - } - int taskCount = (indexKeyToDataPutMap.size() + rowCountPerTask - 1) / rowCountPerTask; + int taskCount = (indexKeyToMutationMap.size() + rowCountPerTask - 1) / rowCountPerTask; tasks = new TaskBatch<>(taskCount); - List<Map<byte[], Put>> dataPutMapList = new ArrayList<>(taskCount); + List<List<KeyRange>> listOfKeyRangeList = new ArrayList<>(taskCount); List<VerificationResult.PhaseResult> verificationPhaseResultList = new ArrayList<>(taskCount); List<KeyRange> keys = new ArrayList<>(rowCountPerTask); - Map<byte[], Put> perTaskDataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); - dataPutMapList.add(perTaskDataKeyToDataPutMap); + listOfKeyRangeList.add(keys); VerificationResult.PhaseResult perTaskVerificationPhaseResult = new VerificationResult.PhaseResult(); verificationPhaseResultList.add(perTaskVerificationPhaseResult); - for (Map.Entry<byte[], Put> entry: indexKeyToDataPutMap.entrySet()) { - keys.add(PVarbinary.INSTANCE.getKeyRange(entry.getKey())); - perTaskDataKeyToDataPutMap.put(entry.getValue().getRow(), entry.getValue()); + for (byte[] indexKey: indexKeyToMutationMap.keySet()) { + keys.add(PVarbinary.INSTANCE.getKeyRange(indexKey)); if (keys.size() == rowCountPerTask) { - 
addVerifyTask(keys, perTaskDataKeyToDataPutMap, perTaskVerificationPhaseResult); + addVerifyTask(keys, perTaskVerificationPhaseResult); keys = new ArrayList<>(rowCountPerTask); - perTaskDataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); - dataPutMapList.add(perTaskDataKeyToDataPutMap); + listOfKeyRangeList.add(keys); perTaskVerificationPhaseResult = new VerificationResult.PhaseResult(); verificationPhaseResultList.add(perTaskVerificationPhaseResult); } } if (keys.size() > 0) { - addVerifyTask(keys, perTaskDataKeyToDataPutMap, perTaskVerificationPhaseResult); + addVerifyTask(keys, perTaskVerificationPhaseResult); } List<Boolean> taskResultList = null; try { @@ -878,30 +1118,41 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { throw new IOException(exceptionMessage); } } - if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) { - for (Map<byte[], Put> dataPutMap : dataPutMapList) { - dataKeyToDataPutMap.putAll(dataPutMap); - } - } for (VerificationResult.PhaseResult result : verificationPhaseResultList) { verificationPhaseResult.add(result); } + if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) { + Map<byte[], Pair<Put, Delete>> newDataKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + for (List<KeyRange> keyRangeList : listOfKeyRangeList) { + for (KeyRange keyRange : keyRangeList) { + byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(keyRange.getLowerRange()), viewConstants); + newDataKeyToMutationMap.put(dataKey, dataKeyToMutationMap.get(dataKey)); + } + } + dataKeyToMutationMap.clear(); + dataKeyToMutationMap = newDataKeyToMutationMap; + } } private void rebuildIndexRows(UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException { byte[] uuidValue = ServerCacheClient.generateId(); UngroupedAggregateRegionObserver.MutationList currentMutationList = new 
UngroupedAggregateRegionObserver.MutationList(maxBatchSize); + Put put = null; for (Mutation mutation : mutationList) { - Put put = (Put) mutation; - currentMutationList.add(mutation); - setMutationAttributes(put, uuidValue); - uuidValue = commitIfReady(uuidValue, currentMutationList); - Delete deleteMarkers = generateDeleteMarkers(put); - if (deleteMarkers != null) { - setMutationAttributes(deleteMarkers, uuidValue); - currentMutationList.add(deleteMarkers); + if (mutation instanceof Put) { + if (put != null) { + // back to back put, i.e., no delete in between. we can commit the previous put + uuidValue = commitIfReady(uuidValue, currentMutationList); + } + currentMutationList.add(mutation); + setMutationAttributes(mutation, uuidValue); + put = (Put)mutation; + } else { + currentMutationList.add(mutation); + setMutationAttributes(mutation, uuidValue); uuidValue = commitIfReady(uuidValue, currentMutationList); + put = null; } } if (!currentMutationList.isEmpty()) { @@ -912,11 +1163,11 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { private void verifyAndOrRebuildIndex() throws IOException { VerificationResult nextVerificationResult = new VerificationResult(); - nextVerificationResult.scannedDataRowCount = mutations.size(); + nextVerificationResult.scannedDataRowCount = dataKeyToMutationMap.size(); if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.NONE) { // For these options we start with rebuilding index rows rebuildIndexRows(mutations); - nextVerificationResult.rebuiltIndexRowCount = mutations.size(); + nextVerificationResult.rebuiltIndexRowCount = dataKeyToMutationMap.size(); isBeforeRebuilt = false; } if (verifyType == IndexTool.IndexVerifyType.NONE) { @@ -928,73 +1179,285 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { // For these options we start with verifying index rows parallelizeIndexVerify(verificationPhaseResult); 
nextVerificationResult.before.add(verificationPhaseResult); - if (mutations.size() != verificationPhaseResult.getTotalCount()) { - throw new DoNotRetryIOException( - "mutations.size() != verificationPhaseResult.getTotalCount() at the before phase " + - nextVerificationResult + " dataKeyToDataPutMap.size() = " + dataKeyToDataPutMap.size()); - } } if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) { // For these options, we have identified the rows to be rebuilt and now need to rebuild them // At this point, dataKeyToDataPutMap includes mapping only for the rows to be rebuilt mutations.clear(); - for (Map.Entry<byte[], Put> entry: dataKeyToDataPutMap.entrySet()) { - mutations.add(entry.getValue()); + + for (Map.Entry<byte[], Pair<Put, Delete>> entry: dataKeyToMutationMap.entrySet()) { + if (entry.getValue().getFirst() != null) { + mutations.add(entry.getValue().getFirst()); + } + if (entry.getValue().getSecond() != null) { + mutations.add(entry.getValue().getSecond()); + } } rebuildIndexRows(mutations); - nextVerificationResult.rebuiltIndexRowCount += mutations.size(); + nextVerificationResult.rebuiltIndexRowCount += dataKeyToMutationMap.size(); isBeforeRebuilt = false; } if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) { // We have rebuilt index row and now we need to verify them - indexKeyToDataPutMap.clear(); VerificationResult.PhaseResult verificationPhaseResult = new VerificationResult.PhaseResult(); + indexKeyToMutationMap.clear(); + for (Map.Entry<byte[], Pair<Put, Delete>> entry: dataKeyToMutationMap.entrySet()) { + prepareIndexMutations(entry.getValue().getFirst(), entry.getValue().getSecond()); + } parallelizeIndexVerify(verificationPhaseResult); nextVerificationResult.after.add(verificationPhaseResult); - if (mutations.size() != verificationPhaseResult.getTotalCount()) { - throw new DoNotRetryIOException( - "mutations.size() != 
verificationPhaseResult.getTotalCount() at the after phase " + - nextVerificationResult + " dataKeyToDataPutMap.size() = " + dataKeyToDataPutMap.size()); - } } - indexKeyToDataPutMap.clear(); verificationResult.add(nextVerificationResult); } + private boolean isColumnIncluded(Cell cell) { + byte[] family = CellUtil.cloneFamily(cell); + if (!familyMap.containsKey(family)) { + return false; + } + NavigableSet<byte[]> set = familyMap.get(family); + if (set == null || set.isEmpty()) { + return true; + } + byte[] qualifier = CellUtil.cloneQualifier(cell); + return set.contains(qualifier); + } + + public static long getTimestamp(Mutation m) { + for (List<Cell> cells : m.getFamilyCellMap().values()) { + for (Cell cell : cells) { + return cell.getTimestamp(); + } + } + throw new IllegalStateException("No cell found"); + } + + /** + * This is to reorder the mutations in ascending order by the tuple of timestamp and mutation type where + * delete comes before put + */ + public static final Comparator<Mutation> MUTATION_TS_COMPARATOR = new Comparator<Mutation>() { + @Override + public int compare(Mutation o1, Mutation o2) { + long ts1 = getTimestamp(o1); + long ts2 = getTimestamp(o2); + if (ts1 < ts2) { + return -1; + } + if (ts1 > ts2) { + return 1; + } + if (o1 instanceof Delete && o2 instanceof Put) { + return -1; + } + if (o1 instanceof Put && o2 instanceof Delete) { + return 1; + } + return 0; + } + }; + + public static List<Mutation> getMutationsWithSameTS(Put put, Delete del) { + List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(2); + if (put != null) { + mutationList.add(put); + } + if (del != null) { + mutationList.add(del); + } + // Group the cells within a mutation based on their timestamps and create a separate mutation for each group + mutationList = (List<Mutation>) IndexManagementUtil.flattenMutationsByTimestamp(mutationList); + // Reorder the mutations on the same row so that delete comes before put when they have the same timestamp + 
Collections.sort(mutationList, MUTATION_TS_COMPARATOR); + return mutationList; + } + + private static Put prepareIndexPutForRebuid(IndexMaintainer indexMaintainer, ImmutableBytesPtr rowKeyPtr, + ValueGetter mergedRowVG, long ts) + throws IOException { + Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE, + mergedRowVG, rowKeyPtr, ts, null, null); + if (indexPut == null) { + // No covered column. Just prepare an index row with the empty column + byte[] indexRowKey = indexMaintainer.buildRowKey(mergedRowVG, rowKeyPtr, + null, null, HConstants.LATEST_TIMESTAMP); + indexPut = new Put(indexRowKey); + } + indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(), + indexMaintainer.getEmptyKeyValueQualifier(), ts, VERIFIED_BYTES); + return indexPut; + } + + public static void removeColumn(Put put, Cell deleteCell) { + byte[] family = CellUtil.cloneFamily(deleteCell); + List<Cell> cellList = put.getFamilyCellMap().get(family); + if (cellList == null) { + return; + } + Iterator<Cell> cellIterator = cellList.iterator(); + while (cellIterator.hasNext()) { + Cell cell = cellIterator.next(); + if (CellUtil.matchingQualifier(cell, deleteCell)) { + cellIterator.remove(); + if (cellList.isEmpty()) { + put.getFamilyCellMap().remove(family); + } + return; + } + } + } + + public static void apply(Put destination, Put source) throws IOException { + for (List<Cell> cells : source.getFamilyCellMap().values()) { + for (Cell cell : cells) { + if (!destination.has(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell))) { + destination.add(cell); + } + } + } + } + + public static Put applyNew(Put destination, Put source) throws IOException { + Put next = new Put(destination); + apply(next, source); + return next; + } + + /** + * Generate the index update for a data row from the mutation that are obtained by merging the previous data row + * state with the pending row mutation for index rebuild. 
This method is called only for global indexes. + * The data table mutations derived from dataPut and dataDel form a sorted list that is used to replay index table mutations. + * This list is sorted in ascending order by the tuple of timestamp and mutation type where delete comes + * before put. + */ + public static List<Mutation> prepareIndexMutationsForRebuild(IndexMaintainer indexMaintainer, + Put dataPut, Delete dataDel) throws IOException { + List<Mutation> dataMutations = getMutationsWithSameTS(dataPut, dataDel); + List<Mutation> indexMutations = Lists.newArrayListWithExpectedSize(dataMutations.size()); + // The row key ptr of the data table row for which we will build index rows here + ImmutableBytesPtr rowKeyPtr = (dataPut != null) ? new ImmutableBytesPtr(dataPut.getRow()) : + new ImmutableBytesPtr(dataDel.getRow()); + // Start with empty data table row + Put currentDataRow = null; + // The index row key corresponding to the current data row + byte[] indexRowKeyForCurrentDataRow = null; + for (Mutation mutation : dataMutations) { + long ts = getTimestamp(mutation); + if (mutation instanceof Put) { + // Add this put on top of the current data row state to get the next data row state + Put nextDataRow = (currentDataRow == null) ? 
new Put((Put)mutation) : applyNew((Put)mutation, currentDataRow); + ValueGetter nextDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(nextDataRow); + Put indexPut = prepareIndexPutForRebuid(indexMaintainer, rowKeyPtr, nextDataRowVG, ts); + indexMutations.add(indexPut); + // Delete the current index row if the new index key is different than the current one + if (indexRowKeyForCurrentDataRow != null) { + if (Bytes.compareTo(indexPut.getRow(), indexRowKeyForCurrentDataRow) != 0) { + Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow, + IndexMaintainer.DeleteType.ALL_VERSIONS, ts); + indexMutations.add(del); + } + } + // For the next iteration + currentDataRow = nextDataRow; + indexRowKeyForCurrentDataRow = indexPut.getRow(); + } else if (currentDataRow != null) { + // We apply delete column mutations only on the current data row state. For the index table, + // we are only interested in if the current data row is deleted or not. There is no need to apply + // column deletes to index rows since index rows are always full rows and all the cells in an index + // row have the same timestamp value. Because of this index rows versions do not share cells. 
+ for (List<Cell> cells : mutation.getFamilyCellMap().values()) { + for (Cell cell : cells) { + switch ((KeyValue.Type.codeToType(cell.getTypeByte()))) { + case DeleteFamily: + case DeleteFamilyVersion: + currentDataRow.getFamilyCellMap().remove(CellUtil.cloneFamily(cell)); + break; + case DeleteColumn: + case Delete: + removeColumn(currentDataRow, cell); + } + } + } + if (currentDataRow.getFamilyCellMap().size() == 0) { + Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow, + IndexMaintainer.DeleteType.ALL_VERSIONS, ts); + indexMutations.add(del); + currentDataRow = null; + indexRowKeyForCurrentDataRow = null; + } + } + } + return indexMutations; + } + + private void prepareIndexMutations(Put put, Delete del) throws IOException{ + List<Mutation> indexMutations = prepareIndexMutationsForRebuild(indexMaintainer, put, del); + for (Mutation mutation : indexMutations) { + byte[] indexRowKey = mutation.getRow(); + List<Mutation> mutationList = indexKeyToMutationMap.get(indexRowKey); + if (mutationList == null) { + mutationList = new ArrayList<>(); + mutationList.add(mutation); + indexKeyToMutationMap.put(indexRowKey, mutationList); + } else { + mutationList.add(mutation); + } + } + } + @Override public boolean next(List<Cell> results) throws IOException { + if (indexRowKey != null && + singleRowRebuildReturnCode == GlobalIndexChecker.RebuildReturnCode.NO_DATA_ROW.getValue()) { + byte[] rowCountBytes = + PLong.INSTANCE.toBytes(Long.valueOf(singleRowRebuildReturnCode)); + final Cell aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, + SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length); + results.add(aggKeyValue); + return false; + } Cell lastCell = null; int rowCount = 0; region.startRegionOperation(); try { - // Partial rebuilds by MetadataRegionObserver use raw scan. 
Inline verification is not supported for them - boolean partialRebuild = scan.isRaw(); byte[] uuidValue = ServerCacheClient.generateId(); synchronized (innerScanner) { do { List<Cell> row = new ArrayList<Cell>(); hasMore = innerScanner.nextRaw(row); if (!row.isEmpty()) { - lastCell = row.get(0); + lastCell = row.get(0); // lastCell is any cell from the last visited row Put put = null; Delete del = null; for (Cell cell : row) { if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) { + if (!partialRebuild && familyMap != null && !isColumnIncluded(cell)) { + continue; + } if (put == null) { put = new Put(CellUtil.cloneRow(cell)); - mutations.add(put); } put.add(cell); } else { if (del == null) { del = new Delete(CellUtil.cloneRow(cell)); - mutations.add(del); } del.addDeleteMarker(cell); } } - if (partialRebuild) { + if (put == null && del == null) { + continue; + } + // Always add the put first and then delete for a given row. This simplifies the logic in + // IndexRegionObserver + if (put != null) { + mutations.add(put); + } + if (del != null) { + mutations.add(del); + } + if (!verify) { if (put != null) { setMutationAttributes(put, uuidValue); } @@ -1002,35 +1465,18 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { setMutationAttributes(del, uuidValue); } uuidValue = commitIfReady(uuidValue, mutations); - } - if (indexRowKey != null) { - if (put != null) { - setMutationAttributes(put, uuidValue); - } - Delete deleteMarkers = generateDeleteMarkers(put); - if (deleteMarkers != null) { - setMutationAttributes(deleteMarkers, uuidValue); - mutations.add(deleteMarkers); - uuidValue = commitIfReady(uuidValue, mutations); - } - // GlobalIndexChecker passed the index row key. This is to build a single index row. - // Check if the data table row we have just scanned matches with the index row key. - // If not, there is no need to build the index row from this data table row, - // and just return zero row count. 
- if (checkIndexRow(indexRowKey, put)) { - rowCount = GlobalIndexChecker.RebuildReturnCode.INDEX_ROW_EXISTS.getValue(); - } else { - rowCount = GlobalIndexChecker.RebuildReturnCode.NO_INDEX_ROW.getValue(); - } - break; + } else { + byte[] dataKey = (put != null) ? put.getRow() : del.getRow(); + prepareIndexMutations(put, del); + dataKeyToMutationMap.put(dataKey, new Pair<Put, Delete>(put, del)); } rowCount++; } } while (hasMore && rowCount < pageSizeInRows); - if (!partialRebuild && indexRowKey == null) { - verifyAndOrRebuildIndex(); - } else { - if (!mutations.isEmpty()) { + if (!mutations.isEmpty()) { + if (verify) { + verifyAndOrRebuildIndex(); + } else { ungroupedAggregateRegionObserver.checkForRegionClosing(); ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutations, blockingMemstoreSize); } @@ -1043,10 +1489,13 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { region.closeRegionOperation(); mutations.clear(); if (verify) { - indexKeyToDataPutMap.clear(); - dataKeyToDataPutMap.clear(); + dataKeyToMutationMap.clear(); + indexKeyToMutationMap.clear(); } } + if (indexRowKey != null) { + rowCount = singleRowRebuildReturnCode; + } byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount)); final Cell aggKeyValue; if (lastCell == null) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 4f21511..fa42ce9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -1060,9 +1060,20 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan, final RegionCoprocessorEnvironment env) 
throws IOException { - - RegionScanner scanner = new IndexRebuildRegionScanner(innerScanner, region, scan, env, this); - return scanner; + if (!scan.isRaw() && scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY) == null) { + Scan rawScan = new Scan(scan); + rawScan.setRaw(true); + rawScan.setMaxVersions(); + rawScan.getFamilyMap().clear(); + rawScan.setFilter(null); + for (byte[] family : scan.getFamilyMap().keySet()) { + rawScan.addFamily(family); + } + innerScanner.close(); + RegionScanner scanner = region.getScanner(rawScan); + return new IndexRebuildRegionScanner(scanner, region, scan, env, this); + } + return new IndexRebuildRegionScanner(innerScanner, region, scan, env, this); } private RegionScanner collectStats(final RegionScanner innerScanner, StatisticsCollector stats, diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index 08a120e..0092c5c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentHashMap; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -41,25 +42,31 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; import 
org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.OperationStatus; import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.htrace.Span; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; +import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite; import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment; +import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner; +import org.apache.phoenix.filter.SkipScanFilter; import org.apache.phoenix.hbase.index.LockManager.RowLock; import org.apache.phoenix.hbase.index.builder.IndexBuildManager; import org.apache.phoenix.hbase.index.builder.IndexBuilder; @@ -67,20 +74,27 @@ import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource; import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; +import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.IndexManagementUtil; import org.apache.phoenix.hbase.index.write.IndexWriter; import org.apache.phoenix.hbase.index.write.LazyParallelWriterIndexCommitter; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.PhoenixIndexMetaData; +import 
org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.trace.TracingUtils; import org.apache.phoenix.trace.util.NullSpan; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.ServerUtil.ConnectionType; -import com.google.common.collect.Lists; +import java.util.Set; + +import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.applyNew; +import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.prepareIndexMutationsForRebuild; +import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.removeColumn; /** * Do all the work of managing index updates from a single coprocessor. All Puts/Delets are passed @@ -155,14 +169,16 @@ public class IndexRegionObserver extends BaseRegionObserver { private List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); private HashSet<ImmutableBytesPtr> rowsToLock = new HashSet<>(); private boolean rebuild; + // The current and next states of the data rows corresponding to the pending mutations + private HashMap<ImmutableBytesPtr, Pair<Put, Put>> dataRowStates; private BatchMutateContext(int clientVersion) { this.clientVersion = clientVersion; } } - + private ThreadLocal<BatchMutateContext> batchMutateContext = new ThreadLocal<BatchMutateContext>(); - + /** Configuration key for the {@link IndexBuilder} to use */ public static final String INDEX_BUILDER_CONF_KEY = "index.builder"; @@ -172,17 +188,11 @@ public class IndexRegionObserver extends BaseRegionObserver { */ public static final String CHECK_VERSION_CONF_KEY = "com.saleforce.hbase.index.checkversion"; - private static final String INDEX_RECOVERY_FAILURE_POLICY_KEY = "org.apache.hadoop.hbase.index.recovery.failurepolicy"; - public static final String INDEX_LAZY_POST_BATCH_WRITE = 
"org.apache.hadoop.hbase.index.lazy.post_batch.write"; private static final boolean INDEX_LAZY_POST_BATCH_WRITE_DEFAULT = false; private static final String INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.post.batch.mutate.threshold"; private static final long INDEXER_INDEX_WRITE_SLOW_THRESHOLD_DEFAULT = 3_000; - private static final String INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.batch.mutate.threshold"; - private static final long INDEXER_INDEX_PREPARE_SLOW_THREHSOLD_DEFAULT = 3_000; - private static final String INDEXER_POST_OPEN_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.open.threshold"; - private static final long INDEXER_POST_OPEN_SLOW_THRESHOLD_DEFAULT = 3_000; private static final String INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.increment"; private static final long INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_DEFAULT = 3_000; @@ -343,13 +353,9 @@ public class IndexRegionObserver extends BaseRegionObserver { public static long getMaxTimestamp(Mutation m) { long maxTs = 0; - long ts = 0; - Iterator iterator = m.getFamilyCellMap().entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry<byte[], List<Cell>> entry = (Map.Entry) iterator.next(); - Iterator<Cell> cellIterator = entry.getValue().iterator(); - while (cellIterator.hasNext()) { - Cell cell = cellIterator.next(); + long ts; + for (List<Cell> cells : m.getFamilyCellMap().values()) { + for (Cell cell : cells) { ts = cell.getTimestamp(); if (ts > maxTs) { maxTs = ts; @@ -403,316 +409,541 @@ public class IndexRegionObserver extends BaseRegionObserver { } } - private Collection<? 
extends Mutation> groupMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp, - long now, ReplayWrite replayWrite) throws IOException { - Map<ImmutableBytesPtr, MultiMutation> mutationsMap = new HashMap<>(); - boolean copyMutations = false; - for (int i = 0; i < miniBatchOp.size(); i++) { - if (miniBatchOp.getOperationStatus(i) == IGNORE) { - continue; - } - Mutation m = miniBatchOp.getOperation(i); - if (this.builder.isEnabled(m)) { - // Track whether or not we need to - ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow()); - if (mutationsMap.containsKey(row)) { - copyMutations = true; - } else { - mutationsMap.put(row, null); - } - } - } - // early exit if it turns out we don't have any edits - if (mutationsMap.isEmpty()) { - return null; - } - // If we're copying the mutations - Collection<Mutation> originalMutations; - Collection<? extends Mutation> mutations; - if (copyMutations) { - originalMutations = null; - mutations = mutationsMap.values(); - } else { - originalMutations = Lists.newArrayListWithExpectedSize(mutationsMap.size()); - mutations = originalMutations; - } - - boolean resetTimeStamp = replayWrite == null; + /** + * This method is only used for local indexes + */ + private Collection<? 
extends Mutation> groupMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp, + long now) throws IOException { + Map<ImmutableBytesPtr, MultiMutation> mutationsMap = new HashMap<>(); + boolean copyMutations = false; + for (int i = 0; i < miniBatchOp.size(); i++) { + if (miniBatchOp.getOperationStatus(i) == IGNORE) { + continue; + } + Mutation m = miniBatchOp.getOperation(i); + if (this.builder.isEnabled(m)) { + // Track whether or not we need to + ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow()); + if (mutationsMap.containsKey(row)) { + copyMutations = true; + } else { + mutationsMap.put(row, null); + } + } + } + // early exit if it turns out we don't have any edits + if (mutationsMap.isEmpty()) { + return null; + } + // If we're copying the mutations + Collection<Mutation> originalMutations; + Collection<? extends Mutation> mutations; + if (copyMutations) { + originalMutations = null; + mutations = mutationsMap.values(); + } else { + originalMutations = Lists.newArrayListWithExpectedSize(mutationsMap.size()); + mutations = originalMutations; + } + for (int i = 0; i < miniBatchOp.size(); i++) { + Mutation m = miniBatchOp.getOperation(i); + // skip this mutation if we aren't enabling indexing + // unfortunately, we really should ask if the raw mutation (rather than the combined mutation) + // should be indexed, which means we need to expose another method on the builder. Such is the + // way optimization go though. + if (miniBatchOp.getOperationStatus(i) != IGNORE && this.builder.isEnabled(m)) { + // Unless we're replaying edits to rebuild the index, we update the time stamp + // of the data table to prevent overlapping time stamps (which prevents index + // inconsistencies as this case isn't handled correctly currently). 
+ for (List<Cell> cells : m.getFamilyCellMap().values()) { + for (Cell cell : cells) { + CellUtil.setTimestamp(cell, now); + } + } + // Only copy mutations if we found duplicate rows + // which only occurs when we're partially rebuilding + // the index (since we'll potentially have both a + // Put and a Delete mutation for the same row). + if (copyMutations) { + // Add the mutation to the batch set + ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow()); + MultiMutation stored = mutationsMap.get(row); + // we haven't seen this row before, so add it + if (stored == null) { + stored = new MultiMutation(row); + mutationsMap.put(row, stored); + } + stored.addAll(m); + } else { + originalMutations.add(m); + } + } + } + if (copyMutations) { + mutations = IndexManagementUtil.flattenMutationsByTimestamp(mutations); + } + return mutations; + } - for (int i = 0; i < miniBatchOp.size(); i++) { - Mutation m = miniBatchOp.getOperation(i); - // skip this mutation if we aren't enabling indexing - // unfortunately, we really should ask if the raw mutation (rather than the combined mutation) - // should be indexed, which means we need to expose another method on the builder. Such is the - // way optimization go though. - if (miniBatchOp.getOperationStatus(i) != IGNORE && this.builder.isEnabled(m)) { - if (resetTimeStamp) { - // Unless we're replaying edits to rebuild the index, we update the time stamp - // of the data table to prevent overlapping time stamps (which prevents index - // inconsistencies as this case isn't handled correctly currently). - for (List<Cell> cells : m.getFamilyCellMap().values()) { - for (Cell cell : cells) { - CellUtil.setTimestamp(cell, now); - } - } - } - // No need to write the table mutations when we're rebuilding - // the index as they're already written and just being replayed. 
- if (replayWrite == ReplayWrite.INDEX_ONLY - || replayWrite == ReplayWrite.REBUILD_INDEX_ONLY) { - miniBatchOp.setOperationStatus(i, NOWRITE); - } + public static void setTimestamp(Mutation m, long ts) throws IOException { + for (List<Cell> cells : m.getFamilyCellMap().values()) { + for (Cell cell : cells) { + CellUtil.setTimestamp(cell, ts); + } + } + } - // Only copy mutations if we found duplicate rows - // which only occurs when we're partially rebuilding - // the index (since we'll potentially have both a - // Put and a Delete mutation for the same row). - if (copyMutations) { - // Add the mutation to the batch set - - ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow()); - MultiMutation stored = mutationsMap.get(row); - // we haven't seen this row before, so add it - if (stored == null) { - stored = new MultiMutation(row); - mutationsMap.put(row, stored); - } - stored.addAll(m); - } else { - originalMutations.add(m); - } - } - } + /** + * This method applies pending delete mutations on the next row states + */ + private void applyPendingDeleteMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp, + BatchMutateContext context) throws IOException { + for (Integer i = 0; i < miniBatchOp.size(); i++) { + if (miniBatchOp.getOperationStatus(i) == IGNORE) { + continue; + } + Mutation m = miniBatchOp.getOperation(i); + if (!this.builder.isEnabled(m)) { + continue; + } + if (!(m instanceof Delete)) { + continue; + } + ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(m.getRow()); + Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr); + Put nextDataRowState = dataRowState.getSecond(); + if (nextDataRowState == null) { + continue; + } + for (List<Cell> cells : m.getFamilyCellMap().values()) { + for (Cell cell : cells) { + switch (KeyValue.Type.codeToType(cell.getTypeByte())) { + case DeleteFamily: + case DeleteFamilyVersion: + nextDataRowState.getFamilyCellMap().remove(CellUtil.cloneFamily(cell)); + break; + case DeleteColumn: + case 
Delete: + removeColumn(nextDataRowState, cell); + } + } + } + if (nextDataRowState != null && nextDataRowState.getFamilyCellMap().size() == 0) { + dataRowState.setSecond(null); + } + } + } - if (copyMutations || replayWrite != null) { - mutations = IndexManagementUtil.flattenMutationsByTimestamp(mutations); - } - return mutations; - } + /** + * This method applies the pending put mutations on the next row states. + * Before this method is called, the next row states are set to the current row states. + */ + private void applyPendingPutMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp, + BatchMutateContext context, long now) throws IOException { + for (Integer i = 0; i < miniBatchOp.size(); i++) { + if (miniBatchOp.getOperationStatus(i) == IGNORE) { + continue; + } + Mutation m = miniBatchOp.getOperation(i); + // skip this mutation if we aren't enabling indexing + if (!this.builder.isEnabled(m)) { + continue; + } + // Unless we're replaying edits to rebuild the index, we update the time stamp + // of the data table to prevent overlapping time stamps (which prevents index + // inconsistencies as this case isn't handled correctly currently). + setTimestamp(m, now); + if (m instanceof Put) { + ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(m.getRow()); + Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr); + if (dataRowState == null) { + dataRowState = new Pair<Put, Put>(null, null); + context.dataRowStates.put(rowKeyPtr, dataRowState); + } + Put nextDataRowState = dataRowState.getSecond(); + dataRowState.setSecond((nextDataRowState != null) ? 
applyNew((Put) m, nextDataRowState) : new Put((Put) m)); + } + } + } - public static void removeEmptyColumn(Mutation m, byte[] emptyCF, byte[] emptyCQ) { - List<Cell> cellList = m.getFamilyCellMap().get(emptyCF); - if (cellList == null) { - return; - } - Iterator<Cell> cellIterator = cellList.iterator(); - while (cellIterator.hasNext()) { - Cell cell = cellIterator.next(); - if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), - emptyCQ, 0, emptyCQ.length) == 0) { - cellIterator.remove(); - return; - } - } - } + /** + * Prepares the current and next states of the data rows + */ + private void prepareDataRowStates(ObserverContext<RegionCoprocessorEnvironment> c, + MiniBatchOperationInProgress<Mutation> miniBatchOp, + BatchMutateContext context, + long now) throws IOException { + if (context.rowsToLock.size() == 0) { + return; + } + // Retrieve the current row states from the data table + getCurrentRowStates(c, context); + applyPendingPutMutations(miniBatchOp, context, now); + applyPendingDeleteMutations(miniBatchOp, context); + } - private void handleLocalIndexUpdates(ObserverContext<RegionCoprocessorEnvironment> c, - MiniBatchOperationInProgress<Mutation> miniBatchOp, - ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> indexUpdates) { - byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName(); - HTableInterfaceReference hTableInterfaceReference = - new HTableInterfaceReference(new ImmutableBytesPtr(tableName)); - List<Pair<Mutation, byte[]>> localIndexUpdates = indexUpdates.removeAll(hTableInterfaceReference); - if (localIndexUpdates == null || localIndexUpdates.isEmpty()) { - return; - } - List<Mutation> localUpdates = new ArrayList<Mutation>(); - Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = localIndexUpdates.iterator(); - while (indexUpdatesItr.hasNext()) { - Pair<Mutation, byte[]> next = indexUpdatesItr.next(); - localUpdates.add(next.getFirst()); - } - if 
(!localUpdates.isEmpty()) { - miniBatchOp.addOperationsFromCP(0, localUpdates.toArray(new Mutation[localUpdates.size()])); - } - } + public static void removeEmptyColumn(Mutation m, byte[] emptyCF, byte[] emptyCQ) { + List<Cell> cellList = m.getFamilyCellMap().get(emptyCF); + if (cellList == null) { + return; + } + Iterator<Cell> cellIterator = cellList.iterator(); + while (cellIterator.hasNext()) { + Cell cell = cellIterator.next(); + if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), + emptyCQ, 0, emptyCQ.length) == 0) { + cellIterator.remove(); + return; + } + } + } - private void prepareIndexMutations( - ObserverContext<RegionCoprocessorEnvironment> c, - MiniBatchOperationInProgress<Mutation> miniBatchOp, - BatchMutateContext context, - Collection<? extends Mutation> mutations, - long now, - PhoenixIndexMetaData indexMetaData) throws Throwable { - List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers(); - // get the current span, or just use a null-span to avoid a bunch of if statements - try (TraceScope scope = Trace.startSpan("Starting to build index updates")) { - Span current = scope.getSpan(); - if (current == null) { - current = NullSpan.INSTANCE; - } - // get the index updates for all elements in this batch - context.indexUpdates = ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create(); - this.builder.getIndexUpdates(context.indexUpdates, miniBatchOp, mutations, indexMetaData); - current.addTimelineAnnotation("Built index updates, doing preStep"); - handleLocalIndexUpdates(c, miniBatchOp, context.indexUpdates); - context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create(); - int updateCount = 0; - for (IndexMaintainer indexMaintainer : maintainers) { - updateCount++; - byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(); - byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier(); - HTableInterfaceReference 
hTableInterfaceReference = - new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName())); - List <Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference); - for (Pair<Mutation, byte[]> update : updates) { - // add the VERIFIED cell, which is the empty cell - Mutation m = update.getFirst(); - if (context.rebuild) { - if (m instanceof Put) { - long ts = getMaxTimestamp(m); - // Remove the empty column prepared by Index codec as we need to change its value - removeEmptyColumn(m, emptyCF, emptyCQ); - ((Put) m).addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES); - } - context.preIndexUpdates.put(hTableInterfaceReference, m); - } else { - if (m instanceof Put) { - // Remove the empty column prepared by Index codec as we need to change its value - removeEmptyColumn(m, emptyCF, emptyCQ); - // Set the status of the index row to "unverified" - ((Put) m).addColumn(emptyCF, emptyCQ, now, UNVERIFIED_BYTES); - // This will be done before the data table row is updated (i.e., in the first write phase) - context.preIndexUpdates.put(hTableInterfaceReference, m); - } - else { - // Set the status of the index row to "unverified" - Put unverifiedPut = new Put(m.getRow()); - unverifiedPut.addColumn(emptyCF, emptyCQ, now, UNVERIFIED_BYTES); - // This will be done before the data table row is updated (i.e., in the first write phase) - context.preIndexUpdates.put(hTableInterfaceReference, unverifiedPut); - } - } - } - } - TracingUtils.addAnnotation(current, "index update count", updateCount); - } - } + /** + * The index update generation for local indexes uses the existing index update generation code (i.e., + * the {@link IndexBuilder} implementation). + */ + private void handleLocalIndexUpdates(ObserverContext<RegionCoprocessorEnvironment> c, + MiniBatchOperationInProgress<Mutation> miniBatchOp, + Collection<? 
extends Mutation> pendingMutations, + PhoenixIndexMetaData indexMetaData) throws Throwable { + ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> indexUpdates = ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create(); + this.builder.getIndexUpdates(indexUpdates, miniBatchOp, pendingMutations, indexMetaData); + byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName(); + HTableInterfaceReference hTableInterfaceReference = + new HTableInterfaceReference(new ImmutableBytesPtr(tableName)); + List<Pair<Mutation, byte[]>> localIndexUpdates = indexUpdates.removeAll(hTableInterfaceReference); + if (localIndexUpdates == null || localIndexUpdates.isEmpty()) { + return; + } + List<Mutation> localUpdates = new ArrayList<Mutation>(); + Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = localIndexUpdates.iterator(); + while (indexUpdatesItr.hasNext()) { + Pair<Mutation, byte[]> next = indexUpdatesItr.next(); + localUpdates.add(next.getFirst()); + } + if (!localUpdates.isEmpty()) { + miniBatchOp.addOperationsFromCP(0, localUpdates.toArray(new Mutation[localUpdates.size()])); + } + } + /** + * Retrieve the last committed data row state. This method is called only for regular data mutations since + * rebuild (i.e., index replay) mutations include all row versions. 
+ */ + private void getCurrentRowStates(ObserverContext<RegionCoprocessorEnvironment> c, + BatchMutateContext context) throws IOException { + Set<KeyRange> keys = new HashSet<KeyRange>(context.rowsToLock.size()); + for (ImmutableBytesPtr rowKeyPtr : context.rowsToLock) { + keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get())); + } + Scan scan = new Scan(); + ScanRanges scanRanges = ScanRanges.createPointLookup(new ArrayList<KeyRange>(keys)); + scanRanges.initializeScan(scan); + SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter(); + scan.setFilter(skipScanFilter); + context.dataRowStates = new HashMap<ImmutableBytesPtr, Pair<Put, Put>>(context.rowsToLock.size()); + try (RegionScanner scanner = c.getEnvironment().getRegion().getScanner(scan)) { + boolean more = true; + while(more) { + List<Cell> cells = new ArrayList<Cell>(); + more = scanner.next(cells); + if (cells.isEmpty()) { + continue; + } + byte[] rowKey = CellUtil.cloneRow(cells.get(0)); + Put put = new Put(rowKey); + for (Cell cell : cells) { + put.add(cell); + } + context.dataRowStates.put(new ImmutableBytesPtr(rowKey), new Pair<Put, Put>(put, new Put(put))); + } + } + } - protected PhoenixIndexMetaData getPhoenixIndexMetaData( - ObserverContext<RegionCoprocessorEnvironment> observerContext, - MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { - IndexMetaData indexMetaData = this.builder.getIndexMetaData(miniBatchOp); - if (!(indexMetaData instanceof PhoenixIndexMetaData)) { - throw new DoNotRetryIOException( - "preBatchMutateWithExceptions: indexMetaData is not an instance of "+PhoenixIndexMetaData.class.getName() + - ", current table is:" + - observerContext.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString()); - } - return (PhoenixIndexMetaData)indexMetaData; - } + /** + * Generate the index update for a data row from the mutations that are obtained by merging the previous data row + * state with the pending row mutation. 
+ */ + private void prepareIndexMutations(BatchMutateContext context, List<IndexMaintainer> maintainers, long ts) + throws IOException { + List<Pair<IndexMaintainer, HTableInterfaceReference>> indexTables = new ArrayList<>(maintainers.size()); + for (IndexMaintainer indexMaintainer : maintainers) { + if (indexMaintainer.isLocalIndex()) { + continue; + } + HTableInterfaceReference hTableInterfaceReference = + new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName())); + indexTables.add(new Pair<>(indexMaintainer, hTableInterfaceReference)); + } + for (Map.Entry<ImmutableBytesPtr, Pair<Put, Put>> entry : context.dataRowStates.entrySet()) { + ImmutableBytesPtr rowKeyPtr = entry.getKey(); + Pair<Put, Put> dataRowState = entry.getValue(); + Put currentDataRowState = dataRowState.getFirst(); + Put nextDataRowState = dataRowState.getSecond(); + if (currentDataRowState == null && nextDataRowState == null) { + continue; + } + for (Pair<IndexMaintainer, HTableInterfaceReference> pair : indexTables) { + IndexMaintainer indexMaintainer = pair.getFirst(); + HTableInterfaceReference hTableInterfaceReference = pair.getSecond(); + if (nextDataRowState != null) { + ValueGetter nextDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(nextDataRowState); + Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE, + nextDataRowVG, rowKeyPtr, ts, null, null); + if (indexPut == null) { + // No covered column. 
Just prepare an index row with the empty column + byte[] indexRowKey = indexMaintainer.buildRowKey(nextDataRowVG, rowKeyPtr, + null, null, HConstants.LATEST_TIMESTAMP); + indexPut = new Put(indexRowKey); + } + indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(), + indexMaintainer.getEmptyKeyValueQualifier(), ts, UNVERIFIED_BYTES); + context.indexUpdates.put(hTableInterfaceReference, + new Pair<Mutation, byte[]>(indexPut, rowKeyPtr.get())); + // Delete the current index row if the new index key is different than the current one + if (currentDataRowState != null) { + ValueGetter currentDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(currentDataRowState); + byte[] indexRowKeyForCurrentDataRow = indexMaintainer.buildRowKey(currentDataRowVG, rowKeyPtr, + null, null, HConstants.LATEST_TIMESTAMP); + if (Bytes.compareTo(indexPut.getRow(), indexRowKeyForCurrentDataRow) != 0) { + Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow, + IndexMaintainer.DeleteType.ALL_VERSIONS, ts); + context.indexUpdates.put(hTableInterfaceReference, + new Pair<Mutation, byte[]>(del, rowKeyPtr.get())); + } + } + } else if (currentDataRowState != null) { + ValueGetter currentDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(currentDataRowState); + byte[] indexRowKeyForCurrentDataRow = indexMaintainer.buildRowKey(currentDataRowVG, rowKeyPtr, + null, null, HConstants.LATEST_TIMESTAMP); + Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow, + IndexMaintainer.DeleteType.ALL_VERSIONS, ts); + context.indexUpdates.put(hTableInterfaceReference, + new Pair<Mutation, byte[]>(del, rowKeyPtr.get())); + } + } + } + } - private void preparePostIndexMutations( - BatchMutateContext context, - long now, - PhoenixIndexMetaData indexMetaData) throws Throwable { - context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create(); - List<IndexMaintainer> maintainers = 
indexMetaData.getIndexMaintainers(); - // Check if we need to skip post index update for any of the rows - for (IndexMaintainer indexMaintainer : maintainers) { - byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(); - byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier(); - HTableInterfaceReference hTableInterfaceReference = - new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName())); - List <Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference); - for (Pair<Mutation, byte[]> update : updates) { - // Are there concurrent updates on the data table row? if so, skip post index updates - // and let read repair resolve conflicts - ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond()); - PendingRow pendingRow = pendingRows.get(rowKey); - if (!pendingRow.isConcurrent()) { - Mutation m = update.getFirst(); - if (m instanceof Put) { - Put verifiedPut = new Put(m.getRow()); - // Set the status of the index row to "verified" - verifiedPut.addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES); - context.postIndexUpdates.put(hTableInterfaceReference, verifiedPut); - } - else { - context.postIndexUpdates.put(hTableInterfaceReference, m); - } + /** + * This method prepares unverified index mutations which are applied to index tables before the data table is + * updated. In the three phase update approach, in phase 1, the status of existing index rows is set to "unverified" + * (these rows will be deleted from the index table in phase 3), and/or new put mutations are added with the + * unverified status. In phase 2, data table mutations are applied. In phase 3, the status for an index table row is + * either set to "verified" or the row is deleted. + */ + private void preparePreIndexMutations(ObserverContext<RegionCoprocessorEnvironment> c, + MiniBatchOperationInProgress<Mutation> miniBatchOp, + BatchMutateContext context, + Collection<? 
extends Mutation> pendingMutations, + long now, + PhoenixIndexMetaData indexMetaData) throws Throwable { + List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers(); + // get the current span, or just use a null-span to avoid a bunch of if statements + try (TraceScope scope = Trace.startSpan("Starting to build index updates")) { + Span current = scope.getSpan(); + if (current == null) { + current = NullSpan.INSTANCE; + } + current.addTimelineAnnotation("Built index updates, doing preStep"); + // Handle local index updates + for (IndexMaintainer indexMaintainer : maintainers) { + if (indexMaintainer.isLocalIndex()) { + handleLocalIndexUpdates(c, miniBatchOp, pendingMutations, indexMetaData); + break; + } + } + // The rest of this method is for handling global index updates + context.indexUpdates = ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create(); + prepareIndexMutations(context, maintainers, now); + + context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create(); + int updateCount = 0; + for (IndexMaintainer indexMaintainer : maintainers) { + updateCount++; + byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(); + byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier(); + HTableInterfaceReference hTableInterfaceReference = + new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName())); + List <Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference); + for (Pair<Mutation, byte[]> update : updates) { + Mutation m = update.getFirst(); + if (m instanceof Put) { + // This will be done before the data table row is updated (i.e., in the first write phase) + context.preIndexUpdates.put(hTableInterfaceReference, m); + } else { + // Set the status of the index row to "unverified" + Put unverifiedPut = new Put(m.getRow()); + unverifiedPut.addColumn(emptyCF, emptyCQ, now, UNVERIFIED_BYTES); + // This will be done before 
the data table row is updated (i.e., in the first write phase) + context.preIndexUpdates.put(hTableInterfaceReference, unverifiedPut); + } + } + } + TracingUtils.addAnnotation(current, "index update count", updateCount); + } + } - } - } - } - // We are done with handling concurrent mutations. So we can remove the rows of this batch from - // the collection of pending rows - removePendingRows(context); - context.indexUpdates.clear(); - } + protected PhoenixIndexMetaData getPhoenixIndexMetaData(ObserverContext<RegionCoprocessorEnvironment> observerContext, + MiniBatchOperationInProgress<Mutation> miniBatchOp) + throws IOException { + IndexMetaData indexMetaData = this.builder.getIndexMetaData(miniBatchOp); + if (!(indexMetaData instanceof PhoenixIndexMetaData)) { + throw new DoNotRetryIOException( + "preBatchMutateWithExceptions: indexMetaData is not an instance of "+PhoenixIndexMetaData.class.getName() + + ", current table is:" + + observerContext.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString()); + } + return (PhoenixIndexMetaData)indexMetaData; + } - public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c, - MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable { - ignoreAtomicOperations(miniBatchOp); - PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c, miniBatchOp); - BatchMutateContext context = new BatchMutateContext(indexMetaData.getClientVersion()); - setBatchMutateContext(c, context); - Mutation firstMutation = miniBatchOp.getOperation(0); - ReplayWrite replayWrite = this.builder.getReplayWrite(firstMutation); - context.rebuild = replayWrite != null; - /* - * Exclusively lock all rows so we get a consistent read - * while determining the index updates - */ - long now; - if (!context.rebuild) { - populateRowsToLock(miniBatchOp, context); - lockRows(context); - now = EnvironmentEdgeManager.currentTimeMillis(); - // Add the table rows in the mini batch to the collection of 
pending rows. This will be used to detect - // concurrent updates - populatePendingRows(context); - } - else { - now = EnvironmentEdgeManager.currentTimeMillis(); - } - // First group all the updates for a single row into a single update to be processed - Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, now, replayWrite); - // early exit if it turns out we don't have any edits - if (mutations == null) { - return; - } + private void preparePostIndexMutations(BatchMutateContext context, long now, PhoenixIndexMetaData indexMetaData) + throws Throwable { + context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create(); + List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers(); + // Check if we need to skip post index update for any of the rows + for (IndexMaintainer indexMaintainer : maintainers) { + byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(); + byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier(); + HTableInterfaceReference hTableInterfaceReference = + new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName())); + List <Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference); + for (Pair<Mutation, byte[]> update : updates) { + // Are there concurrent updates on the data table row? 
if so, skip post index updates + // and let read repair resolve conflicts + ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond()); + PendingRow pendingRow = pendingRows.get(rowKey); + if (!pendingRow.isConcurrent()) { + Mutation m = update.getFirst(); + if (m instanceof Put) { + Put verifiedPut = new Put(m.getRow()); + // Set the status of the index row to "verified" + verifiedPut.addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES); + context.postIndexUpdates.put(hTableInterfaceReference, verifiedPut); + } + else { + context.postIndexUpdates.put(hTableInterfaceReference, m); + } + } + } + } + // We are done with handling concurrent mutations. So we can remove the rows of this batch from + // the collection of pending rows + removePendingRows(context); + context.indexUpdates.clear(); + } - long start = EnvironmentEdgeManager.currentTimeMillis(); - prepareIndexMutations(c, miniBatchOp, context, mutations, now, indexMetaData); - metricSource.updateIndexPrepareTime(EnvironmentEdgeManager.currentTimeMillis() - start); - - // Sleep for one millisecond if we have prepared the index updates in less than 1 ms. The sleep is necessary to - // get different timestamps for concurrent batches that share common rows. 
It is very rare that the index updates - // can be prepared in less than one millisecond - if (!context.rowLocks.isEmpty() && now == EnvironmentEdgeManager.currentTimeMillis()) { - Thread.sleep(1); - LOG.debug("slept 1ms for " + c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString()); - } - // Release the locks before making RPC calls for index updates - for (RowLock rowLock : context.rowLocks) { - rowLock.release(); - } - // Do the first phase index updates - doPre(c, context, miniBatchOp); - if (!context.rebuild) { - // Acquire the locks again before letting the region proceed with data table updates - List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(context.rowLocks.size()); - for (RowLock rowLock : context.rowLocks) { - rowLocks.add(lockManager.lockRow(rowLock.getRowKey(), rowLockWaitDuration)); - } - context.rowLocks.clear(); - context.rowLocks = rowLocks; - preparePostIndexMutations(context, now, indexMetaData); - } - if (failDataTableUpdatesForTesting) { - throw new DoNotRetryIOException("Simulating the data table write failure"); - } - } + /** + * There are at most two rebuild mutation for every row, one put and one delete. They are listed in indexMutations + * next to each other such that put comes before delete by {@link IndexRebuildRegionScanner}. This method is called + * only for global indexes. 
+ */ + private void preBatchMutateWithExceptionsForRebuild(ObserverContext<RegionCoprocessorEnvironment> c, + MiniBatchOperationInProgress<Mutation> miniBatchOp, + BatchMutateContext context, + IndexMaintainer indexMaintainer) throws Throwable { + Put put = null; + List <Mutation> indexMutations = new ArrayList<>(); + for (int i = 0; i < miniBatchOp.size(); i++) { + if (miniBatchOp.getOperationStatus(i) == IGNORE) { + continue; + } + Mutation m = miniBatchOp.getOperation(i); + if (!this.builder.isEnabled(m)) { + continue; + } + if (m instanceof Put) { + if (put != null) { + indexMutations.addAll(prepareIndexMutationsForRebuild(indexMaintainer, put, null)); + } + put = (Put)m; + } else { + indexMutations.addAll(prepareIndexMutationsForRebuild(indexMaintainer, put, (Delete)m)); + put = null; + } + miniBatchOp.setOperationStatus(i, NOWRITE); + } + if (put != null) { + indexMutations.addAll(prepareIndexMutationsForRebuild(indexMaintainer, put, null)); + } + HTableInterfaceReference hTableInterfaceReference = + new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName())); + context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create(); + for (Mutation m : indexMutations) { + context.preIndexUpdates.put(hTableInterfaceReference, m); + } + doPre(c, context, miniBatchOp); + // For rebuild updates, no post index update is prepared. Just create an empty list. 
+ context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create(); + } + public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c, + MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable { + ignoreAtomicOperations(miniBatchOp); + PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c, miniBatchOp); + BatchMutateContext context = new BatchMutateContext(indexMetaData.getClientVersion()); + setBatchMutateContext(c, context); + Mutation firstMutation = miniBatchOp.getOperation(0); + ReplayWrite replayWrite = this.builder.getReplayWrite(firstMutation); + context.rebuild = replayWrite != null; + if (context.rebuild) { + preBatchMutateWithExceptionsForRebuild(c, miniBatchOp, context, indexMetaData.getIndexMaintainers().get(0)); + return; + } + /* + * Exclusively lock all rows so we get a consistent read + * while determining the index updates + */ + long now; + populateRowsToLock(miniBatchOp, context); + lockRows(context); + now = EnvironmentEdgeManager.currentTimeMillis(); + // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect + // concurrent updates + populatePendingRows(context); + // Prepare current and next data rows states for pending mutations (for global indexes) + prepareDataRowStates(c, miniBatchOp, context, now); + // Group all the updates for a single row into a single update to be processed (for local indexes) + Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, now); + // early exit if it turns out we don't have any edits + if (mutations == null || mutations.isEmpty()) { + return; + } + long start = EnvironmentEdgeManager.currentTimeMillis(); + preparePreIndexMutations(c, miniBatchOp, context, mutations, now, indexMetaData); + metricSource.updateIndexPrepareTime(EnvironmentEdgeManager.currentTimeMillis() - start); + // Sleep for one millisecond if we have prepared the index updates in less than 1 ms. 
The sleep is necessary to + // get different timestamps for concurrent batches that share common rows. It is very rare that the index updates + // can be prepared in less than one millisecond + if (!context.rowLocks.isEmpty() && now == EnvironmentEdgeManager.currentTimeMillis()) { + Thread.sleep(1); + LOG.debug("slept 1ms for " + c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString()); + } + // Release the locks before making RPC calls for index updates + for (RowLock rowLock : context.rowLocks) { + rowLock.release(); + } + // Do the first phase index updates + doPre(c, context, miniBatchOp); + // Acquire the locks again before letting the region proceed with data table updates + List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(context.rowLocks.size()); + for (RowLock rowLock : context.rowLocks) { + rowLocks.add(lockManager.lockRow(rowLock.getRowKey(), rowLockWaitDuration)); + } + context.rowLocks.clear(); + context.rowLocks = rowLocks; + preparePostIndexMutations(context, now, indexMetaData); + if (failDataTableUpdatesForTesting) { + throw new DoNotRetryIOException("Simulating the data table write failure"); + } + } private void setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) { this.batchMutateContext.set(context); } - + private BatchMutateContext getBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) { return this.batchMutateContext.get(); } - + private void removeBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) { this.batchMutateContext.remove(); } @@ -822,14 +1053,6 @@ public class IndexRegionObserver extends BaseRegionObserver { } /** - * Exposed for testing! 
- * @return the currently instantiated index builder - */ - public IndexBuilder getBuilderForTesting() { - return this.builder.getBuilderForTesting(); - } - - /** * Enable indexing on the given table * @param desc {@link HTableDescriptor} for the table on which indexing should be enabled * @param builder class to use when building the index for this table diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index dba165b..3723388 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -1593,7 +1593,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { */ private void initCachedState() { byte[] emptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(encodingScheme).getFirst(); - dataEmptyKeyValueRef = new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(), emptyKvQualifier); + dataEmptyKeyValueRef = new ColumnReference(dataEmptyKeyValueCF, emptyKvQualifier); this.allColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size() + coveredColumnsMap.size()); // columns that are required to evaluate all expressions in indexedExpressions (not including columns in data row key) this.indexedColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size());