This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new 544ef96  PHOENIX-5969 Read repair reduces the number of rows returned 
for LIMIT queries
544ef96 is described below

commit 544ef964f3e5256d87025f93cbf3ad14457639ab
Author: Kadir <kozde...@salesforce.com>
AuthorDate: Sat Jun 20 07:31:38 2020 -0700

    PHOENIX-5969 Read repair reduces the number of rows returned for LIMIT 
queries
---
 .../end2end/index/GlobalIndexCheckerIT.java        |  55 +++++++++++
 .../apache/phoenix/index/GlobalIndexChecker.java   | 105 +++++++++++++++------
 .../java/org/apache/phoenix/util/IndexUtil.java    |  71 ++++++++++----
 3 files changed, 183 insertions(+), 48 deletions(-)

diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index 5c358c7..07166a0 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -225,6 +225,61 @@ public class GlobalIndexCheckerIT extends 
BaseUniqueNamesOwnClusterIT {
     }
 
     @Test
+    public void testLimitWithUnverifiedRows() throws Exception {
+        if (async) {
+            return;
+        }
+        String dataTableName = generateUniqueName();
+        populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 
'abcd') and ('b', 'bc', 'bcd', 'bcde')
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String indexTableName = generateUniqueName();
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + 
" on " +
+                    dataTableName + " (val1) include (val2, val3)");
+            conn.commit();
+            // Read all index rows and rewrite them back directly. This will 
overwrite existing rows with newer
+            // timestamps and set the empty column to value "x". This will 
make them unverified
+            conn.createStatement().execute("UPSERT INTO " + indexTableName + " 
SELECT * FROM " +
+                    indexTableName);
+            conn.commit();
+            // Verify that read repair will not reduce the number of rows 
returned for LIMIT queries
+            String selectSql = "SELECT * from " + indexTableName + " LIMIT 1";
+            ResultSet rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("ab", rs.getString(1));
+            assertEquals("a", rs.getString(2));
+            assertEquals("abc", rs.getString(3));
+            assertEquals("abcd", rs.getString(4));
+            assertFalse(rs.next());
+            selectSql = "SELECT val3 from " + dataTableName + " WHERE val1 = 
'bc' LIMIT 1";
+            // Verify that we will read from the index table
+            assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("bcde", rs.getString(1));
+            assertFalse(rs.next());
+            // Configure IndexRegionObserver to fail the last write phase 
(i.e., the post index update phase) where the verify flag is set
+            // to true and/or index rows are deleted and check that this does 
not impact the correctness
+            IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+            // This is to cover the case where there is no data table row for 
an unverified index row
+            conn.createStatement().execute("upsert into " + dataTableName + " 
(id, val1, val2) values ('c', 'aa','cde')");
+            commitWithException(conn);
+            IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+            // Verify that read repair will not reduce the number of rows 
returned for LIMIT queries
+            selectSql = "SELECT * from " + indexTableName + " LIMIT 1";
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("ab", rs.getString(1));
+            assertEquals("a", rs.getString(2));
+            assertEquals("abc", rs.getString(3));
+            assertEquals("abcd", rs.getString(4));
+            assertFalse(rs.next());
+            // Add rows and check everything is still okay
+            verifyTableHealth(conn, dataTableName, indexTableName);
+
+        }
+    }
+
+    @Test
     public void testSimulateConcurrentUpdates() throws Exception {
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             String dataTableName = generateUniqueName();
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index e2b6f0a..2d7fd9f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -27,7 +27,6 @@ import static 
org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
 import static org.apache.phoenix.compat.hbase.CompatUtil.*;
 
-
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
@@ -49,6 +48,9 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -134,7 +136,10 @@ public class GlobalIndexChecker extends BaseRegionObserver 
{
         private long minTimestamp;
         private long maxTimestamp;
         private GlobalIndexCheckerSource metricsSource;
-
+        private long rowCount = 0;
+        private long pageSize = Long.MAX_VALUE;
+        private boolean restartScanDueToPageFilterRemoval = false;
+        private boolean hasMore;
         public GlobalIndexScanner(RegionCoprocessorEnvironment env,
                                   Scan scan,
                                   RegionScanner scanner,
@@ -173,12 +178,14 @@ public class GlobalIndexChecker extends 
BaseRegionObserver {
             return scanner.getMaxResultSize();
         }
 
-        @Override
-        public boolean next(List<Cell> result) throws IOException {
+        public boolean next(List<Cell> result, boolean raw) throws IOException 
{
             try {
-                boolean hasMore;
                 do {
-                    hasMore = scanner.next(result);
+                    if (raw) {
+                        hasMore = scanner.nextRaw(result);
+                    } else {
+                        hasMore = scanner.next(result);
+                    }
                     if (result.isEmpty()) {
                         break;
                     }
@@ -188,6 +195,10 @@ public class GlobalIndexChecker extends BaseRegionObserver 
{
                     // skip this row as it is invalid
                     // if there is no more row, then result will be an empty 
list
                 } while (hasMore);
+                rowCount++;
+                if (rowCount == pageSize) {
+                    return false;
+                }
                 return hasMore;
             } catch (Throwable t) {
                 
ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
@@ -196,6 +207,16 @@ public class GlobalIndexChecker extends BaseRegionObserver 
{
         }
 
         @Override
+        public boolean next(List<Cell> result) throws IOException {
+           return next(result, false);
+        }
+
+        @Override
+        public boolean nextRaw(List<Cell> result) throws IOException {
+            return next(result, true);
+        }
+
+        @Override
         public boolean next(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
             throw new IOException("next with scannerContext should not be 
called in Phoenix environment");
         }
@@ -233,27 +254,6 @@ public class GlobalIndexChecker extends BaseRegionObserver 
{
             return scanner.getMvccReadPoint();
         }
 
-        @Override
-        public boolean nextRaw(List<Cell> result) throws IOException {
-            try {
-                boolean hasMore;
-                do {
-                    hasMore = scanner.nextRaw(result);
-                    if (result.isEmpty()) {
-                        break;
-                    }
-                    if (verifyRowAndRepairIfNecessary(result)) {
-                        break;
-                    }
-                    // skip this row as it is invalid
-                    // if there is no more row, then result will be an empty 
list
-                } while (hasMore);
-                return hasMore;
-            } catch (Throwable t) {
-                
ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
-                return false; // impossible
-            }
-        }
 
         private void deleteRowIfAgedEnough(byte[] indexRowKey, List<Cell> row, 
long ts, boolean specific) throws IOException {
             if ((EnvironmentEdgeManager.currentTimeMillis() - ts) > 
ageThreshold) {
@@ -270,9 +270,44 @@ public class GlobalIndexChecker extends BaseRegionObserver 
{
             }
         }
 
+        private PageFilter removePageFilterFromFilterList(FilterList 
filterList) {
+            Iterator<Filter> filterIterator = 
filterList.getFilters().iterator();
+            while (filterIterator.hasNext()) {
+                Filter filter = filterIterator.next();
+                if (filter instanceof PageFilter) {
+                    filterIterator.remove();
+                    return (PageFilter) filter;
+                } else if (filter instanceof FilterList) {
+                    PageFilter pageFilter = 
removePageFilterFromFilterList((FilterList) filter);
+                    if (pageFilter != null) {
+                        return pageFilter;
+                    }
+                }
+            }
+            return null;
+        }
+
+        // This method assumes that there is at most one instance of 
PageFilter in a scan
+        private PageFilter removePageFilter(Scan scan) {
+            Filter filter = scan.getFilter();
+            if (filter != null) {
+                if (filter instanceof PageFilter) {
+                    scan.setFilter(null);
+                    return (PageFilter) filter;
+                } else if (filter instanceof FilterList) {
+                    return removePageFilterFromFilterList((FilterList) filter);
+                }
+            }
+            return null;
+        }
+
         private void repairIndexRows(byte[] indexRowKey, long ts, List<Cell> 
row) throws IOException {
-            // Build the data table row key from the index table row key
             if (buildIndexScan == null) {
+                PageFilter pageFilter = removePageFilter(scan);
+                if (pageFilter != null) {
+                    pageSize = pageFilter.getPageSize();
+                    restartScanDueToPageFilterRemoval = true;
+                }
                 buildIndexScan = new Scan();
                 indexScan = new Scan(scan);
                 deleteRowScan = new Scan();
@@ -318,10 +353,20 @@ public class GlobalIndexChecker extends 
BaseRegionObserver {
                 // Skip this unverified row (i.e., do not return it to the 
client). Just retuning empty row is
                 // sufficient to do that
                 row.clear();
+                if (restartScanDueToPageFilterRemoval) {
+                    scanner.close();
+                    setStartRow(indexScan, indexRowKey, false);
+                    scanner = region.getScanner(indexScan);
+                    hasMore = true;
+                    // Set restartScanDueToPageFilterRemoval to false so that we do 
not restart the scan unnecessarily next time
+                    restartScanDueToPageFilterRemoval = false;
+                }
                 return;
             }
             // An index row has been built. Close the current scanner as the 
newly built row will not be visible to it
             scanner.close();
+            // Set restartScanDueToPageFilterRemoval to false so that we do not 
restart the scan unnecessarily next time
+            restartScanDueToPageFilterRemoval = false;
             if (code == RebuildReturnCode.NO_INDEX_ROW.getValue()) {
                 // This means there exists a data table row for the data row 
key derived from this unverified index row
                 // but the data table row does not point back to the index row.
@@ -330,6 +375,7 @@ public class GlobalIndexChecker extends BaseRegionObserver {
                 // Open a new scanner starting from the row after the current 
row
                 setStartRow(indexScan, indexRowKey, false);
                 scanner = region.getScanner(indexScan);
+                hasMore = true;
                 // Skip this unverified row (i.e., do not return it to the 
client). Just retuning empty row is
                 // sufficient to do that
                 row.clear();
@@ -339,7 +385,7 @@ public class GlobalIndexChecker extends BaseRegionObserver {
             // Open a new scanner starting from the current row
             setStartRow(indexScan, indexRowKey, true);
             scanner = region.getScanner(indexScan);
-            scanner.next(row);
+            hasMore = scanner.next(row);
             if (row.isEmpty()) {
                 // This means the index row has been deleted before opening 
the new scanner.
                 return;
@@ -356,6 +402,7 @@ public class GlobalIndexChecker extends BaseRegionObserver {
                 // so that it can be repaired
                 scanner.close();
                 scanner = region.getScanner(indexScan);
+                hasMore = true;
                 row.clear();
                 return;
             }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index bbc70fc..85a4bf3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -61,7 +61,9 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
@@ -934,30 +936,61 @@ public class IndexUtil {
         return false;
     }
 
+    private static boolean addEmptyColumnToFilter(Scan scan, byte[] emptyCF, 
byte[] emptyCQ, Filter filter,  boolean addedEmptyColumn) {
+        if (filter instanceof EncodedQualifiersColumnProjectionFilter) {
+            ((EncodedQualifiersColumnProjectionFilter) 
filter).addTrackedColumn(ENCODED_EMPTY_COLUMN_NAME);
+            if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
+                scan.addColumn(emptyCF, emptyCQ);
+                return true;
+            }
+        }
+        else if (filter instanceof ColumnProjectionFilter) {
+            ((ColumnProjectionFilter) filter).addTrackedColumn(new 
ImmutableBytesPtr(emptyCF), new ImmutableBytesPtr(emptyCQ));
+            if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
+                scan.addColumn(emptyCF, emptyCQ);
+                return true;
+            }
+        }
+        else if (filter instanceof MultiEncodedCQKeyValueComparisonFilter) {
+            ((MultiEncodedCQKeyValueComparisonFilter) 
filter).setMinQualifier(ENCODED_EMPTY_COLUMN_NAME);
+        }
+        else if (!addedEmptyColumn && filter instanceof FirstKeyOnlyFilter) {
+            scan.addColumn(emptyCF, emptyCQ);
+            return true;
+        }
+        return addedEmptyColumn;
+    }
+
+    private static boolean addEmptyColumnToFilterList(Scan scan, byte[] 
emptyCF, byte[] emptyCQ, FilterList filterList, boolean addedEmptyColumn) {
+        Iterator<Filter> filterIterator = filterList.getFilters().iterator();
+        while (filterIterator.hasNext()) {
+            Filter filter = filterIterator.next();
+            if (filter instanceof FilterList) {
+                if (addEmptyColumnToFilterList(scan, emptyCF, emptyCQ, 
(FilterList) filter, addedEmptyColumn)) {
+                    addedEmptyColumn =  true;
+                }
+            } else {
+                if (addEmptyColumnToFilter(scan, emptyCF, emptyCQ, filter, 
addedEmptyColumn)) {
+                    addedEmptyColumn =  true;
+                }
+            }
+        }
+        return addedEmptyColumn;
+    }
+
     public static void addEmptyColumnToScan(Scan scan, byte[] emptyCF, byte[] 
emptyCQ) {
         boolean addedEmptyColumn = false;
-        Iterator<Filter> iterator = ScanUtil.getFilterIterator(scan);
-        while (iterator.hasNext()) {
-            Filter filter = iterator.next();
-            if (filter instanceof EncodedQualifiersColumnProjectionFilter) {
-                ((EncodedQualifiersColumnProjectionFilter) 
filter).addTrackedColumn(ENCODED_EMPTY_COLUMN_NAME);
-                if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
-                    scan.addColumn(emptyCF, emptyCQ);
+        Filter filter = scan.getFilter();
+        if (filter != null) {
+            if (filter instanceof FilterList) {
+                if (addEmptyColumnToFilterList(scan, emptyCF, emptyCQ, 
(FilterList) filter, addedEmptyColumn)) {
+                    addedEmptyColumn = true;
                 }
-            }
-            else if (filter instanceof ColumnProjectionFilter) {
-                ((ColumnProjectionFilter) filter).addTrackedColumn(new 
ImmutableBytesPtr(emptyCF), new ImmutableBytesPtr(emptyCQ));
-                if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
-                    scan.addColumn(emptyCF, emptyCQ);
+            } else {
+                if (addEmptyColumnToFilter(scan, emptyCF, emptyCQ, filter, 
addedEmptyColumn)) {
+                    addedEmptyColumn = true;
                 }
             }
-            else if (filter instanceof MultiEncodedCQKeyValueComparisonFilter) 
{
-                ((MultiEncodedCQKeyValueComparisonFilter) 
filter).setMinQualifier(ENCODED_EMPTY_COLUMN_NAME);
-            }
-            else if (!addedEmptyColumn && filter instanceof 
FirstKeyOnlyFilter) {
-                scan.addColumn(emptyCF, emptyCQ);
-                addedEmptyColumn = true;
-            }
         }
         if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
             scan.addColumn(emptyCF, emptyCQ);

Reply via email to