This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch PHOENIX-5748-4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-5748-4.x-HBase-1.5 by this push:
     new 36d4c7e  PHOENIX-5709 Base changes for simplifying index update generation code for consistent global indexes
36d4c7e is described below

commit 36d4c7ef92f24625a52360d405176d35855fb6e3
Author: Kadir <kozde...@salesforce.com>
AuthorDate: Wed Feb 26 10:19:48 2020 -0800

    PHOENIX-5709 Base changes for simplifying index update generation code for consistent global indexes
---
 .../end2end/ConcurrentMutationsExtendedIT.java     |  35 +-
 .../phoenix/end2end/ConcurrentMutationsIT.java     |   3 +-
 .../org/apache/phoenix/end2end/IndexToolIT.java    |  15 +-
 .../end2end/index/GlobalIndexCheckerIT.java        |  70 ++
 .../phoenix/compile/ServerBuildIndexCompiler.java  |  15 +-
 .../coprocessor/IndexRebuildRegionScanner.java     | 921 +++++++++++++++------
 .../UngroupedAggregateRegionObserver.java          |  17 +-
 .../phoenix/hbase/index/IndexRegionObserver.java   | 855 ++++++++++++-------
 .../org/apache/phoenix/index/IndexMaintainer.java  |   2 +-
 9 files changed, 1350 insertions(+), 583 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index 571961d..546c1fe 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -25,7 +25,9 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.util.Bytes;
+
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.util.*;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -51,6 +53,21 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
 
     private final Object lock = new Object();
 
+    private long verifyIndexTable(String tableName, String indexName, Connection conn) throws Exception {
+        // This checks the state of every raw index row without rebuilding any row
+        IndexToolIT.runIndexTool(true, false, "", tableName, indexName, null,
+                0, IndexTool.IndexVerifyType.ONLY);
+        // This checks the state of an index row after it is repaired
+        long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
+        // We want to check the index rows again as they may be modified by the read repair
+        IndexToolIT.runIndexTool(true, false, "", tableName, indexName, null,
+                0, IndexTool.IndexVerifyType.ONLY);
+        // Now we rebuild the entire index table and expect that it is still good after the full rebuild
+        IndexToolIT.runIndexTool(true, false, "", tableName, indexName, null,
+                0, IndexTool.IndexVerifyType.AFTER);
+        return actualRowCount;
+    }
+
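For reference, the verify modes exercised by this helper behave as follows (a minimal sketch based on the comments in this patch; the table and index names are hypothetical):

    // ONLY: read and verify the existing raw index rows; rebuild nothing
    IndexToolIT.runIndexTool(true, false, "", "MY_TABLE", "MY_INDEX", null,
            0, IndexTool.IndexVerifyType.ONLY);
    // BEFORE: verify first, then rebuild only the rows that fail verification
    IndexToolIT.runIndexTool(true, false, "", "MY_TABLE", "MY_INDEX", null,
            0, IndexTool.IndexVerifyType.BEFORE);
    // AFTER: rebuild the index rows first, then verify the rebuilt rows
    IndexToolIT.runIndexTool(true, false, "", "MY_TABLE", "MY_INDEX", null,
            0, IndexTool.IndexVerifyType.AFTER);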
     @Test
     public void testSynchronousDeletesAndUpsertValues() throws Exception {
         final String tableName = generateUniqueName();
@@ -130,7 +147,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
         t2.start();
 
         doneSignal.await(60, TimeUnit.SECONDS);
-        IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
+        verifyIndexTable(tableName, indexName, conn);
     }
 
     @Test
@@ -191,7 +208,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
         t2.start();
 
         doneSignal.await(60, TimeUnit.SECONDS);
-        IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
+        verifyIndexTable(tableName, indexName, conn);
     }
 
     @Test @Repeat(5)
@@ -204,9 +221,10 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
         final String indexName = generateUniqueName();
         Connection conn = DriverManager.getConnection(getUrl());
         conn.createStatement().execute("CREATE TABLE " + tableName
-                + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, 
CONSTRAINT pk PRIMARY KEY (k1,k2))  COLUMN_ENCODED_BYTES = 0, VERSIONS=1");
+                + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, a.v1 INTEGER, 
b.v2 INTEGER, c.v3 INTEGER, d.v4 INTEGER," +
+                "CONSTRAINT pk PRIMARY KEY (k1,k2))  COLUMN_ENCODED_BYTES = 0, 
VERSIONS=1");
         TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class);
-        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + 
tableName + "(v1)");
+        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + 
tableName + "(v1) INCLUDE(v2, v3)");
         final CountDownLatch doneSignal = new CountDownLatch(nThreads);
         Runnable[] runnables = new Runnable[nThreads];
         for (int i = 0; i < nThreads; i++) {
@@ -216,11 +234,12 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
                     try {
                         Connection conn = DriverManager.getConnection(getUrl());
                         for (int i = 0; i < 10000; i++) {
-                            boolean isNull = RAND.nextBoolean();
-                            int randInt = RAND.nextInt() % nIndexValues;
                             conn.createStatement().execute(
                                     "UPSERT INTO " + tableName + " VALUES (" + 
(i % nRows) + ", 0, "
-                                            + (isNull ? null : randInt) + ")");
+                                            + (RAND.nextBoolean() ? null : 
(RAND.nextInt() % nIndexValues)) + ", "
+                                            + (RAND.nextBoolean() ? null : 
RAND.nextInt()) + ", "
+                                            + (RAND.nextBoolean() ? null : 
RAND.nextInt()) + ", "
+                                            + (RAND.nextBoolean() ? null : 
RAND.nextInt()) + ")");
                             if ((i % batchSize) == 0) {
                                 conn.commit();
                             }
@@ -241,7 +260,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
         }
 
         assertTrue("Ran out of time", doneSignal.await(120, TimeUnit.SECONDS));
-        long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
+        long actualRowCount = verifyIndexTable(tableName, indexName, conn);
         assertEquals(nRows, actualRowCount);
     }
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
index f312df0..6562af0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
@@ -274,7 +274,8 @@ public class ConcurrentMutationsIT extends ParallelStatsDisabledIT {
             EnvironmentEdgeManager.injectEdge(null);
         }
     }
-
+    @Ignore ("It is not possible to assign the same timestamp two separately 
committed mutations in the current model\n" +
+            " except when the server time goes backward. In that case, the 
behavior is not deterministic")
     @Test
     public void testDeleteRowAndUpsertValueAtSameTS1() throws Exception {
         try {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index d3ca67d..30fd018 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -229,8 +229,6 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
             long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
             assertEquals(NROWS, actualRowCount);
-            // Check after compaction
-            TestUtil.doMajorCompaction(conn, dataTableFullName);
             actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
             assertEquals(NROWS, actualRowCount);
             setEveryNthRowWithNull(NROWS, 5, stmt);
@@ -241,7 +239,6 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             conn.commit();
             actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
             assertEquals(NROWS, actualRowCount);
-            TestUtil.doMajorCompaction(conn, dataTableFullName);
             actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
             assertEquals(NROWS, actualRowCount);
             indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null,
@@ -526,7 +523,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
                     null, 0, IndexTool.IndexVerifyType.AFTER);
             assertEquals(1, MutationCountingRegionObserver.getMutationCount());
             MutationCountingRegionObserver.setMutationCount(0);
-            // Since all the rows are in the index table, running the index tool with the "-v BEFORE" option should
+            // Since all the rows are in the index table, running the index tool with the "-v BEFORE" option should not
             // write any index rows
             runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
                     null, 0, IndexTool.IndexVerifyType.BEFORE);
@@ -614,16 +611,8 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             // Run the index tool to populate the index while verifying rows
             runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
                     null, 0, IndexTool.IndexVerifyType.AFTER);
-            // Corrupt one cell by writing directly into the index table
-            conn.createStatement().execute("upsert into " + indexTableFullName + " values ('Phoenix', 1, 'B')");
-            conn.commit();
-            // Run the index tool using the only-verify option to detect this mismatch between the data and index table
             runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
-                    null, -1, IndexTool.IndexVerifyType.ONLY);
-            cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName);
-            expectedValueBytes = Bytes.toBytes("Not matching value for 0:0:CODE E:A A:B");
-            assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
-                    expectedValueBytes, 0, expectedValueBytes.length) == 0);
+                    null, 0, IndexTool.IndexVerifyType.ONLY);
             dropIndexToolTables(conn);
         }
     }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index f9c50fd..35489d0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -117,6 +117,76 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
     }
 
     @Test
+    public void testDelete() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
+            String dml = "DELETE from " + dataTableName + " WHERE id = 'a'";
+            assertEquals(1, conn.createStatement().executeUpdate(dml));
+            conn.commit();
+            String indexTableName = generateUniqueName();
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+                    dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
+            if (async) {
+                // run the index MR job.
+                IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName);
+            }
+            }
+            // Count the number of index rows
+            String query = "SELECT COUNT(*) from " + indexTableName;
+            // There should be one row in the index table
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            // Add rows and check everything is still okay
+            verifyTableHealth(conn, dataTableName, indexTableName);
+        }
+    }
+
+    @Test
+    public void testSimulateConcurrentUpdates() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
+            String indexTableName = generateUniqueName();
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+                    dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
+            if (async) {
+                // run the index MR job.
+                IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName);
+            }
+            // For concurrent updates on the same row, the last write phase is ignored.
+            // Configure IndexRegionObserver to fail the last write phase (i.e., the post index update phase), where the
+            // verify flag is set to true and/or index rows are deleted, and check that this does not impact the
+            // correctness.
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+            // Do multiple updates on the same data row
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val2) values ('a', 'abcc')");
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val1) values ('a', 'aa')");
+            conn.commit();
+            // The expected state of the index table is {('aa', 'a', 'abcc', 'abcd'), ('bc', 'b', 'bcd', 'bcde')}
+            // Do more updates on the same data rows
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val3) values ('a', null, null)");
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val1) values ('a', 'ab')");
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val1) values ('b', 'ab')");
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val2) values ('b', 'ab', null)");
+            conn.commit();
+            // Now the expected state of the index table is {('ab', 'a', 'abcc', null), ('ab', 'b', null, 'bcde')}
+            ResultSet rs = conn.createStatement().executeQuery("SELECT * from " + indexTableName);
+            assertTrue(rs.next());
+            assertEquals("ab", rs.getString(1));
+            assertEquals("a", rs.getString(2));
+            assertEquals("abcc", rs.getString(3));
+            assertEquals(null, rs.getString(4));
+            assertTrue(rs.next());
+            assertEquals("ab", rs.getString(1));
+            assertEquals("b", rs.getString(2));
+            assertEquals(null, rs.getString(3));
+            assertEquals("bcde", rs.getString(4));
+            assertFalse(rs.next());
+        }
+    }
+
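The test above relies on the two-phase index write: index rows are first written unverified, and the post index update phase marks them verified. A minimal sketch of the failure-injection pattern (hypothetical table T; the hook is the one used above, and resetting it afterwards is an assumption, not shown in this test):

    IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
    try {
        // The pre index update phase still runs, so the index row is written
        // but stays unverified; the post update phase that would mark it
        // verified is skipped by the hook.
        conn.createStatement().execute("upsert into T (id, val1) values ('a', 'x')");
        conn.commit();
        // Reads served through the index now go through read repair in
        // GlobalIndexChecker, so query results remain correct.
    } finally {
        IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
    }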
+    @Test
     public void testFailPostIndexDeleteUpdate() throws Exception {
         String dataTableName = generateUniqueName();
         populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
index 4392e23..99caa6e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
@@ -17,27 +17,30 @@
  */
 package org.apache.phoenix.compile;
 
-import java.sql.SQLException;
-import java.util.Collections;
-
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.execute.BaseQueryPlan;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.schema.*;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 
+import java.sql.SQLException;
+import java.util.Collections;
+
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 import static org.apache.phoenix.util.IndexUtil.addEmptyColumnToScan;
 
@@ -96,9 +99,9 @@ public class ServerBuildIndexCompiler {
                 throw new IllegalArgumentException(
                         "ServerBuildIndexCompiler does not support global 
indexes on transactional tables");
             }
+            IndexMaintainer indexMaintainer = 
index.getIndexMaintainer(dataTable, connection);
             // By default, we'd use a FirstKeyOnly filter as nothing else 
needs to be projected for count(*).
             // However, in this case, we need to project all of the data 
columns that contribute to the index.
-            IndexMaintainer indexMaintainer = 
index.getIndexMaintainer(dataTable, connection);
             for (ColumnReference columnRef : indexMaintainer.getAllColumns()) {
                 if (index.getImmutableStorageScheme() == 
PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
                     scan.addFamily(columnRef.getFamily());
@@ -121,6 +124,8 @@ public class ServerBuildIndexCompiler {
                 scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
                 scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
                 ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
+                scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING, TRUE_BYTES);
+                BaseQueryPlan.serializeViewConstantsIntoScan(scan, dataTable);
                 addEmptyColumnToScan(scan, indexMaintainer.getDataEmptyKeyValueCF(), indexMaintainer.getEmptyKeyValueQualifier());
             }
             if (dataTable.isTransactional()) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 598eb5f..a52bddf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -17,8 +17,8 @@
  */
 package org.apache.phoenix.coprocessor;
 
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.UNVERIFIED_BYTES;
 import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
-import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn;
 import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
 import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
 import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
@@ -42,13 +42,15 @@ import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATT
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
+import java.util.NavigableSet;
 import java.util.concurrent.ExecutionException;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -70,6 +72,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.filter.SkipScanFilter;
@@ -85,6 +88,7 @@ import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner;
 import org.apache.phoenix.hbase.index.table.HTableFactory;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.index.GlobalIndexChecker;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
@@ -93,6 +97,7 @@ import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.slf4j.Logger;
@@ -100,7 +105,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 public class IndexRebuildRegionScanner extends BaseRegionScanner {
 
@@ -309,20 +313,14 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         public boolean isVerificationFailed(IndexTool.IndexVerifyType verifyType) {
             if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.NONE) {
                 return false;
-            }
-            if (verifyType == IndexTool.IndexVerifyType.ONLY) {
-                if (before.validIndexRowCount + before.expiredIndexRowCount != scannedDataRowCount) {
+            } else if (verifyType == IndexTool.IndexVerifyType.ONLY) {
+                if (before.invalidIndexRowCount + before.missingIndexRowCount > 0) {
                     return true;
                 }
-            }
-            if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) {
+            } else if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) {
                 if (after.invalidIndexRowCount + after.missingIndexRowCount > 0) {
                     return true;
                 }
-                if (before.validIndexRowCount + before.expiredIndexRowCount +
-                        after.expiredIndexRowCount + after.validIndexRowCount != scannedDataRowCount) {
-                    return true;
-                }
             }
             return false;
         }
@@ -360,8 +358,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
     private Table resultHTable = null;
     private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE;
     private boolean verify = false;
-    private Map<byte[], Put> indexKeyToDataPutMap;
-    private Map<byte[], Put> dataKeyToDataPutMap;
+    private Map<byte[], List<Mutation>> indexKeyToMutationMap;
+    private Map<byte[], Pair<Put, Delete>> dataKeyToMutationMap;
     private TaskRunner pool;
     private TaskBatch<Boolean> tasks;
     private String exceptionMessage;
@@ -371,15 +369,21 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
     private int indexTableTTL;
     private VerificationResult verificationResult;
     private boolean isBeforeRebuilt = true;
-
-    IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
-                               final RegionCoprocessorEnvironment env,
-                               UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver) throws IOException {
+    private boolean partialRebuild = false;
+    private int singleRowRebuildReturnCode;
+    private Map<byte[], NavigableSet<byte[]>> familyMap;
+    private byte[][] viewConstants;
+
+    IndexRebuildRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan,
+                              final RegionCoprocessorEnvironment env,
+                              UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver) throws IOException {
         super(innerScanner);
         final Configuration config = env.getConfiguration();
         if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) {
             pageSizeInRows = config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
                     QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
+        } else {
+            partialRebuild = true;
         }
         maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
         mutations = new UngroupedAggregateRegionObserver.MutationList(maxBatchSize);
@@ -392,31 +396,38 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
             useProto = false;
             indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
         }
-        if (!scan.isRaw()) {
-            // No need to deserialize index maintainers when the scan is raw. Raw scan is used by partial rebuilds
-            List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true);
-            indexMaintainer = maintainers.get(0);
-        }
+        List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true);
+        indexMaintainer = maintainers.get(0);
         this.scan = scan;
+        familyMap = scan.getFamilyMap();
+        if (familyMap.isEmpty()) {
+            familyMap = null;
+        }
+
         this.innerScanner = innerScanner;
         this.region = region;
         this.env = env;
         this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver;
         indexRowKey = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
+        if (indexRowKey != null) {
+            setReturnCodeForSingleRowRebuild();
+            pageSizeInRows = 1;
+        }
         byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
         if (valueBytes != null) {
             verificationResult = new VerificationResult();
             verifyType = IndexTool.IndexVerifyType.fromValue(valueBytes);
             if (verifyType != IndexTool.IndexVerifyType.NONE) {
                 verify = true;
+                viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
                 // Create the following objects only for rebuilds by IndexTool
                 hTableFactory = ServerUtil.getDelegateHTableFactory(env, ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION);
                 indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
                 indexTableTTL = indexHTable.getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
                 outputHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.OUTPUT_TABLE_NAME_BYTES));
                 resultHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.RESULT_TABLE_NAME_BYTES));
-                indexKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
-                dataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+                indexKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+                dataKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
                 pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
                         new ThreadPoolBuilder("IndexVerify",
                                 env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY,
@@ -428,13 +439,38 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         }
     }
 
+    private void setReturnCodeForSingleRowRebuild() throws IOException {
+        try (RegionScanner scanner = region.getScanner(scan)) {
+            List<Cell> row = new ArrayList<>();
+            scanner.next(row);
+            // Check if the data table row we have just scanned matches the index row key.
+            // If not, there is no need to build the index row from this data table row,
+            // and we just return a zero row count.
+            if (row.isEmpty()) {
+                singleRowRebuildReturnCode = GlobalIndexChecker.RebuildReturnCode.NO_DATA_ROW.getValue();
+            } else {
+                Put put = new Put(CellUtil.cloneRow(row.get(0)));
+                for (Cell cell : row) {
+                    put.add(cell);
+                }
+                if (checkIndexRow(indexRowKey, put)) {
+                    singleRowRebuildReturnCode = GlobalIndexChecker.RebuildReturnCode.INDEX_ROW_EXISTS.getValue();
+                } else {
+                    singleRowRebuildReturnCode = GlobalIndexChecker.RebuildReturnCode.NO_INDEX_ROW.getValue();
+                }
+            }
+        }
+    }
+
     @Override
     public HRegionInfo getRegionInfo() {
         return region.getRegionInfo();
     }
 
     @Override
-    public boolean isFilterDone() { return false; }
+    public boolean isFilterDone() {
+        return false;
+    }
 
     private void logToIndexToolResultTable() throws IOException {
         long scanMaxTs = scan.getTimeRange().getMax();
@@ -505,38 +541,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         m.setDurability(Durability.SKIP_WAL);
     }
 
-    private Delete generateDeleteMarkers(Put put) {
-        Set<ColumnReference> allColumns = indexMaintainer.getAllColumns();
-        int cellCount = put.size();
-        if (cellCount == allColumns.size() + 1) {
-            // We have all the columns for the index table. So, no delete marker is needed
-            return null;
-        }
-        Set<ColumnReference> includedColumns = Sets.newLinkedHashSetWithExpectedSize(cellCount);
-        long ts = 0;
-        for (List<Cell> cells : put.getFamilyCellMap().values()) {
-            if (cells == null) {
-                break;
-            }
-            for (Cell cell : cells) {
-                includedColumns.add(new ColumnReference(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell)));
-                if (ts < cell.getTimestamp()) {
-                    ts = cell.getTimestamp();
-                }
-            }
-        }
-        Delete del = null;
-        for (ColumnReference column : allColumns) {
-            if (!includedColumns.contains(column)) {
-                if (del == null) {
-                    del = new Delete(put.getRow());
-                }
-                del.addColumns(column.getFamily(), column.getQualifier(), ts);
-            }
-        }
-        return del;
-    }
-
     private byte[] commitIfReady(byte[] uuidValue, UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException {
         if (ServerUtil.readyToCommit(mutationList.size(), mutationList.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
             ungroupedAggregateRegionObserver.checkForRegionClosing();
@@ -547,12 +551,14 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         return uuidValue;
     }
 
-    private class SimpleValueGetter implements ValueGetter {
+    public static class SimpleValueGetter implements ValueGetter {
         final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
         final Put put;
-        SimpleValueGetter (final Put put) {
+
+        public SimpleValueGetter(final Put put) {
             this.put = put;
         }
+
         @Override
         public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
             List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier());
@@ -595,7 +601,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
     }
 
     private void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
-                                           String errorMsg, byte[] expectedValue,  byte[] actualValue) throws IOException {
+                                           String errorMsg, byte[] expectedValue, byte[] actualValue) throws IOException {
         final byte[] E_VALUE_PREFIX_BYTES = Bytes.toBytes(" E:");
         final byte[] A_VALUE_PREFIX_BYTES = Bytes.toBytes(" A:");
         final int PREFIX_LENGTH = 3;
@@ -643,8 +649,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
             length += PREFIX_LENGTH;
             Bytes.putBytes(errorMessageBytes, length, actualValue, 0, actualValue.length);
 
-        }
-        else {
+        } else {
             errorMessageBytes = Bytes.toBytes(errorMsg);
         }
         put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.ERROR_MESSAGE_BYTES, scanMaxTs, errorMessageBytes);
@@ -656,21 +661,11 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         outputHTable.put(put);
     }
 
-    private long getMaxTimestamp(Result result) {
+    private static long getMaxTimestamp(Mutation m) {
         long ts = 0;
-        for (Cell cell : result.rawCells()) {
-            if (ts < cell.getTimestamp()) {
-                ts = cell.getTimestamp();
-            }
-        }
-        return ts;
-    }
-
-    private long getMaxTimestamp(Put put) {
-        long ts = 0;
-        for (List<Cell> cells : put.getFamilyCellMap().values()) {
+        for (List<Cell> cells : m.getFamilyCellMap().values()) {
             if (cells == null) {
-                break;
+                continue;
             }
             for (Cell cell : cells) {
                 if (ts < cell.getTimestamp()) {
@@ -681,132 +676,383 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         return ts;
     }
 
-    private boolean verifySingleIndexRow(Result indexRow, final Put dataRow) throws IOException {
-        ValueGetter valueGetter = new SimpleValueGetter(dataRow);
-        long ts = getMaxTimestamp(dataRow);
-        Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
-                valueGetter, new ImmutableBytesWritable(dataRow.getRow()), ts, null, null);
-        if (indexPut == null) {
-            // This means the data row does not have any covered column values
-            indexPut = new Put(indexRow.getRow());
+    private static Cell getCell(Mutation m, byte[] family, byte[] qualifier) {
+        List<Cell> cellList = m.getFamilyCellMap().get(family);
+        if (cellList == null) {
+            return null;
         }
-        else {
-            // Remove the empty column prepared by Index codec as we need to change its value
-            removeEmptyColumn(indexPut, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
-                    indexMaintainer.getEmptyKeyValueQualifier());
+        for (Cell cell : cellList) {
+            if (CellUtil.matchingQualifier(cell, qualifier)) {
+                return cell;
+            }
         }
-        // Add the empty column
-        indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
-                indexMaintainer.getEmptyKeyValueQualifier(), ts, VERIFIED_BYTES);
-        int cellCount = 0;
-        long currentTime = EnvironmentEdgeManager.currentTime();
-        for (List<Cell> cells : indexPut.getFamilyCellMap().values()) {
+        return null;
+    }
+
+    private boolean isMatchingMutation(Mutation expected, Mutation actual, int version) throws IOException {
+        if (getTimestamp(expected) != getTimestamp(actual)) {
+            String errorMsg = "Not matching timestamp";
+            byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
+            logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
+                    errorMsg, null, null);
+            return false;
+        }
+        for (List<Cell> cells : expected.getFamilyCellMap().values()) {
             if (cells == null) {
-                break;
+                continue;
             }
             for (Cell expectedCell : cells) {
                 byte[] family = CellUtil.cloneFamily(expectedCell);
                 byte[] qualifier = CellUtil.cloneQualifier(expectedCell);
-                Cell actualCell = indexRow.getColumnLatestCell(family, qualifier);
-                if (actualCell == null) {
-                    // Check if cell expired as per the current server's time and data table ttl
-                    // Index table should have the same ttl as the data table, hence we might not
-                    // get a value back from index if it has already expired between our rebuild and
-                    // verify
-                    // TODO: have a metric to update for these cases
-                    if (isTimestampBeforeTTL(currentTime, expectedCell.getTimestamp())) {
-                        continue;
-                    }
-                    String errorMsg = " Missing cell " + Bytes.toString(family) + ":" +
-                            Bytes.toString(qualifier);
-                    logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
+                Cell actualCell = getCell(actual, family, qualifier);
+                if (actualCell == null ||
+                        !CellUtil.matchingType(expectedCell, actualCell)) {
+                    byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
+                    String errorMsg = "Missing cell (in version " + version + ") " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
+                    logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual), errorMsg);
                     return false;
                 }
-                if (actualCell.getTimestamp() < ts) {
-                    // Skip older cells since a Phoenix index row is composed of cells with the same timestamp
-                    continue;
-                }
-                // Check all columns
                 if (!CellUtil.matchingValue(actualCell, expectedCell)) {
-                    String errorMsg = "Not matching value for " + Bytes.toString(family) + ":" +
-                            Bytes.toString(qualifier);
-                    logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow),
+                    if (Bytes.compareTo(family, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary()) == 0 &&
+                            Bytes.compareTo(qualifier, indexMaintainer.getEmptyKeyValueQualifier()) == 0) {
+                        if (Bytes.compareTo(actualCell.getValueArray(), actualCell.getValueOffset(), actualCell.getValueLength(),
+                                UNVERIFIED_BYTES, 0, UNVERIFIED_BYTES.length) == 0) {
+                            // We will not flag this as a mismatch but will log it
+                            byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
+                            String errorMsg = "Unverified index row (in version " + version + ")";
+                            logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual), errorMsg);
+                            continue;
+                        }
+                    }
+                    String errorMsg = "Not matching value (in version " + version + ") for " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
+                    byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
+                    logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
                             errorMsg, CellUtil.cloneValue(expectedCell), CellUtil.cloneValue(actualCell));
                     return false;
-                } else if (actualCell.getTimestamp() != ts) {
-                    String errorMsg = "Not matching timestamp for " + Bytes.toString(family) + ":" +
-                            Bytes.toString(qualifier) + " E: " + ts + " A: " +
-                            actualCell.getTimestamp();
-                    logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow),
-                            errorMsg, null, null);
+                }
+            }
+        }
+        return true;
+    }
+
+    private boolean isVerified(Put mutation) throws IOException {
+        List<Cell> cellList = mutation.get(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                indexMaintainer.getEmptyKeyValueQualifier());
+        Cell cell = (cellList != null && !cellList.isEmpty()) ? cellList.get(0) : null;
+        if (cell == null) {
+            throw new DoNotRetryIOException("No empty column cell");
+        }
+        if (Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+                VERIFIED_BYTES, 0, VERIFIED_BYTES.length) == 0) {
+            return true;
+        }
+        return false;
+    }
+
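For context, a sketch of the empty-column convention this check reads (emptyCF, emptyCQ, indexRowKey, and ts are placeholders; the family and qualifier come from the IndexMaintainer as in the code above):

    // A row written by the pre index update phase carries UNVERIFIED_BYTES in its
    // empty column; a successful post update phase rewrites it with VERIFIED_BYTES.
    Put unverified = new Put(indexRowKey);
    unverified.addColumn(emptyCF, emptyCQ, ts, UNVERIFIED_BYTES);
    Put verified = new Put(indexRowKey);
    verified.addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES);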
+    /**
+     * Reorders the mutations in descending order by the tuple of timestamp and mutation type, where
+     * delete comes before put.
+     */
+    public static final Comparator<Mutation> MUTATION_TS_DESC_COMPARATOR = new Comparator<Mutation>() {
+        @Override
+        public int compare(Mutation o1, Mutation o2) {
+            long ts1 = getTimestamp(o1);
+            long ts2 = getTimestamp(o2);
+            if (ts1 > ts2) {
+                return -1;
+            }
+            if (ts1 < ts2) {
+                return 1;
+            }
+            if (o1 instanceof Delete && o2 instanceof Put) {
+                return -1;
+            }
+            if (o1 instanceof Put && o2 instanceof Delete) {
+                return 1;
+            }
+            return 0;
+        }
+    };
+
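As a quick illustration of this ordering (a sketch, not part of the patch; the row key, column, and timestamps are made up):

    byte[] row = Bytes.toBytes("k1");
    Put put = new Put(row);
    put.addColumn(Bytes.toBytes("0"), Bytes.toBytes("v"), 100L, Bytes.toBytes("x"));
    Delete del = new Delete(row, 100L);
    List<Mutation> list = Lists.newArrayList(put, del);
    Collections.sort(list, MUTATION_TS_DESC_COMPARATOR);
    // Equal timestamps: the Delete sorts before the Put, so list is [del, put];
    // otherwise the mutation with the newer timestamp comes first.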
+    private boolean isDeleteFamily(Mutation mutation) {
+        for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.DeleteFamily) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    private boolean isDeleteFamilyVersion(Mutation mutation) {
+        for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.DeleteFamilyVersion) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
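The two marker types distinguished by these helpers map to the HBase Delete API roughly as follows (an illustration; row, family, and ts are placeholders):

    Delete d1 = new Delete(row);
    d1.addFamily(family);            // emits a DeleteFamily marker (all versions)
    Delete d2 = new Delete(row);
    d2.addFamilyVersion(family, ts); // emits a DeleteFamilyVersion marker (one timestamp)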
+    private static List<Mutation> prepareActualIndexMutations(Result indexRow) throws IOException {
+        Put put = null;
+        Delete del = null;
+        for (Cell cell : indexRow.rawCells()) {
+            if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                if (put == null) {
+                    put = new Put(CellUtil.cloneRow(cell));
+                }
+                put.add(cell);
+            } else {
+                if (del == null) {
+                    del = new Delete(CellUtil.cloneRow(cell));
+                }
+                del.addDeleteMarker(cell);
+            }
+        }
+        return getMutationsWithSameTS(put, del);
+    }
+
+    /**
+     * indexRow is the set of all cells of all the row versions of an index row from the index table. These are actual
+     * cells. We group these cells based on timestamp and type (put vs delete), and form the actual set of
+     * index mutations. indexKeyToMutationMap is a map from an index row key to a set of mutations that are generated
+     * using the rebuild process (i.e., by replaying raw data table mutations). These sets are the sets of expected
+     * index mutations, one set for each index row key. Since not all mutations in the index table have both phases
+     * (i.e., the pre and post data phase mutations), we cannot compare actual index mutations with expected ones one by one
+     * and expect to find them identical. We need to consider concurrent data mutation effects, data table row write
+     * failures, and post index write failures. Thus, we need to allow some expected and actual mutations to be skipped
+     * while comparing actual mutations to expected mutations.
+     *
+     * The main idea of the verification algorithm used here is to match every expected verified put with an actual
+     * put such that these two mutations are the same, except that the actual mutation can be unverified.
+     *
+     * Some background on why we can skip some of the actual unverified puts and delete markers due to concurrent data
+     * table updates is as follows:
+     *
+     * For each data table mutation, two operations are done on the data table. One is to read the existing row state,
+     * and the second is to write to the data table. The processing of concurrent data mutations is serialized once
+     * for reading the existing row states, and then serialized again for updating the data table. In other words,
+     * they go through locking twice, i.e., [lock, read, unlock] and [lock, write, unlock]. Because of this two-phase
+     * locking, for a pair of concurrent mutations (on the same row), the same row state can be read from the data
+     * table. This means the same existing index row can be made unverified twice with different timestamps, one for
+     * each concurrent mutation. These unverified mutations are then repaired from the data table. Since expected
+     * mutations are used for the rebuild (which is also used by the read repair), skipping the unverified put mutations
+     * that do not match an expected mutation is safe, as they will go through the same process during
+     * read repair and will be skipped and eventually cleaned up by the read repair. We can safely skip the delete markers
+     * too, as they are placed to clean up these unverified mutations. When the data table rows are rebuilt,
+     * the rebuild process generates the delete family markers. The timestamp of a delete marker is the timestamp of
+     * the data table mutation for which the delete marker was added. Thus, the timestamp of these delete markers will be
+     * higher than the timestamp of the index row to be deleted.
+     */
+    private boolean verifySingleIndexRow(Result indexRow, VerificationResult.PhaseResult verificationPhaseResult)
+            throws IOException {
+        List<Mutation> expectedMutationList = indexKeyToMutationMap.get(indexRow.getRow());
+        if (expectedMutationList == null) {
+            throw new DoNotRetryIOException("No expected mutation");
+        }
+        List<Mutation> actualMutationList = prepareActualIndexMutations(indexRow);
+        if (actualMutationList == null || actualMutationList.isEmpty()) {
+            throw new DoNotRetryIOException("actualMutationList is null or empty");
+        }
+        Collections.sort(expectedMutationList, MUTATION_TS_DESC_COMPARATOR);
+        Collections.sort(actualMutationList, MUTATION_TS_DESC_COMPARATOR);
+        long currentTime = EnvironmentEdgeManager.currentTime();
+        int actualIndex = 0;
+        int expectedIndex = 0;
+        int matchingCount = 0;
+        int expectedSize = expectedMutationList.size();
+        int actualSize = actualMutationList.size();
+        Mutation expected = null;
+        Mutation previousExpected;
+        Mutation actual;
+        while (expectedIndex < expectedSize && actualIndex < actualSize) {
+            previousExpected = expected;
+            expected = expectedMutationList.get(expectedIndex);
+            // Check if cell expired as per the current server's time and data table ttl
+            // Index table should have the same ttl as the data table, hence we might not
+            // get a value back from index if it has already expired between our rebuild and
+            // verify
+            // TODO: have a metric to update for these cases
+            if (isTimestampBeforeTTL(currentTime, getTimestamp(expected))) {
+                verificationPhaseResult.expiredIndexRowCount++;
+                return true;
+            }
+            actual = actualMutationList.get(actualIndex);
+            if (expected instanceof Put) {
+                if (previousExpected == null || previousExpected instanceof Put) {
+                    // This expected put is either the first mutation or a put that comes just after another expected put
+                    // on the expected mutation list, which is sorted by the mutation timestamps. The cell timestamps
+                    // within each mutation here are the same.
+                    // Go down the list of actual mutations and find the corresponding actual put mutation with the same
+                    // timestamp. Stop if a verified put or delete family mutation is encountered on the way. Skip
+                    // unverified puts or delete family version delete markers.
+                    while (getTimestamp(actual) > getTimestamp(expected) &&
+                            ((actual instanceof Put && !isVerified((Put) actual)) ||
+                                    (actual instanceof Delete && isDeleteFamilyVersion(actual)))) {
+                        actualIndex++;
+                        if (actualIndex == actualSize) {
+                            break;
+                        }
+                        actual = actualMutationList.get(actualIndex);
+                    }
+                } else { // previousExpected instanceof Delete
+                    // Between an expected delete and an expected put, there cannot be any type of mutation, not even a verified put
+                    while (getTimestamp(actual) > getTimestamp(expected)) {
+                        actualIndex++;
+                        if (actualIndex == actualSize) {
+                            break;
+                        }
+                        actual = actualMutationList.get(actualIndex);
+                    }
+                }
+                if (actualIndex == actualSize) {
+                    break;
+                }
+                // Now the expected and actual mutations should match
+                if (isMatchingMutation(expected, actual, expectedIndex)) {
+                    expectedIndex++;
+                    actualIndex++;
+                    matchingCount++;
+                    continue;
+                }
+                verificationPhaseResult.invalidIndexRowCount++;
+                return false;
+            } else { // expected instanceof Delete
+                // Between put and delete, delete and delete, or before the first delete, there can be other deletes and
+                // unverified puts. Skip all of them, if any
+                while (getTimestamp(actual) > getTimestamp(expected) &&
+                        ((actual instanceof Put && !isVerified((Put) actual)) || actual instanceof Delete)) {
+                    actualIndex++;
+                    if (actualIndex == actualSize) {
+                        break;
+                    }
+                    actual = actualMutationList.get(actualIndex);
+                }
+                if (actualIndex == actualSize) {
+                    break;
+                }
+                // If the first expected mutation is a delete, there should be an actual delete mutation with the
+                // same timestamp or an unverified put with the same or older timestamp
+                if (getTimestamp(actual) == getTimestamp(expected) &&
+                        (actual instanceof Delete && isDeleteFamily(actual))) {
+                    expectedIndex++;
+                    actualIndex++;
+                    matchingCount++;
+                    continue;
+                } else if (getTimestamp(actual) <= getTimestamp(expected) &&
+                        (actual instanceof Put && !isVerified((Put) actual))) {
+                    expectedIndex++;
+                    if (previousExpected == null) {
+                        matchingCount++;
+                    }
+                    continue;
+                }
+                if (previousExpected == null) {
+                    String errorMsg = "First delete check failure";
+                    byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
+                    logToIndexToolOutputTable(dataKey, indexRow.getRow(),
+                            getTimestamp(expected),
+                            getTimestamp(actual), errorMsg);
+                    verificationPhaseResult.invalidIndexRowCount++;
                     return false;
                 }
-                cellCount++;
             }
         }
-        if (cellCount != indexRow.rawCells().length) {
-            String errorMsg = "Expected to find " + cellCount + " cells but got "
-                    + indexRow.rawCells().length + " cells";
-            logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
-            return false;
+        if ((expectedIndex != expectedSize) || actualIndex != actualSize) {
+            if (matchingCount > 0) {
+                // We do not consider this as a verification issue but log it for further information.
+                // This may happen due to compaction
+                byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
+                String errorMsg = "Expected to find " + expectedMutationList.size() + " mutations but got "
+                        + actualMutationList.size();
+                logToIndexToolOutputTable(dataKey, indexRow.getRow(),
+                        getTimestamp(expectedMutationList.get(0)),
+                        getTimestamp(actualMutationList.get(0)), errorMsg);
+            } else {
+                byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
+                String errorMsg = "Not matching index row";
+                logToIndexToolOutputTable(dataKey, indexRow.getRow(),
+                        getTimestamp(expectedMutationList.get(0)),
+                        getTimestamp(actualMutationList.get(0)), errorMsg);
+                verificationPhaseResult.invalidIndexRowCount++;
+                return false;
+            }
         }
+        verificationPhaseResult.validIndexRowCount++;
         return true;
     }
 
-    private void verifyIndexRows(List<KeyRange> keys, Map<byte[], Put> 
perTaskDataKeyToDataPutMap,
+    private static long getMaxTimestamp(Pair<Put, Delete> pair) {
+        Put put = pair.getFirst();
+        long ts1 = 0;
+        if (put != null) {
+            ts1 = getMaxTimestamp((Mutation)put);
+        }
+        Delete del = pair.getSecond();
+        long ts2 = 0;
+        if (del != null) {
+            ts2 = getMaxTimestamp((Mutation)del);
+        }
+        return (ts1 > ts2) ? ts1 : ts2;
+    }
+
+    private void verifyIndexRows(List<KeyRange> keys,
                                  VerificationResult.PhaseResult 
verificationPhaseResult) throws IOException {
-        int expectedRowCount = keys.size();
+        List<KeyRange> invalidKeys = new ArrayList<>();
         ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
         Scan indexScan = new Scan();
         indexScan.setTimeRange(scan.getTimeRange().getMin(), 
scan.getTimeRange().getMax());
         scanRanges.initializeScan(indexScan);
         SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
-        indexScan.setFilter(skipScanFilter);
-        int rowCount = 0;
+        indexScan.setFilter(new SkipScanFilter(skipScanFilter, true));
+        indexScan.setRaw(true);
+        indexScan.setMaxVersions();
         try (ResultScanner resultScanner = indexHTable.getScanner(indexScan)) {
             for (Result result = resultScanner.next(); (result != null); 
result = resultScanner.next()) {
-                Put dataPut = indexKeyToDataPutMap.get(result.getRow());
-                if (dataPut == null) {
-                    // This should never happen
-                    String errorMsg = "Missing data row";
-                    logToIndexToolOutputTable(null, result.getRow(), 0, 
getMaxTimestamp(result), errorMsg);
-                    exceptionMessage = "Index verify failed - Missing data row 
- " + indexHTable.getName();
-                    throw new IOException(exceptionMessage);
+                KeyRange keyRange = 
PVarbinary.INSTANCE.getKeyRange(result.getRow());
+                if (!keys.contains(keyRange)) {
+                    continue;
                 }
-                if (verifySingleIndexRow(result, dataPut)) {
-                    verificationPhaseResult.validIndexRowCount++;
-                    perTaskDataKeyToDataPutMap.remove(dataPut.getRow());
-                } else {
-                    verificationPhaseResult.invalidIndexRowCount++;
+                if (!verifySingleIndexRow(result, verificationPhaseResult)) {
+                    invalidKeys.add(keyRange);
                 }
-                rowCount++;
+                keys.remove(keyRange);
             }
         } catch (Throwable t) {
             ServerUtil.throwIOException(indexHTable.getName().toString(), t);
         }
         // Check if any expected rows from index(which we didn't get) are 
already expired due to TTL
         // TODO: metrics for expired rows
-        if (!perTaskDataKeyToDataPutMap.isEmpty()) {
-            Iterator<Entry<byte[], Put>> itr = 
perTaskDataKeyToDataPutMap.entrySet().iterator();
+        if (!keys.isEmpty()) {
+            Iterator<KeyRange> itr = keys.iterator();
             long currentTime = EnvironmentEdgeManager.currentTime();
             while(itr.hasNext()) {
-                Entry<byte[], Put> entry = itr.next();
-                long ts = getMaxTimestamp(entry.getValue());
-                if (isTimestampBeforeTTL(currentTime, ts)) {
+                KeyRange keyRange = itr.next();
+                byte[] key = keyRange.getLowerRange();
+                List<Mutation> mutationList = indexKeyToMutationMap.get(key);
+                if (isTimestampBeforeTTL(currentTime, 
getTimestamp(mutationList.get(mutationList.size() - 1)))) {
                     itr.remove();
-                    rowCount++;
                     verificationPhaseResult.expiredIndexRowCount++;
                 }
             }
         }
-        if (rowCount != expectedRowCount) {
-            for (Map.Entry<byte[], Put> entry : 
perTaskDataKeyToDataPutMap.entrySet()) {
+        if (keys.size() > 0) {
+            for (KeyRange keyRange : keys) {
                 String errorMsg = "Missing index row";
-                logToIndexToolOutputTable(entry.getKey(), null, 
getMaxTimestamp(entry.getValue()),
-                        0, errorMsg);
+                byte[] key = keyRange.getLowerRange();
+                List<Mutation> mutationList = indexKeyToMutationMap.get(key);
+                byte[] dataKey = indexMaintainer.buildDataRowKey(new 
ImmutableBytesWritable(keyRange.getLowerRange()), viewConstants);
+                logToIndexToolOutputTable(dataKey,
+                        keyRange.getLowerRange(),
+                        getMaxTimestamp(dataKeyToMutationMap.get(dataKey)),
+                        getTimestamp(mutationList.get(mutationList.size() - 
1)), errorMsg);
             }
-            verificationPhaseResult.missingIndexRowCount += expectedRowCount - 
rowCount;
+            verificationPhaseResult.missingIndexRowCount += keys.size();
         }
+        keys.addAll(invalidKeys);
     }
 
     private boolean isTimestampBeforeTTL(long currentTime, long tsToCheck) {
@@ -816,7 +1062,7 @@ public class IndexRebuildRegionScanner extends 
BaseRegionScanner {
         return tsToCheck < (currentTime - (long) indexTableTTL * 1000);
     }
 
-    private void addVerifyTask(final List<KeyRange> keys, final Map<byte[], 
Put> perTaskDataKeyToDataPutMap,
+    private void addVerifyTask(final List<KeyRange> keys,
                                final VerificationResult.PhaseResult 
verificationPhaseResult) {
         tasks.add(new Task<Boolean>() {
             @Override
@@ -826,7 +1072,7 @@ public class IndexRebuildRegionScanner extends 
BaseRegionScanner {
                         exceptionMessage = "Pool closed, not attempting to 
verify index rows! " + indexHTable.getName();
                         throw new IOException(exceptionMessage);
                     }
-                    verifyIndexRows(keys, perTaskDataKeyToDataPutMap, 
verificationPhaseResult);
+                    verifyIndexRows(keys, verificationPhaseResult);
                 } catch (Exception e) {
                     throw e;
                 }
@@ -836,32 +1082,26 @@ public class IndexRebuildRegionScanner extends 
BaseRegionScanner {
     }
 
     private void parallelizeIndexVerify(VerificationResult.PhaseResult 
verificationPhaseResult) throws IOException {
-        for (Mutation mutation : mutations) {
-            indexKeyToDataPutMap.put(getIndexRowKey((Put)mutation), 
(Put)mutation);
-        }
-        int taskCount = (indexKeyToDataPutMap.size() + rowCountPerTask - 1) / 
rowCountPerTask;
+        int taskCount = (indexKeyToMutationMap.size() + rowCountPerTask - 1) / 
rowCountPerTask;
         tasks = new TaskBatch<>(taskCount);
-        List<Map<byte[], Put>> dataPutMapList = new ArrayList<>(taskCount);
+        List<List<KeyRange>> listOfKeyRangeList = new ArrayList<>(taskCount);
         List<VerificationResult.PhaseResult> verificationPhaseResultList = new 
ArrayList<>(taskCount);
         List<KeyRange> keys = new ArrayList<>(rowCountPerTask);
-        Map<byte[], Put> perTaskDataKeyToDataPutMap = 
Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
-        dataPutMapList.add(perTaskDataKeyToDataPutMap);
+        listOfKeyRangeList.add(keys);
         VerificationResult.PhaseResult perTaskVerificationPhaseResult = new 
VerificationResult.PhaseResult();
         verificationPhaseResultList.add(perTaskVerificationPhaseResult);
-        for (Map.Entry<byte[], Put> entry: indexKeyToDataPutMap.entrySet()) {
-            keys.add(PVarbinary.INSTANCE.getKeyRange(entry.getKey()));
-            perTaskDataKeyToDataPutMap.put(entry.getValue().getRow(), 
entry.getValue());
+        for (byte[] indexKey: indexKeyToMutationMap.keySet()) {
+            keys.add(PVarbinary.INSTANCE.getKeyRange(indexKey));
             if (keys.size() == rowCountPerTask) {
-                addVerifyTask(keys, perTaskDataKeyToDataPutMap, 
perTaskVerificationPhaseResult);
+                addVerifyTask(keys, perTaskVerificationPhaseResult);
                 keys = new ArrayList<>(rowCountPerTask);
-                perTaskDataKeyToDataPutMap = 
Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
-                dataPutMapList.add(perTaskDataKeyToDataPutMap);
+                listOfKeyRangeList.add(keys);
                 perTaskVerificationPhaseResult = new 
VerificationResult.PhaseResult();
                 
verificationPhaseResultList.add(perTaskVerificationPhaseResult);
             }
         }
         if (keys.size() > 0) {
-            addVerifyTask(keys, perTaskDataKeyToDataPutMap, 
perTaskVerificationPhaseResult);
+            addVerifyTask(keys, perTaskVerificationPhaseResult);
         }
         List<Boolean> taskResultList = null;
         try {
@@ -878,30 +1118,41 @@ public class IndexRebuildRegionScanner extends 
BaseRegionScanner {
                 throw new IOException(exceptionMessage);
             }
         }
-        if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == 
IndexTool.IndexVerifyType.BOTH) {
-            for (Map<byte[], Put> dataPutMap : dataPutMapList) {
-                dataKeyToDataPutMap.putAll(dataPutMap);
-            }
-        }
         for (VerificationResult.PhaseResult result : 
verificationPhaseResultList) {
             verificationPhaseResult.add(result);
         }
+        if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == 
IndexTool.IndexVerifyType.BOTH) {
+            Map<byte[], Pair<Put, Delete>> newDataKeyToMutationMap = 
Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+            for (List<KeyRange> keyRangeList : listOfKeyRangeList) {
+                for (KeyRange keyRange : keyRangeList) {
+                    byte[] dataKey = indexMaintainer.buildDataRowKey(new 
ImmutableBytesWritable(keyRange.getLowerRange()), viewConstants);
+                    newDataKeyToMutationMap.put(dataKey, 
dataKeyToMutationMap.get(dataKey));
+                }
+            }
+            dataKeyToMutationMap.clear();
+            dataKeyToMutationMap = newDataKeyToMutationMap;
+        }
     }
 
     private void 
rebuildIndexRows(UngroupedAggregateRegionObserver.MutationList mutationList) 
throws IOException {
         byte[] uuidValue = ServerCacheClient.generateId();
         UngroupedAggregateRegionObserver.MutationList currentMutationList =
                 new 
UngroupedAggregateRegionObserver.MutationList(maxBatchSize);
+        Put put = null;
         for (Mutation mutation : mutationList) {
-            Put put = (Put) mutation;
-            currentMutationList.add(mutation);
-            setMutationAttributes(put, uuidValue);
-            uuidValue = commitIfReady(uuidValue, currentMutationList);
-            Delete deleteMarkers = generateDeleteMarkers(put);
-            if (deleteMarkers != null) {
-                setMutationAttributes(deleteMarkers, uuidValue);
-                currentMutationList.add(deleteMarkers);
+            if (mutation instanceof Put) {
+                if (put != null) {
+                    // Back-to-back puts, i.e., no delete in between; we can commit the previous put
+                    uuidValue = commitIfReady(uuidValue, currentMutationList);
+                }
+                currentMutationList.add(mutation);
+                setMutationAttributes(mutation, uuidValue);
+                put = (Put)mutation;
+            } else {
+                currentMutationList.add(mutation);
+                setMutationAttributes(mutation, uuidValue);
                 uuidValue = commitIfReady(uuidValue, currentMutationList);
+                put = null;
             }
         }
         if (!currentMutationList.isEmpty()) {
@@ -912,11 +1163,11 @@ public class IndexRebuildRegionScanner extends 
BaseRegionScanner {
 
     private void verifyAndOrRebuildIndex() throws IOException {
         VerificationResult nextVerificationResult = new VerificationResult();
-        nextVerificationResult.scannedDataRowCount = mutations.size();
+        nextVerificationResult.scannedDataRowCount = 
dataKeyToMutationMap.size();
         if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == 
IndexTool.IndexVerifyType.NONE) {
             // For these options we start with rebuilding index rows
             rebuildIndexRows(mutations);
-            nextVerificationResult.rebuiltIndexRowCount = mutations.size();
+            nextVerificationResult.rebuiltIndexRowCount = 
dataKeyToMutationMap.size();
             isBeforeRebuilt = false;
         }
         if (verifyType == IndexTool.IndexVerifyType.NONE) {
@@ -928,73 +1179,285 @@ public class IndexRebuildRegionScanner extends 
BaseRegionScanner {
             // For these options we start with verifying index rows
             parallelizeIndexVerify(verificationPhaseResult);
             nextVerificationResult.before.add(verificationPhaseResult);
-            if (mutations.size() != verificationPhaseResult.getTotalCount()) {
-                throw new DoNotRetryIOException(
-                        "mutations.size() != 
verificationPhaseResult.getTotalCount() at the before phase " +
-                                nextVerificationResult + " 
dataKeyToDataPutMap.size() = " + dataKeyToDataPutMap.size());
-            }
         }
         if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == 
IndexTool.IndexVerifyType.BOTH) {
             // For these options, we have identified the rows to be rebuilt 
and now need to rebuild them
             // At this point, dataKeyToDataPutMap includes mapping only for 
the rows to be rebuilt
             mutations.clear();
-            for (Map.Entry<byte[], Put> entry: dataKeyToDataPutMap.entrySet()) 
{
-                mutations.add(entry.getValue());
+
+            for (Map.Entry<byte[], Pair<Put, Delete>> entry: 
dataKeyToMutationMap.entrySet()) {
+                if (entry.getValue().getFirst() != null) {
+                    mutations.add(entry.getValue().getFirst());
+                }
+                if (entry.getValue().getSecond() != null) {
+                    mutations.add(entry.getValue().getSecond());
+                }
             }
             rebuildIndexRows(mutations);
-            nextVerificationResult.rebuiltIndexRowCount += mutations.size();
+            nextVerificationResult.rebuiltIndexRowCount += 
dataKeyToMutationMap.size();
             isBeforeRebuilt = false;
         }
 
         if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == 
IndexTool.IndexVerifyType.BOTH) {
             // We have rebuilt index row and now we need to verify them
-            indexKeyToDataPutMap.clear();
             VerificationResult.PhaseResult verificationPhaseResult = new 
VerificationResult.PhaseResult();
+            indexKeyToMutationMap.clear();
+            for (Map.Entry<byte[], Pair<Put, Delete>> entry: 
dataKeyToMutationMap.entrySet()) {
+                prepareIndexMutations(entry.getValue().getFirst(), 
entry.getValue().getSecond());
+            }
             parallelizeIndexVerify(verificationPhaseResult);
             nextVerificationResult.after.add(verificationPhaseResult);
-            if (mutations.size() != verificationPhaseResult.getTotalCount()) {
-                throw new DoNotRetryIOException(
-                        "mutations.size() != 
verificationPhaseResult.getTotalCount() at the after phase " +
-                                nextVerificationResult + " 
dataKeyToDataPutMap.size() = " + dataKeyToDataPutMap.size());
-            }
         }
-        indexKeyToDataPutMap.clear();
         verificationResult.add(nextVerificationResult);
     }
 
+    private boolean isColumnIncluded(Cell cell) {
+        byte[] family = CellUtil.cloneFamily(cell);
+        if (!familyMap.containsKey(family)) {
+            return false;
+        }
+        NavigableSet<byte[]> set = familyMap.get(family);
+        if (set == null || set.isEmpty()) {
+            return true;
+        }
+        byte[] qualifier = CellUtil.cloneQualifier(cell);
+        return set.contains(qualifier);
+    }
+
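+    // Returns the timestamp of the first cell; the mutations handled here are flattened by timestamp,
+    // so all cells in a mutation are expected to share the same timestamp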
+    public static long getTimestamp(Mutation m) {
+        for (List<Cell> cells : m.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                return cell.getTimestamp();
+            }
+        }
+        throw new IllegalStateException("No cell found");
+    }
+
+    /**
+     * Reorders mutations in ascending order by the tuple of (timestamp, mutation type), where a
+     * delete comes before a put with the same timestamp
+     */
+    public static final Comparator<Mutation> MUTATION_TS_COMPARATOR = new Comparator<Mutation>() {
+        @Override
+        public int compare(Mutation o1, Mutation o2) {
+            long ts1 = getTimestamp(o1);
+            long ts2 = getTimestamp(o2);
+            if (ts1 < ts2) {
+                return -1;
+            }
+            if (ts1 > ts2) {
+                return 1;
+            }
+            if (o1 instanceof Delete && o2 instanceof Put) {
+                return -1;
+            }
+            if (o1 instanceof Put && o2 instanceof Delete) {
+                return 1;
+            }
+            return 0;
+        }
+    };
+
+    public static List<Mutation> getMutationsWithSameTS(Put put, Delete del) {
+        List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(2);
+        if (put != null) {
+            mutationList.add(put);
+        }
+        if (del != null) {
+            mutationList.add(del);
+        }
+        // Group the cells within a mutation by timestamp and create a separate mutation for each group
+        mutationList = (List<Mutation>) IndexManagementUtil.flattenMutationsByTimestamp(mutationList);
+        // Reorder the mutations on the same row so that a delete comes before a put with the same timestamp
+        Collections.sort(mutationList, MUTATION_TS_COMPARATOR);
+        return mutationList;
+    }
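
A minimal sketch of the ordering contract above, using plain HBase client types (the family and
qualifier names here are illustrative, not from this patch):

    // Sketch: a put carrying cells at ts=10 and ts=20, plus a delete family marker at ts=10
    Put put = new Put(Bytes.toBytes("k"));
    put.addColumn(Bytes.toBytes("0"), Bytes.toBytes("a"), 10L, Bytes.toBytes("v1"));
    put.addColumn(Bytes.toBytes("0"), Bytes.toBytes("b"), 20L, Bytes.toBytes("v2"));
    Delete del = new Delete(Bytes.toBytes("k"));
    del.addFamily(Bytes.toBytes("0"), 10L);
    // getMutationsWithSameTS splits the put into one put per timestamp and then sorts,
    // yielding: Delete@10, Put@10 (cell a), Put@20 (cell b)
    List<Mutation> ordered = getMutationsWithSameTS(put, del);
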
+
+    private static Put prepareIndexPutForRebuild(IndexMaintainer indexMaintainer, ImmutableBytesPtr rowKeyPtr,
+                                                 ValueGetter mergedRowVG, long ts)
+            throws IOException {
+        Put indexPut = 
indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+                mergedRowVG, rowKeyPtr, ts, null, null);
+        if (indexPut == null) {
+            // No covered column. Just prepare an index row with the empty 
column
+            byte[] indexRowKey = indexMaintainer.buildRowKey(mergedRowVG, 
rowKeyPtr,
+                    null, null, HConstants.LATEST_TIMESTAMP);
+            indexPut = new Put(indexRowKey);
+        }
+        
indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                indexMaintainer.getEmptyKeyValueQualifier(), ts, 
VERIFIED_BYTES);
+        return indexPut;
+    }
+
+    public static void removeColumn(Put put, Cell deleteCell) {
+        byte[] family = CellUtil.cloneFamily(deleteCell);
+        List<Cell> cellList = put.getFamilyCellMap().get(family);
+        if (cellList == null) {
+            return;
+        }
+        Iterator<Cell> cellIterator = cellList.iterator();
+        while (cellIterator.hasNext()) {
+            Cell cell = cellIterator.next();
+            if (CellUtil.matchingQualifier(cell, deleteCell)) {
+                cellIterator.remove();
+                if (cellList.isEmpty()) {
+                    put.getFamilyCellMap().remove(family);
+                }
+                return;
+            }
+        }
+    }
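
A minimal sketch of removeColumn, again with illustrative names: a DeleteColumn marker strips the
matching qualifier from a row state, and the family entry is dropped once it becomes empty:

    Put state = new Put(Bytes.toBytes("k"));
    state.addColumn(Bytes.toBytes("0"), Bytes.toBytes("a"), 10L, Bytes.toBytes("v"));
    Delete del = new Delete(Bytes.toBytes("k"));
    del.addColumns(Bytes.toBytes("0"), Bytes.toBytes("a"));  // DeleteColumn marker for qualifier "a"
    for (List<Cell> cells : del.getFamilyCellMap().values()) {
        for (Cell cell : cells) {
            removeColumn(state, cell);  // removes "a"; family "0" is dropped since it is now empty
        }
    }
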
+
+    public static void apply(Put destination, Put source) throws IOException {
+        for (List<Cell> cells : source.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                if (!destination.has(CellUtil.cloneFamily(cell), 
CellUtil.cloneQualifier(cell))) {
+                    destination.add(cell);
+                }
+            }
+        }
+    }
+
+    public static Put applyNew(Put destination, Put source) throws IOException 
{
+        Put next = new Put(destination);
+        apply(next, source);
+        return next;
+    }
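
The merge gives precedence to the destination: a cell already present in the destination is kept,
and the source only fills in columns the destination lacks. A minimal sketch with illustrative values:

    Put newer = new Put(Bytes.toBytes("k"));
    newer.addColumn(Bytes.toBytes("0"), Bytes.toBytes("a"), 20L, Bytes.toBytes("new"));
    Put older = new Put(Bytes.toBytes("k"));
    older.addColumn(Bytes.toBytes("0"), Bytes.toBytes("a"), 10L, Bytes.toBytes("old"));
    older.addColumn(Bytes.toBytes("0"), Bytes.toBytes("b"), 10L, Bytes.toBytes("kept"));
    // merged keeps a="new" (destination wins) and gains b="kept" from the older state
    Put merged = applyNew(newer, older);
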
+
+    /**
+     * Generates the index updates for a data row from the mutations that are obtained by merging the previous
+     * data row state with the pending row mutations for index rebuild. This method is called only for global
+     * indexes. The merged mutations are replayed in ascending order by the tuple of (timestamp, mutation type),
+     * where a delete comes before a put with the same timestamp.
+     */
+    public static List<Mutation> prepareIndexMutationsForRebuild(IndexMaintainer indexMaintainer,
+                                                                 Put dataPut, Delete dataDel) throws IOException {
+        List<Mutation> dataMutations = getMutationsWithSameTS(dataPut, 
dataDel);
+        List<Mutation> indexMutations = 
Lists.newArrayListWithExpectedSize(dataMutations.size());
+        // The row key ptr of the data table row for which we will build index 
rows here
+        ImmutableBytesPtr rowKeyPtr = (dataPut != null) ? new 
ImmutableBytesPtr(dataPut.getRow()) :
+                new ImmutableBytesPtr(dataDel.getRow());
+        // Start with empty data table row
+        Put currentDataRow = null;
+        // The index row key corresponding to the current data row
+        byte[] indexRowKeyForCurrentDataRow = null;
+        for (Mutation mutation : dataMutations) {
+            long ts = getTimestamp(mutation);
+            if (mutation instanceof Put) {
+                // Add this put on top of the current data row state to get 
the next data row state
+                Put nextDataRow = (currentDataRow == null) ? new 
Put((Put)mutation) : applyNew((Put)mutation, currentDataRow);
+                ValueGetter nextDataRowVG = new 
IndexRebuildRegionScanner.SimpleValueGetter(nextDataRow);
+                Put indexPut = prepareIndexPutForRebuild(indexMaintainer, rowKeyPtr, nextDataRowVG, ts);
+                indexMutations.add(indexPut);
+                // Delete the current index row if the new index key is 
different than the current one
+                if (indexRowKeyForCurrentDataRow != null) {
+                    if (Bytes.compareTo(indexPut.getRow(), 
indexRowKeyForCurrentDataRow) != 0) {
+                        Mutation del = 
indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
+                                IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+                        indexMutations.add(del);
+                    }
+                }
+                // For the next iteration
+                currentDataRow = nextDataRow;
+                indexRowKeyForCurrentDataRow = indexPut.getRow();
+            } else if (currentDataRow != null) {
+                // We apply delete column mutations only on the current data row state. For the index table,
+                // we are only interested in whether the current data row is deleted or not. There is no need
+                // to apply column deletes to index rows since index rows are always full rows and all the cells
+                // in an index row have the same timestamp value. Because of this, index row versions do not
+                // share cells.
+                for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
+                    for (Cell cell : cells) {
+                        switch 
((KeyValue.Type.codeToType(cell.getTypeByte()))) {
+                            case DeleteFamily:
+                            case DeleteFamilyVersion:
+                                
currentDataRow.getFamilyCellMap().remove(CellUtil.cloneFamily(cell));
+                                break;
+                            case DeleteColumn:
+                            case Delete:
+                                removeColumn(currentDataRow, cell);
+                        }
+                    }
+                }
+                if (currentDataRow.getFamilyCellMap().size() == 0) {
+                    Mutation del = 
indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
+                            IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+                    indexMutations.add(del);
+                    currentDataRow = null;
+                    indexRowKeyForCurrentDataRow = null;
+                }
+            }
+        }
+        return indexMutations;
+    }
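
To make the replay concrete: a data row whose indexed column is written at ts=10 and overwritten at
ts=20 should produce a verified index put per timestamp, plus a delete of the old index row when the
index row key changes. A minimal sketch, assuming column c is the indexed column and maintainer is an
IndexMaintainer for a global index obtained elsewhere:

    Put dataPut = new Put(Bytes.toBytes("pk"));
    dataPut.addColumn(Bytes.toBytes("0"), Bytes.toBytes("c"), 10L, Bytes.toBytes("v1"));
    dataPut.addColumn(Bytes.toBytes("0"), Bytes.toBytes("c"), 20L, Bytes.toBytes("v2"));
    // Expected result (roughly): index Put@10 for key(v1), index Put@20 for key(v2),
    // and a Delete@20 of key(v1) since the index row key changed
    List<Mutation> indexMutations = prepareIndexMutationsForRebuild(maintainer, dataPut, null);
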
+
+    private void prepareIndexMutations(Put put, Delete del) throws IOException{
+        List<Mutation> indexMutations = 
prepareIndexMutationsForRebuild(indexMaintainer, put, del);
+        for (Mutation mutation : indexMutations) {
+            byte[] indexRowKey = mutation.getRow();
+            List<Mutation> mutationList = 
indexKeyToMutationMap.get(indexRowKey);
+            if (mutationList == null) {
+                mutationList = new ArrayList<>();
+                mutationList.add(mutation);
+                indexKeyToMutationMap.put(indexRowKey, mutationList);
+            } else {
+                mutationList.add(mutation);
+            }
+        }
+    }
+
     @Override
     public boolean next(List<Cell> results) throws IOException {
+        if (indexRowKey != null &&
+                singleRowRebuildReturnCode == 
GlobalIndexChecker.RebuildReturnCode.NO_DATA_ROW.getValue()) {
+            byte[] rowCountBytes =
+                    
PLong.INSTANCE.toBytes(Long.valueOf(singleRowRebuildReturnCode));
+            final Cell aggKeyValue = 
KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
+                    SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, 
rowCountBytes.length);
+            results.add(aggKeyValue);
+            return false;
+        }
         Cell lastCell = null;
         int rowCount = 0;
         region.startRegionOperation();
         try {
-            // Partial rebuilds by MetadataRegionObserver use raw scan. Inline 
verification is not supported for them
-            boolean partialRebuild = scan.isRaw();
             byte[] uuidValue = ServerCacheClient.generateId();
             synchronized (innerScanner) {
                 do {
                     List<Cell> row = new ArrayList<Cell>();
                     hasMore = innerScanner.nextRaw(row);
                     if (!row.isEmpty()) {
-                        lastCell = row.get(0);
+                        lastCell = row.get(0); // lastCell is any cell from 
the last visited row
                         Put put = null;
                         Delete del = null;
                         for (Cell cell : row) {
                             if (KeyValue.Type.codeToType(cell.getTypeByte()) 
== KeyValue.Type.Put) {
+                                if (!partialRebuild && familyMap != null && 
!isColumnIncluded(cell)) {
+                                    continue;
+                                }
                                 if (put == null) {
                                     put = new Put(CellUtil.cloneRow(cell));
-                                    mutations.add(put);
                                 }
                                 put.add(cell);
                             } else {
                                 if (del == null) {
                                     del = new Delete(CellUtil.cloneRow(cell));
-                                    mutations.add(del);
                                 }
                                 del.addDeleteMarker(cell);
                             }
                         }
-                        if (partialRebuild) {
+                        if (put == null && del == null) {
+                            continue;
+                        }
+                        // Always add the put first and then the delete for a given row. This simplifies
+                        // the logic in IndexRegionObserver
+                        if (put != null) {
+                            mutations.add(put);
+                        }
+                        if (del != null) {
+                            mutations.add(del);
+                        }
+                        if (!verify) {
                             if (put != null) {
                                 setMutationAttributes(put, uuidValue);
                             }
@@ -1002,35 +1465,18 @@ public class IndexRebuildRegionScanner extends 
BaseRegionScanner {
                                 setMutationAttributes(del, uuidValue);
                             }
                             uuidValue = commitIfReady(uuidValue, mutations);
-                        }
-                        if (indexRowKey != null) {
-                            if (put != null) {
-                                setMutationAttributes(put, uuidValue);
-                            }
-                            Delete deleteMarkers = generateDeleteMarkers(put);
-                            if (deleteMarkers != null) {
-                                setMutationAttributes(deleteMarkers, 
uuidValue);
-                                mutations.add(deleteMarkers);
-                                uuidValue = commitIfReady(uuidValue, 
mutations);
-                            }
-                            // GlobalIndexChecker passed the index row key. 
This is to build a single index row.
-                            // Check if the data table row we have just 
scanned matches with the index row key.
-                            // If not, there is no need to build the index row 
from this data table row,
-                            // and just return zero row count.
-                            if (checkIndexRow(indexRowKey, put)) {
-                                rowCount = 
GlobalIndexChecker.RebuildReturnCode.INDEX_ROW_EXISTS.getValue();
-                            } else {
-                                rowCount = 
GlobalIndexChecker.RebuildReturnCode.NO_INDEX_ROW.getValue();
-                            }
-                            break;
+                        } else {
+                            byte[] dataKey = (put != null) ? put.getRow() : 
del.getRow();
+                            prepareIndexMutations(put, del);
+                            dataKeyToMutationMap.put(dataKey, new Pair<Put, 
Delete>(put, del));
                         }
                         rowCount++;
                     }
                 } while (hasMore && rowCount < pageSizeInRows);
-                if (!partialRebuild && indexRowKey == null) {
-                    verifyAndOrRebuildIndex();
-                } else {
-                    if (!mutations.isEmpty()) {
+                if (!mutations.isEmpty()) {
+                    if (verify) {
+                        verifyAndOrRebuildIndex();
+                    } else {
                         
ungroupedAggregateRegionObserver.checkForRegionClosing();
                         
ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutations, 
blockingMemstoreSize);
                     }
@@ -1043,10 +1489,13 @@ public class IndexRebuildRegionScanner extends 
BaseRegionScanner {
             region.closeRegionOperation();
             mutations.clear();
             if (verify) {
-              indexKeyToDataPutMap.clear();
-              dataKeyToDataPutMap.clear();
+              dataKeyToMutationMap.clear();
+              indexKeyToMutationMap.clear();
             }
         }
+        if (indexRowKey != null) {
+            rowCount = singleRowRebuildReturnCode;
+        }
         byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
         final Cell aggKeyValue;
         if (lastCell == null) {
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 4f21511..fa42ce9 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -1060,9 +1060,20 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
 
     private RegionScanner rebuildIndices(final RegionScanner innerScanner, 
final Region region, final Scan scan,
                                          final RegionCoprocessorEnvironment 
env) throws IOException {
-
-        RegionScanner scanner = new IndexRebuildRegionScanner(innerScanner, 
region, scan, env, this);
-        return scanner;
+        if (!scan.isRaw() && 
scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY) == null) {
+            Scan rawScan = new Scan(scan);
+            rawScan.setRaw(true);
+            rawScan.setMaxVersions();
+            rawScan.getFamilyMap().clear();
+            rawScan.setFilter(null);
+            for (byte[] family : scan.getFamilyMap().keySet()) {
+                rawScan.addFamily(family);
+            }
+            innerScanner.close();
+            RegionScanner scanner = region.getScanner(rawScan);
+            return new IndexRebuildRegionScanner(scanner, region, scan, env, 
this);
+        }
+        return new IndexRebuildRegionScanner(innerScanner, region, scan, env, 
this);
     }
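
The incoming scan is converted to a raw scan so the rebuild sees every cell version and delete
marker of the selected families; the filter is dropped because it could hide versions needed for
replay. A minimal sketch of the same conversion in isolation:

    Scan rawScan = new Scan(scan);  // keep the row range and time range of the original scan
    rawScan.setRaw(true);           // include delete markers
    rawScan.setMaxVersions();       // include all cell versions
    rawScan.setFilter(null);        // filters could hide versions needed for replay
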
     
     private RegionScanner collectStats(final RegionScanner innerScanner, 
StatisticsCollector stats,
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 08a120e..0092c5c 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -41,25 +42,31 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.OperationStatus;
 import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
+import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
 import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
+import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
+import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.LockManager.RowLock;
 import org.apache.phoenix.hbase.index.builder.IndexBuildManager;
 import org.apache.phoenix.hbase.index.builder.IndexBuilder;
@@ -67,20 +74,27 @@ import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.hbase.index.write.LazyParallelWriterIndexCommitter;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexMetaData;
+import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.ServerUtil.ConnectionType;
 
-import com.google.common.collect.Lists;
+import java.util.Set;
+
+import static 
org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.applyNew;
+import static 
org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.prepareIndexMutationsForRebuild;
+import static 
org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.removeColumn;
 
 /**
  * Do all the work of managing index updates from a single coprocessor. All 
Puts/Delets are passed
@@ -155,14 +169,16 @@ public class IndexRegionObserver extends 
BaseRegionObserver {
       private List<RowLock> rowLocks = 
Lists.newArrayListWithExpectedSize(QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
       private HashSet<ImmutableBytesPtr> rowsToLock = new HashSet<>();
       private boolean rebuild;
+      // The current and next states of the data rows corresponding to the 
pending mutations
+      private HashMap<ImmutableBytesPtr, Pair<Put, Put>> dataRowStates;
       private BatchMutateContext(int clientVersion) {
           this.clientVersion = clientVersion;
       }
   }
-  
+
   private ThreadLocal<BatchMutateContext> batchMutateContext =
           new ThreadLocal<BatchMutateContext>();
-  
+
   /** Configuration key for the {@link IndexBuilder} to use */
   public static final String INDEX_BUILDER_CONF_KEY = "index.builder";
 
@@ -172,17 +188,11 @@ public class IndexRegionObserver extends 
BaseRegionObserver {
    */
   public static final String CHECK_VERSION_CONF_KEY = 
"com.saleforce.hbase.index.checkversion";
 
-  private static final String INDEX_RECOVERY_FAILURE_POLICY_KEY = 
"org.apache.hadoop.hbase.index.recovery.failurepolicy";
-
   public static final String INDEX_LAZY_POST_BATCH_WRITE = 
"org.apache.hadoop.hbase.index.lazy.post_batch.write";
   private static final boolean INDEX_LAZY_POST_BATCH_WRITE_DEFAULT = false;
 
   private static final String INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY = 
"phoenix.indexer.slow.post.batch.mutate.threshold";
   private static final long INDEXER_INDEX_WRITE_SLOW_THRESHOLD_DEFAULT = 3_000;
-  private static final String INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_KEY = 
"phoenix.indexer.slow.pre.batch.mutate.threshold";
-  private static final long INDEXER_INDEX_PREPARE_SLOW_THREHSOLD_DEFAULT = 
3_000;
-  private static final String INDEXER_POST_OPEN_SLOW_THRESHOLD_KEY = 
"phoenix.indexer.slow.open.threshold";
-  private static final long INDEXER_POST_OPEN_SLOW_THRESHOLD_DEFAULT = 3_000;
   private static final String INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_KEY = 
"phoenix.indexer.slow.pre.increment";
   private static final long INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_DEFAULT = 
3_000;
 
@@ -343,13 +353,9 @@ public class IndexRegionObserver extends 
BaseRegionObserver {
 
   public static long getMaxTimestamp(Mutation m) {
       long maxTs = 0;
-      long ts = 0;
-      Iterator iterator = m.getFamilyCellMap().entrySet().iterator();
-      while (iterator.hasNext()) {
-          Map.Entry<byte[], List<Cell>> entry = (Map.Entry) iterator.next();
-          Iterator<Cell> cellIterator = entry.getValue().iterator();
-          while (cellIterator.hasNext()) {
-              Cell cell = cellIterator.next();
+      long ts;
+      for (List<Cell> cells : m.getFamilyCellMap().values()) {
+          for (Cell cell : cells) {
               ts = cell.getTimestamp();
               if (ts > maxTs) {
                   maxTs = ts;
@@ -403,316 +409,541 @@ public class IndexRegionObserver extends 
BaseRegionObserver {
       }
   }
 
-  private Collection<? extends Mutation> 
groupMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
-                                                        long now, ReplayWrite 
replayWrite) throws IOException {
-      Map<ImmutableBytesPtr, MultiMutation> mutationsMap = new HashMap<>();
-      boolean copyMutations = false;
-      for (int i = 0; i < miniBatchOp.size(); i++) {
-          if (miniBatchOp.getOperationStatus(i) == IGNORE) {
-              continue;
-          }
-          Mutation m = miniBatchOp.getOperation(i);
-          if (this.builder.isEnabled(m)) {
-              // Track whether or not we need to
-              ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
-              if (mutationsMap.containsKey(row)) {
-                  copyMutations = true;
-              } else {
-                  mutationsMap.put(row, null);
-              }
-          }
-      }
-      // early exit if it turns out we don't have any edits
-      if (mutationsMap.isEmpty()) {
-          return null;
-      }
-      // If we're copying the mutations
-      Collection<Mutation> originalMutations;
-      Collection<? extends Mutation> mutations;
-      if (copyMutations) {
-          originalMutations = null;
-          mutations = mutationsMap.values();
-      } else {
-          originalMutations = 
Lists.newArrayListWithExpectedSize(mutationsMap.size());
-          mutations = originalMutations;
-      }
-
-      boolean resetTimeStamp = replayWrite == null;
+    /**
+     * This method is only used for local indexes
+     */
+    private Collection<? extends Mutation> 
groupMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                                          long now) throws 
IOException {
+        Map<ImmutableBytesPtr, MultiMutation> mutationsMap = new HashMap<>();
+        boolean copyMutations = false;
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+                continue;
+            }
+            Mutation m = miniBatchOp.getOperation(i);
+            if (this.builder.isEnabled(m)) {
+                // Track whether or not we need to copy the mutations
+                ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+                if (mutationsMap.containsKey(row)) {
+                    copyMutations = true;
+                } else {
+                    mutationsMap.put(row, null);
+                }
+            }
+        }
+        // early exit if it turns out we don't have any edits
+        if (mutationsMap.isEmpty()) {
+            return null;
+        }
+        // If we're copying the mutations
+        Collection<Mutation> originalMutations;
+        Collection<? extends Mutation> mutations;
+        if (copyMutations) {
+            originalMutations = null;
+            mutations = mutationsMap.values();
+        } else {
+            originalMutations = 
Lists.newArrayListWithExpectedSize(mutationsMap.size());
+            mutations = originalMutations;
+        }
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            Mutation m = miniBatchOp.getOperation(i);
+            // skip this mutation if we aren't enabling indexing
+            // unfortunately, we really should ask if the raw mutation (rather 
than the combined mutation)
+            // should be indexed, which means we need to expose another method 
on the builder. Such is the
+            // way optimization go though.
+            if (miniBatchOp.getOperationStatus(i) != IGNORE && 
this.builder.isEnabled(m)) {
+                // Unless we're replaying edits to rebuild the index, we update the time stamp
+                // of the data table to prevent overlapping time stamps (which prevents index
+                // inconsistencies as this case isn't handled correctly currently).
+                for (List<Cell> cells : m.getFamilyCellMap().values()) {
+                    for (Cell cell : cells) {
+                        CellUtil.setTimestamp(cell, now);
+                    }
+                }
+                // Only copy mutations if we found duplicate rows
+                // which only occurs when we're partially rebuilding
+                // the index (since we'll potentially have both a
+                // Put and a Delete mutation for the same row).
+                if (copyMutations) {
+                    // Add the mutation to the batch set
+                    ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+                    MultiMutation stored = mutationsMap.get(row);
+                    // we haven't seen this row before, so add it
+                    if (stored == null) {
+                        stored = new MultiMutation(row);
+                        mutationsMap.put(row, stored);
+                    }
+                    stored.addAll(m);
+                } else {
+                    originalMutations.add(m);
+                }
+            }
+        }
+        if (copyMutations) {
+            mutations = 
IndexManagementUtil.flattenMutationsByTimestamp(mutations);
+        }
+        return mutations;
+    }
 
-      for (int i = 0; i < miniBatchOp.size(); i++) {
-          Mutation m = miniBatchOp.getOperation(i);
-          // skip this mutation if we aren't enabling indexing
-          // unfortunately, we really should ask if the raw mutation (rather 
than the combined mutation)
-          // should be indexed, which means we need to expose another method 
on the builder. Such is the
-          // way optimization go though.
-          if (miniBatchOp.getOperationStatus(i) != IGNORE && 
this.builder.isEnabled(m)) {
-              if (resetTimeStamp) {
-                  // Unless we're replaying edits to rebuild the index, we 
update the time stamp
-                  // of the data table to prevent overlapping time stamps 
(which prevents index
-                  // inconsistencies as this case isn't handled correctly 
currently).
-                  for (List<Cell> cells : m.getFamilyCellMap().values()) {
-                      for (Cell cell : cells) {
-                          CellUtil.setTimestamp(cell, now);
-                      }
-                  }
-              }
-              // No need to write the table mutations when we're rebuilding
-              // the index as they're already written and just being replayed.
-              if (replayWrite == ReplayWrite.INDEX_ONLY
-                      || replayWrite == ReplayWrite.REBUILD_INDEX_ONLY) {
-                  miniBatchOp.setOperationStatus(i, NOWRITE);
-              }
+    public static void setTimestamp(Mutation m, long ts) throws IOException {
+        for (List<Cell> cells : m.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                CellUtil.setTimestamp(cell, ts);
+            }
+        }
+    }
 
-              // Only copy mutations if we found duplicate rows
-              // which only occurs when we're partially rebuilding
-              // the index (since we'll potentially have both a
-              // Put and a Delete mutation for the same row).
-              if (copyMutations) {
-                  // Add the mutation to the batch set
-
-                  ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
-                  MultiMutation stored = mutationsMap.get(row);
-                  // we haven't seen this row before, so add it
-                  if (stored == null) {
-                      stored = new MultiMutation(row);
-                      mutationsMap.put(row, stored);
-                  }
-                  stored.addAll(m);
-              } else {
-                  originalMutations.add(m);
-              }
-          }
-      }
+    /**
+     * This method applies pending delete mutations on the next row states
+     */
+    private void 
applyPendingDeleteMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                             BatchMutateContext context) 
throws IOException {
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+                continue;
+            }
+            Mutation m = miniBatchOp.getOperation(i);
+            if (!this.builder.isEnabled(m)) {
+                continue;
+            }
+            if (!(m instanceof Delete)) {
+                continue;
+            }
+            ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(m.getRow());
+            Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
+            Put nextDataRowState = dataRowState.getSecond();
+            if (nextDataRowState == null) {
+                continue;
+            }
+            for (List<Cell> cells : m.getFamilyCellMap().values()) {
+                for (Cell cell : cells) {
+                    switch (KeyValue.Type.codeToType(cell.getTypeByte())) {
+                        case DeleteFamily:
+                        case DeleteFamilyVersion:
+                            
nextDataRowState.getFamilyCellMap().remove(CellUtil.cloneFamily(cell));
+                            break;
+                        case DeleteColumn:
+                        case Delete:
+                            removeColumn(nextDataRowState, cell);
+                    }
+                }
+            }
+            if (nextDataRowState.getFamilyCellMap().isEmpty()) {
+                dataRowState.setSecond(null);
+            }
+        }
+    }
 
-      if (copyMutations || replayWrite != null) {
-          mutations = 
IndexManagementUtil.flattenMutationsByTimestamp(mutations);
-      }
-      return mutations;
-  }
+    /**
+     * This method applies the pending put mutations to the next row states.
+     * Before this method is called, the next row states are set to the current row states.
+     */
+    private void 
applyPendingPutMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                          BatchMutateContext context, long 
now) throws IOException {
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+                continue;
+            }
+            Mutation m = miniBatchOp.getOperation(i);
+            // skip this mutation if we aren't enabling indexing
+            if (!this.builder.isEnabled(m)) {
+                continue;
+            }
+            // Unless we're replaying edits to rebuild the index, we update the time stamp
+            // of the data table to prevent overlapping time stamps (which prevents index
+            // inconsistencies as this case isn't handled correctly currently).
+            setTimestamp(m, now);
+            if (m instanceof Put) {
+                ImmutableBytesPtr rowKeyPtr = new 
ImmutableBytesPtr(m.getRow());
+                Pair<Put, Put> dataRowState = 
context.dataRowStates.get(rowKeyPtr);
+                if (dataRowState == null) {
+                    dataRowState = new Pair<Put, Put>(null, null);
+                    context.dataRowStates.put(rowKeyPtr, dataRowState);
+                }
+                Put nextDataRowState = dataRowState.getSecond();
+                dataRowState.setSecond((nextDataRowState != null) ? 
applyNew((Put) m, nextDataRowState) : new Put((Put) m));
+            }
+        }
+    }
 
-  public static void removeEmptyColumn(Mutation m, byte[] emptyCF, byte[] 
emptyCQ) {
-      List<Cell> cellList = m.getFamilyCellMap().get(emptyCF);
-      if (cellList == null) {
-          return;
-      }
-      Iterator<Cell> cellIterator = cellList.iterator();
-      while (cellIterator.hasNext()) {
-          Cell cell = cellIterator.next();
-          if (Bytes.compareTo(cell.getQualifierArray(), 
cell.getQualifierOffset(), cell.getQualifierLength(),
-                  emptyCQ, 0, emptyCQ.length) == 0) {
-              cellIterator.remove();
-              return;
-          }
-      }
-  }
+    /**
+     * Prepares the current and next data row states
+     */
+    private void 
prepareDataRowStates(ObserverContext<RegionCoprocessorEnvironment> c,
+                                      MiniBatchOperationInProgress<Mutation> 
miniBatchOp,
+                                      BatchMutateContext context,
+                                      long now) throws IOException {
+        if (context.rowsToLock.size() == 0) {
+            return;
+        }
+        // Retrieve the current row states from the data table
+        getCurrentRowStates(c, context);
+        applyPendingPutMutations(miniBatchOp, context, now);
+        applyPendingDeleteMutations(miniBatchOp, context);
+    }
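
After this method runs, each locked row has a Pair<Put, Put> in context.dataRowStates: the first
element is the row as currently stored, the second is the row after the pending batch is applied, or
null when the batch deletes the row entirely. A minimal sketch of reading it back (rowKeyPtr here is
illustrative):

    Pair<Put, Put> state = context.dataRowStates.get(rowKeyPtr);
    Put currentRowState = state.getFirst();   // null if the row does not exist yet
    Put nextRowState = state.getSecond();     // null if the pending mutations delete the whole row
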
 
-  private void 
handleLocalIndexUpdates(ObserverContext<RegionCoprocessorEnvironment> c,
-                                       MiniBatchOperationInProgress<Mutation> 
miniBatchOp,
-                                       ListMultimap<HTableInterfaceReference, 
Pair<Mutation, byte[]>> indexUpdates) {
-      byte[] tableName = 
c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
-      HTableInterfaceReference hTableInterfaceReference =
-                          new HTableInterfaceReference(new 
ImmutableBytesPtr(tableName));
-      List<Pair<Mutation, byte[]>> localIndexUpdates = 
indexUpdates.removeAll(hTableInterfaceReference);
-      if (localIndexUpdates == null || localIndexUpdates.isEmpty()) {
-          return;
-      }
-      List<Mutation> localUpdates = new ArrayList<Mutation>();
-      Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = 
localIndexUpdates.iterator();
-      while (indexUpdatesItr.hasNext()) {
-          Pair<Mutation, byte[]> next = indexUpdatesItr.next();
-          localUpdates.add(next.getFirst());
-      }
-      if (!localUpdates.isEmpty()) {
-          miniBatchOp.addOperationsFromCP(0, localUpdates.toArray(new 
Mutation[localUpdates.size()]));
-      }
-  }
+    public static void removeEmptyColumn(Mutation m, byte[] emptyCF, byte[] 
emptyCQ) {
+        List<Cell> cellList = m.getFamilyCellMap().get(emptyCF);
+        if (cellList == null) {
+            return;
+        }
+        Iterator<Cell> cellIterator = cellList.iterator();
+        while (cellIterator.hasNext()) {
+            Cell cell = cellIterator.next();
+            if (Bytes.compareTo(cell.getQualifierArray(), 
cell.getQualifierOffset(), cell.getQualifierLength(),
+                    emptyCQ, 0, emptyCQ.length) == 0) {
+                cellIterator.remove();
+                return;
+            }
+        }
+    }
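
This is used when rewriting the status of an index row: the empty-column cell prepared by the index
codec is removed so it can be re-added carrying the verified or unverified status bytes. A minimal
sketch, assuming indexMaintainer, an index Put named indexPut, and a batch timestamp now from the
surrounding flow:

    byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
    byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
    removeEmptyColumn(indexPut, emptyCF, emptyCQ);                // drop the codec-prepared empty cell
    indexPut.addColumn(emptyCF, emptyCQ, now, UNVERIFIED_BYTES);  // mark unverified in the first write phase
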
 
-  private void prepareIndexMutations(
-          ObserverContext<RegionCoprocessorEnvironment> c,
-          MiniBatchOperationInProgress<Mutation> miniBatchOp,
-          BatchMutateContext context,
-          Collection<? extends Mutation> mutations,
-          long now,
-          PhoenixIndexMetaData indexMetaData) throws Throwable {
-      List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
-      // get the current span, or just use a null-span to avoid a bunch of if 
statements
-      try (TraceScope scope = Trace.startSpan("Starting to build index 
updates")) {
-          Span current = scope.getSpan();
-          if (current == null) {
-              current = NullSpan.INSTANCE;
-          }
-          // get the index updates for all elements in this batch
-          context.indexUpdates = ArrayListMultimap.<HTableInterfaceReference, 
Pair<Mutation, byte[]>>create();
-          this.builder.getIndexUpdates(context.indexUpdates, miniBatchOp, 
mutations, indexMetaData);
-          current.addTimelineAnnotation("Built index updates, doing preStep");
-          handleLocalIndexUpdates(c, miniBatchOp, context.indexUpdates);
-          context.preIndexUpdates = 
ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
-          int updateCount = 0;
-          for (IndexMaintainer indexMaintainer : maintainers) {
-              updateCount++;
-              byte[] emptyCF = 
indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
-              byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
-              HTableInterfaceReference hTableInterfaceReference =
-                      new HTableInterfaceReference(new 
ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
-              List <Pair<Mutation, byte[]>> updates = 
context.indexUpdates.get(hTableInterfaceReference);
-              for (Pair<Mutation, byte[]> update : updates) {
-                  // add the VERIFIED cell, which is the empty cell
-                  Mutation m = update.getFirst();
-                  if (context.rebuild) {
-                      if (m instanceof Put) {
-                          long ts = getMaxTimestamp(m);
-                          // Remove the empty column prepared by Index codec 
as we need to change its value
-                          removeEmptyColumn(m, emptyCF, emptyCQ);
-                          ((Put) m).addColumn(emptyCF, emptyCQ, ts, 
VERIFIED_BYTES);
-                      }
-                      context.preIndexUpdates.put(hTableInterfaceReference, m);
-                  } else {
-                      if (m instanceof Put) {
-                          // Remove the empty column prepared by Index codec 
as we need to change its value
-                          removeEmptyColumn(m, emptyCF, emptyCQ);
-                          // Set the status of the index row to "unverified"
-                          ((Put) m).addColumn(emptyCF, emptyCQ, now, 
UNVERIFIED_BYTES);
-                          // This will be done before the data table row is 
updated (i.e., in the first write phase)
-                          
context.preIndexUpdates.put(hTableInterfaceReference, m);
-                      }
-                      else {
-                          // Set the status of the index row to "unverified"
-                          Put unverifiedPut = new Put(m.getRow());
-                          unverifiedPut.addColumn(emptyCF, emptyCQ, now, 
UNVERIFIED_BYTES);
-                          // This will be done before the data table row is 
updated (i.e., in the first write phase)
-                          
context.preIndexUpdates.put(hTableInterfaceReference, unverifiedPut);
-                      }
-                  }
-              }
-          }
-          TracingUtils.addAnnotation(current, "index update count", 
updateCount);
-      }
-  }
+    /**
+     * Index update generation for local indexes uses the existing code path (i.e., the
+     * {@link IndexBuilder} implementation).
+     */
+    private void handleLocalIndexUpdates(ObserverContext<RegionCoprocessorEnvironment> c,
+                                         MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                         Collection<? extends Mutation> pendingMutations,
+                                         PhoenixIndexMetaData indexMetaData) throws Throwable {
+        ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> indexUpdates = ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create();
+        this.builder.getIndexUpdates(indexUpdates, miniBatchOp, pendingMutations, indexMetaData);
+        byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
+        HTableInterfaceReference hTableInterfaceReference =
+                new HTableInterfaceReference(new ImmutableBytesPtr(tableName));
+        List<Pair<Mutation, byte[]>> localIndexUpdates = indexUpdates.removeAll(hTableInterfaceReference);
+        if (localIndexUpdates == null || localIndexUpdates.isEmpty()) {
+            return;
+        }
+        List<Mutation> localUpdates = new ArrayList<Mutation>();
+        Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = localIndexUpdates.iterator();
+        while (indexUpdatesItr.hasNext()) {
+            Pair<Mutation, byte[]> next = indexUpdatesItr.next();
+            localUpdates.add(next.getFirst());
+        }
+        if (!localUpdates.isEmpty()) {
+            miniBatchOp.addOperationsFromCP(0, localUpdates.toArray(new Mutation[localUpdates.size()]));
+        }
+    }
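
The essential step of the local-index path, as a minimal sketch (hypothetical helper using the same calls as the method above): local index rows live in the data region itself, so the generated mutations are appended to the in-flight mini batch instead of being sent over RPC to a separate index table.

    static void injectLocalUpdates(MiniBatchOperationInProgress<Mutation> miniBatchOp,
                                   List<Mutation> localUpdates) {
        // Local index mutations target the same region as the data mutations,
        // so they piggyback on the current mini batch.
        miniBatchOp.addOperationsFromCP(0, localUpdates.toArray(new Mutation[0]));
    }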
+    /**
+     * Retrieve the last committed data row state. This method is called only for regular data mutations, since
+     * rebuild (i.e., index replay) mutations already include all row versions.
+     */
+    private void getCurrentRowStates(ObserverContext<RegionCoprocessorEnvironment> c,
+                                     BatchMutateContext context) throws IOException {
+        Set<KeyRange> keys = new HashSet<KeyRange>(context.rowsToLock.size());
+        for (ImmutableBytesPtr rowKeyPtr : context.rowsToLock) {
+            keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get()));
+        }
+        Scan scan = new Scan();
+        ScanRanges scanRanges = ScanRanges.createPointLookup(new ArrayList<KeyRange>(keys));
+        scanRanges.initializeScan(scan);
+        SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
+        scan.setFilter(skipScanFilter);
+        context.dataRowStates = new HashMap<ImmutableBytesPtr, Pair<Put, Put>>(context.rowsToLock.size());
+        try (RegionScanner scanner = c.getEnvironment().getRegion().getScanner(scan)) {
+            boolean more = true;
+            while (more) {
+                List<Cell> cells = new ArrayList<Cell>();
+                more = scanner.next(cells);
+                if (cells.isEmpty()) {
+                    continue;
+                }
+                byte[] rowKey = CellUtil.cloneRow(cells.get(0));
+                Put put = new Put(rowKey);
+                for (Cell cell : cells) {
+                    put.add(cell);
+                }
+                context.dataRowStates.put(new ImmutableBytesPtr(rowKey), new Pair<Put, Put>(put, new Put(put)));
+            }
+        }
+    }
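
The Pair<Put, Put> stored above holds the committed ("current") row state first and a copy of it second; the copy is what applyPendingPutMutations/applyPendingDeleteMutations (called earlier in prepareDataRowStates) turn into the "next" row state. A hedged sketch of such a merge, assuming newest-cell-wins read semantics (the helper name is hypothetical):

    static void mergePendingPut(Put nextRowState, Put pending) throws IOException {
        for (List<Cell> cells : pending.getFamilyCellMap().values()) {
            for (Cell cell : cells) {
                // Cells with newer timestamps shadow older ones when the merged
                // state is read back through a ValueGetter.
                nextRowState.add(cell);
            }
        }
    }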
 
-  protected PhoenixIndexMetaData getPhoenixIndexMetaData(
-          ObserverContext<RegionCoprocessorEnvironment> observerContext,
-          MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
-      IndexMetaData indexMetaData = this.builder.getIndexMetaData(miniBatchOp);
-      if (!(indexMetaData instanceof PhoenixIndexMetaData)) {
-          throw new DoNotRetryIOException(
-                  "preBatchMutateWithExceptions: indexMetaData is not an instance of "+PhoenixIndexMetaData.class.getName() +
-                          ", current table is:" +
-                          observerContext.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
-      }
-      return (PhoenixIndexMetaData)indexMetaData;
-  }
+    /**
+     * Generate the index updates for a data row from the mutations that are obtained by merging the previous data row
+     * state with the pending row mutation.
+     */
+    private void prepareIndexMutations(BatchMutateContext context, List<IndexMaintainer> maintainers, long ts)
+            throws IOException {
+        List<Pair<IndexMaintainer, HTableInterfaceReference>> indexTables = new ArrayList<>(maintainers.size());
+        for (IndexMaintainer indexMaintainer : maintainers) {
+            if (indexMaintainer.isLocalIndex()) {
+                continue;
+            }
+            HTableInterfaceReference hTableInterfaceReference =
+                    new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+            indexTables.add(new Pair<>(indexMaintainer, hTableInterfaceReference));
+        }
+        for (Map.Entry<ImmutableBytesPtr, Pair<Put, Put>> entry : context.dataRowStates.entrySet()) {
+            ImmutableBytesPtr rowKeyPtr = entry.getKey();
+            Pair<Put, Put> dataRowState = entry.getValue();
+            Put currentDataRowState = dataRowState.getFirst();
+            Put nextDataRowState = dataRowState.getSecond();
+            if (currentDataRowState == null && nextDataRowState == null) {
+                continue;
+            }
+            for (Pair<IndexMaintainer, HTableInterfaceReference> pair : indexTables) {
+                IndexMaintainer indexMaintainer = pair.getFirst();
+                HTableInterfaceReference hTableInterfaceReference = pair.getSecond();
+                if (nextDataRowState != null) {
+                    ValueGetter nextDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(nextDataRowState);
+                    Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+                            nextDataRowVG, rowKeyPtr, ts, null, null);
+                    if (indexPut == null) {
+                        // No covered column. Just prepare an index row with the empty column
+                        byte[] indexRowKey = indexMaintainer.buildRowKey(nextDataRowVG, rowKeyPtr,
+                                null, null, HConstants.LATEST_TIMESTAMP);
+                        indexPut = new Put(indexRowKey);
+                    }
+                    indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                            indexMaintainer.getEmptyKeyValueQualifier(), ts, UNVERIFIED_BYTES);
+                    context.indexUpdates.put(hTableInterfaceReference,
+                            new Pair<Mutation, byte[]>(indexPut, rowKeyPtr.get()));
+                    // Delete the current index row if the new index key is different than the current one
+                    if (currentDataRowState != null) {
+                        ValueGetter currentDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(currentDataRowState);
+                        byte[] indexRowKeyForCurrentDataRow = indexMaintainer.buildRowKey(currentDataRowVG, rowKeyPtr,
+                                null, null, HConstants.LATEST_TIMESTAMP);
+                        if (Bytes.compareTo(indexPut.getRow(), indexRowKeyForCurrentDataRow) != 0) {
+                            Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
+                                    IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+                            context.indexUpdates.put(hTableInterfaceReference,
+                                    new Pair<Mutation, byte[]>(del, rowKeyPtr.get()));
+                        }
+                    }
+                } else if (currentDataRowState != null) {
+                    ValueGetter currentDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(currentDataRowState);
+                    byte[] indexRowKeyForCurrentDataRow = indexMaintainer.buildRowKey(currentDataRowVG, rowKeyPtr,
+                            null, null, HConstants.LATEST_TIMESTAMP);
+                    Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
+                            IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+                    context.indexUpdates.put(hTableInterfaceReference,
+                            new Pair<Mutation, byte[]>(del, rowKeyPtr.get()));
+                }
+            }
+        }
+    }
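
The method above reduces to three cases per (index, data row) pair; a sketch with illustrative names only:

    enum IndexUpdateCase { PUT_ONLY, PUT_AND_DELETE_OLD, DELETE_ONLY }

    static IndexUpdateCase classify(byte[] currentIndexRowKey, byte[] nextIndexRowKey) {
        if (nextIndexRowKey == null) {
            return IndexUpdateCase.DELETE_ONLY;        // the data row is being deleted
        }
        if (currentIndexRowKey == null
                || Bytes.compareTo(currentIndexRowKey, nextIndexRowKey) == 0) {
            return IndexUpdateCase.PUT_ONLY;           // new row, or index row key unchanged
        }
        return IndexUpdateCase.PUT_AND_DELETE_OLD;     // the index row key changed
    }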
 
-  private void preparePostIndexMutations(
-          BatchMutateContext context,
-          long now,
-          PhoenixIndexMetaData indexMetaData) throws Throwable {
-      context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
-      List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
-      // Check if we need to skip post index update for any of the rows
-      for (IndexMaintainer indexMaintainer : maintainers) {
-          byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
-          byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
-          HTableInterfaceReference hTableInterfaceReference =
-                  new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
-          List<Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference);
-          for (Pair<Mutation, byte[]> update : updates) {
-              // Are there concurrent updates on the data table row? if so, skip post index updates
-              // and let read repair resolve conflicts
-              ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
-              PendingRow pendingRow = pendingRows.get(rowKey);
-              if (!pendingRow.isConcurrent()) {
-                  Mutation m = update.getFirst();
-                  if (m instanceof Put) {
-                      Put verifiedPut = new Put(m.getRow());
-                      // Set the status of the index row to "verified"
-                      verifiedPut.addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
-                      context.postIndexUpdates.put(hTableInterfaceReference, verifiedPut);
-                  }
-                  else {
-                      context.postIndexUpdates.put(hTableInterfaceReference, m);
-                  }
+    /**
+     * This method prepares unverified index mutations, which are applied to index tables before the data table is
+     * updated. In the three-phase update approach: in phase 1, the status of existing index rows is set to "unverified"
+     * (these rows will be deleted from the index table in phase 3), and/or new put mutations are added with the
+     * unverified status; in phase 2, data table mutations are applied; in phase 3, the status of an index table row is
+     * either set to "verified" or the row is deleted.
+     */
+    private void preparePreIndexMutations(ObserverContext<RegionCoprocessorEnvironment> c,
+                                          MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                          BatchMutateContext context,
+                                          Collection<? extends Mutation> pendingMutations,
+                                          long now,
+                                          PhoenixIndexMetaData indexMetaData) throws Throwable {
+        List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
+        // get the current span, or just use a null-span to avoid a bunch of if statements
+        try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
+            Span current = scope.getSpan();
+            if (current == null) {
+                current = NullSpan.INSTANCE;
+            }
+            current.addTimelineAnnotation("Built index updates, doing preStep");
+            // Handle local index updates
+            for (IndexMaintainer indexMaintainer : maintainers) {
+                if (indexMaintainer.isLocalIndex()) {
+                    handleLocalIndexUpdates(c, miniBatchOp, pendingMutations, indexMetaData);
+                    break;
+                }
+            }
+            // The rest of this method is for handling global index updates
+            context.indexUpdates = ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create();
+            prepareIndexMutations(context, maintainers, now);
+
+            context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
+            int updateCount = 0;
+            for (IndexMaintainer indexMaintainer : maintainers) {
+                updateCount++;
+                byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+                byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+                HTableInterfaceReference hTableInterfaceReference =
+                        new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+                List<Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference);
+                for (Pair<Mutation, byte[]> update : updates) {
+                    Mutation m = update.getFirst();
+                    if (m instanceof Put) {
+                        // This will be done before the data table row is updated (i.e., in the first write phase)
+                        context.preIndexUpdates.put(hTableInterfaceReference, m);
+                    } else {
+                        // Set the status of the index row to "unverified"
+                        Put unverifiedPut = new Put(m.getRow());
+                        unverifiedPut.addColumn(emptyCF, emptyCQ, now, UNVERIFIED_BYTES);
+                        // This will be done before the data table row is updated (i.e., in the first write phase)
+                        context.preIndexUpdates.put(hTableInterfaceReference, unverifiedPut);
+                    }
+                }
+            }
+            TracingUtils.addAnnotation(current, "index update count", updateCount);
+        }
+    }
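
The three-phase protocol from the javadoc above, condensed into an illustrative outline (the phase-1 and phase-3 writes correspond to the preIndexUpdates and postIndexUpdates multimaps built in this class; phase 2 is HBase applying the mini batch itself):

    interface ThreePhaseIndexWrite {
        void markIndexRowsUnverified();  // phase 1: flush preIndexUpdates before the data write
        void applyDataTableBatch();      // phase 2: the region applies the mini batch
        void verifyOrDeleteIndexRows();  // phase 3: flush postIndexUpdates after the data write
    }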
 
-              }
-          }
-      }
-      // We are done with handling concurrent mutations. So we can remove the rows of this batch from
-      // the collection of pending rows
-      removePendingRows(context);
-      context.indexUpdates.clear();
-  }
+    protected PhoenixIndexMetaData getPhoenixIndexMetaData(ObserverContext<RegionCoprocessorEnvironment> observerContext,
+                                                           MiniBatchOperationInProgress<Mutation> miniBatchOp)
+            throws IOException {
+        IndexMetaData indexMetaData = this.builder.getIndexMetaData(miniBatchOp);
+        if (!(indexMetaData instanceof PhoenixIndexMetaData)) {
+            throw new DoNotRetryIOException(
+                    "preBatchMutateWithExceptions: indexMetaData is not an instance of " + PhoenixIndexMetaData.class.getName() +
+                            ", current table is:" +
+                            observerContext.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+        }
+        return (PhoenixIndexMetaData) indexMetaData;
+    }
 
-  public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
-          MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
-      ignoreAtomicOperations(miniBatchOp);
-      PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c, miniBatchOp);
-      BatchMutateContext context = new BatchMutateContext(indexMetaData.getClientVersion());
-      setBatchMutateContext(c, context);
-      Mutation firstMutation = miniBatchOp.getOperation(0);
-      ReplayWrite replayWrite = this.builder.getReplayWrite(firstMutation);
-      context.rebuild = replayWrite != null;
-      /*
-       * Exclusively lock all rows so we get a consistent read
-       * while determining the index updates
-       */
-      long now;
-      if (!context.rebuild) {
-          populateRowsToLock(miniBatchOp, context);
-          lockRows(context);
-          now = EnvironmentEdgeManager.currentTimeMillis();
-          // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
-          // concurrent updates
-          populatePendingRows(context);
-      }
-      else {
-          now = EnvironmentEdgeManager.currentTimeMillis();
-      }
-      // First group all the updates for a single row into a single update to be processed
-      Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, now, replayWrite);
-      // early exit if it turns out we don't have any edits
-      if (mutations == null) {
-          return;
-      }
+    private void preparePostIndexMutations(BatchMutateContext context, long now, PhoenixIndexMetaData indexMetaData)
+            throws Throwable {
+        context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
+        List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
+        // Check if we need to skip post index update for any of the rows
+        for (IndexMaintainer indexMaintainer : maintainers) {
+            byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+            byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+            HTableInterfaceReference hTableInterfaceReference =
+                    new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+            List<Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference);
+            for (Pair<Mutation, byte[]> update : updates) {
+                // Are there concurrent updates on the data table row? If so, skip post index updates
+                // and let read repair resolve conflicts
+                ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
+                PendingRow pendingRow = pendingRows.get(rowKey);
+                if (!pendingRow.isConcurrent()) {
+                    Mutation m = update.getFirst();
+                    if (m instanceof Put) {
+                        Put verifiedPut = new Put(m.getRow());
+                        // Set the status of the index row to "verified"
+                        verifiedPut.addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
+                        context.postIndexUpdates.put(hTableInterfaceReference, verifiedPut);
+                    } else {
+                        context.postIndexUpdates.put(hTableInterfaceReference, m);
+                    }
+                }
+            }
+        }
+        // We are done with handling concurrent mutations, so we can remove the rows of this batch from
+        // the collection of pending rows
+        removePendingRows(context);
+        context.indexUpdates.clear();
+    }
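
The pendingRow.isConcurrent() check above carries the concurrency story: when two batches race on the same data row, neither writer can know which index state will be final, so both leave the index row unverified and defer to read repair. As a trivial sketch (PendingRow is this class's own bookkeeping type):

    static boolean canMarkVerified(PendingRow pendingRow) {
        // A concurrent batch touched the same data row; skip the phase-3 update
        // and let read repair settle the row's verified status later.
        return !pendingRow.isConcurrent();
    }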
 
-      long start = EnvironmentEdgeManager.currentTimeMillis();
-      prepareIndexMutations(c, miniBatchOp, context, mutations, now, indexMetaData);
-      metricSource.updateIndexPrepareTime(EnvironmentEdgeManager.currentTimeMillis() - start);
-
-      // Sleep for one millisecond if we have prepared the index updates in less than 1 ms. The sleep is necessary to
-      // get different timestamps for concurrent batches that share common rows. It is very rare that the index updates
-      // can be prepared in less than one millisecond
-      if (!context.rowLocks.isEmpty() && now == EnvironmentEdgeManager.currentTimeMillis()) {
-          Thread.sleep(1);
-          LOG.debug("slept 1ms for " + c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
-      }
-      // Release the locks before making RPC calls for index updates
-      for (RowLock rowLock : context.rowLocks) {
-          rowLock.release();
-      }
-      // Do the first phase index updates
-      doPre(c, context, miniBatchOp);
-      if (!context.rebuild) {
-          // Acquire the locks again before letting the region proceed with data table updates
-          List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(context.rowLocks.size());
-          for (RowLock rowLock : context.rowLocks) {
-              rowLocks.add(lockManager.lockRow(rowLock.getRowKey(), rowLockWaitDuration));
-          }
-          context.rowLocks.clear();
-          context.rowLocks = rowLocks;
-          preparePostIndexMutations(context, now, indexMetaData);
-      }
-      if (failDataTableUpdatesForTesting) {
-          throw new DoNotRetryIOException("Simulating the data table write failure");
-      }
-  }
+    /**
+     * There are at most two rebuild mutations for every row, one put and one delete. {@link IndexRebuildRegionScanner}
+     * lists them next to each other such that the put comes before the delete. This method is called only for
+     * global indexes.
+     */
+    private void preBatchMutateWithExceptionsForRebuild(ObserverContext<RegionCoprocessorEnvironment> c,
+                                                        MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                                        BatchMutateContext context,
+                                                        IndexMaintainer indexMaintainer) throws Throwable {
+        Put put = null;
+        List<Mutation> indexMutations = new ArrayList<>();
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+                continue;
+            }
+            Mutation m = miniBatchOp.getOperation(i);
+            if (!this.builder.isEnabled(m)) {
+                continue;
+            }
+            if (m instanceof Put) {
+                if (put != null) {
+                    indexMutations.addAll(prepareIndexMutationsForRebuild(indexMaintainer, put, null));
+                }
+                put = (Put) m;
+            } else {
+                indexMutations.addAll(prepareIndexMutationsForRebuild(indexMaintainer, put, (Delete) m));
+                put = null;
+            }
+            miniBatchOp.setOperationStatus(i, NOWRITE);
+        }
+        if (put != null) {
+            indexMutations.addAll(prepareIndexMutationsForRebuild(indexMaintainer, put, null));
+        }
+        HTableInterfaceReference hTableInterfaceReference =
+                new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+        context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
+        for (Mutation m : indexMutations) {
+            context.preIndexUpdates.put(hTableInterfaceReference, m);
+        }
+        doPre(c, context, miniBatchOp);
+        // For rebuild updates, no post index update is prepared. Just create an empty list.
+        context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
+    }
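
For illustration, a well-formed rebuild batch under the ordering contract above (hypothetical rows; timestamps omitted). The loop pairs each Put with the Delete that immediately follows it, and flushes a dangling Put when another Put or the end of the batch is reached:

    static List<Mutation> wellFormedRebuildBatch(byte[] row1, byte[] row2) {
        List<Mutation> batch = new ArrayList<>();
        batch.add(new Put(row1));     // newer version of row1 ...
        batch.add(new Delete(row1));  // ... immediately followed by its paired delete
        batch.add(new Put(row2));     // row2 has a put only, no delete
        return batch;
    }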
 
+    public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
+                                             MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
+        ignoreAtomicOperations(miniBatchOp);
+        PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c, miniBatchOp);
+        BatchMutateContext context = new BatchMutateContext(indexMetaData.getClientVersion());
+        setBatchMutateContext(c, context);
+        Mutation firstMutation = miniBatchOp.getOperation(0);
+        ReplayWrite replayWrite = this.builder.getReplayWrite(firstMutation);
+        context.rebuild = replayWrite != null;
+        if (context.rebuild) {
+            preBatchMutateWithExceptionsForRebuild(c, miniBatchOp, context, indexMetaData.getIndexMaintainers().get(0));
+            return;
+        }
+        /*
+         * Exclusively lock all rows so we get a consistent read
+         * while determining the index updates
+         */
+        populateRowsToLock(miniBatchOp, context);
+        lockRows(context);
+        long now = EnvironmentEdgeManager.currentTimeMillis();
+        // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
+        // concurrent updates
+        populatePendingRows(context);
+        // Prepare the current and next data row states for the pending mutations (for global indexes)
+        prepareDataRowStates(c, miniBatchOp, context, now);
+        // Group all the updates for a single row into a single update to be processed (for local indexes)
+        Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, now);
+        // early exit if it turns out we don't have any edits
+        if (mutations == null || mutations.isEmpty()) {
+            return;
+        }
+        long start = EnvironmentEdgeManager.currentTimeMillis();
+        preparePreIndexMutations(c, miniBatchOp, context, mutations, now, indexMetaData);
+        metricSource.updateIndexPrepareTime(EnvironmentEdgeManager.currentTimeMillis() - start);
+        // Sleep for one millisecond if we have prepared the index updates in less than 1 ms. The sleep is necessary to
+        // get different timestamps for concurrent batches that share common rows. It is very rare that the index updates
+        // can be prepared in less than one millisecond
+        if (!context.rowLocks.isEmpty() && now == EnvironmentEdgeManager.currentTimeMillis()) {
+            Thread.sleep(1);
+            LOG.debug("slept 1ms for " + c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+        }
+        // Release the locks before making RPC calls for index updates
+        for (RowLock rowLock : context.rowLocks) {
+            rowLock.release();
+        }
+        // Do the first phase index updates
+        doPre(c, context, miniBatchOp);
+        // Acquire the locks again before letting the region proceed with data table updates
+        List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(context.rowLocks.size());
+        for (RowLock rowLock : context.rowLocks) {
+            rowLocks.add(lockManager.lockRow(rowLock.getRowKey(), rowLockWaitDuration));
+        }
+        context.rowLocks.clear();
+        context.rowLocks = rowLocks;
+        preparePostIndexMutations(context, now, indexMetaData);
+        if (failDataTableUpdatesForTesting) {
+            throw new DoNotRetryIOException("Simulating the data table write failure");
+        }
+    }
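
The 1 ms sleep above can be read as a clock-advance guard; as a standalone sketch under the same assumption that all timestamps come from EnvironmentEdgeManager (the helper name is hypothetical):

    static void ensureClockAdvanced(long now) throws InterruptedException {
        // If the updates were prepared within the same millisecond that `now`
        // was taken, wait for the clock to tick so concurrent batches sharing
        // rows get distinguishable cell timestamps.
        if (now == EnvironmentEdgeManager.currentTimeMillis()) {
            Thread.sleep(1);
        }
    }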
   private void setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) {
       this.batchMutateContext.set(context);
   }
-  
+
   private BatchMutateContext getBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
       return this.batchMutateContext.get();
   }
-  
+
   private void removeBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
       this.batchMutateContext.remove();
   }
@@ -822,14 +1053,6 @@ public class IndexRegionObserver extends BaseRegionObserver {
   }
 
   /**
-   * Exposed for testing!
-   * @return the currently instantiated index builder
-   */
-  public IndexBuilder getBuilderForTesting() {
-    return this.builder.getBuilderForTesting();
-  }
-
-  /**
    * Enable indexing on the given table
   * @param desc {@link HTableDescriptor} for the table on which indexing should be enabled
    * @param builder class to use when building the index for this table
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index dba165b..3723388 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -1593,7 +1593,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
      */
     private void initCachedState() {
         byte[] emptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(encodingScheme).getFirst();
-        dataEmptyKeyValueRef = new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(), emptyKvQualifier);
+        dataEmptyKeyValueRef = new ColumnReference(dataEmptyKeyValueCF, emptyKvQualifier);
         this.allColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size() + coveredColumnsMap.size());
         // columns that are required to evaluate all expressions in indexedExpressions (not including columns in data row key)
         this.indexedColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size());
