This is an automated email from the ASF dual-hosted git repository. kadir pushed a commit to branch 4.x in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push: new 544ef96 PHOENIX-5969 Read repair reduces the number of rows returned for LIMIT queries 544ef96 is described below commit 544ef964f3e5256d87025f93cbf3ad14457639ab Author: Kadir <kozde...@salesforce.com> AuthorDate: Sat Jun 20 07:31:38 2020 -0700 PHOENIX-5969 Read repair reduces the number of rows returned for LIMIT queries --- .../end2end/index/GlobalIndexCheckerIT.java | 55 +++++++++++ .../apache/phoenix/index/GlobalIndexChecker.java | 105 +++++++++++++++------ .../java/org/apache/phoenix/util/IndexUtil.java | 71 ++++++++++---- 3 files changed, 183 insertions(+), 48 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java index 5c358c7..07166a0 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java @@ -225,6 +225,61 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT { } @Test + public void testLimitWithUnverifiedRows() throws Exception { + if (async) { + return; + } + String dataTableName = generateUniqueName(); + populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde') + try (Connection conn = DriverManager.getConnection(getUrl())) { + String indexTableName = generateUniqueName(); + conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " + + dataTableName + " (val1) include (val2, val3)"); + conn.commit(); + // Read all index rows and rewrite them back directly. This will overwrite existing rows with newer + // timestamps and set the empty column to value "x". 
This will make them unverified + conn.createStatement().execute("UPSERT INTO " + indexTableName + " SELECT * FROM " + + indexTableName); + conn.commit(); + // Verified that read repair will not reduce the number of rows returned for LIMIT queries + String selectSql = "SELECT * from " + indexTableName + " LIMIT 1"; + ResultSet rs = conn.createStatement().executeQuery(selectSql); + assertTrue(rs.next()); + assertEquals("ab", rs.getString(1)); + assertEquals("a", rs.getString(2)); + assertEquals("abc", rs.getString(3)); + assertEquals("abcd", rs.getString(4)); + assertFalse(rs.next()); + selectSql = "SELECT val3 from " + dataTableName + " WHERE val1 = 'bc' LIMIT 1"; + // Verify that we will read from the index table + assertExplainPlan(conn, selectSql, dataTableName, indexTableName); + rs = conn.createStatement().executeQuery(selectSql); + assertTrue(rs.next()); + assertEquals("bcde", rs.getString(1)); + assertFalse(rs.next()); + // Configure IndexRegionObserver to fail the last write phase (i.e., the post index update phase) where the verify flag is set + // to true and/or index rows are deleted and check that this does not impact the correctness + IndexRegionObserver.setFailDataTableUpdatesForTesting(true); + // This is to cover the case where there is no data table row for an unverified index row + conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val2) values ('c', 'aa','cde')"); + commitWithException(conn); + IndexRegionObserver.setFailDataTableUpdatesForTesting(false); + // Verified that read repair will not reduce the number of rows returned for LIMIT queries + selectSql = "SELECT * from " + indexTableName + " LIMIT 1"; + rs = conn.createStatement().executeQuery(selectSql); + assertTrue(rs.next()); + assertEquals("ab", rs.getString(1)); + assertEquals("a", rs.getString(2)); + assertEquals("abc", rs.getString(3)); + assertEquals("abcd", rs.getString(4)); + assertFalse(rs.next()); + // Add rows and check everything is still okay + 
verifyTableHealth(conn, dataTableName, indexTableName); + + } + } + + @Test public void testSimulateConcurrentUpdates() throws Exception { try (Connection conn = DriverManager.getConnection(getUrl())) { String dataTableName = generateUniqueName(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java index e2b6f0a..2d7fd9f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java @@ -27,7 +27,6 @@ import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; import static org.apache.phoenix.compat.hbase.CompatUtil.*; - import java.io.IOException; import java.util.Iterator; import java.util.List; @@ -49,6 +48,9 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; @@ -134,7 +136,10 @@ public class GlobalIndexChecker extends BaseRegionObserver { private long minTimestamp; private long maxTimestamp; private GlobalIndexCheckerSource metricsSource; - + private long rowCount = 0; + private long pageSize = Long.MAX_VALUE; + private boolean restartScanDueToPageFilterRemoval = false; + private boolean hasMore; public GlobalIndexScanner(RegionCoprocessorEnvironment env, Scan scan, RegionScanner scanner, @@ -173,12 +178,14 @@ public class GlobalIndexChecker extends BaseRegionObserver { return scanner.getMaxResultSize(); } - @Override - public boolean 
next(List<Cell> result) throws IOException { + public boolean next(List<Cell> result, boolean raw) throws IOException { try { - boolean hasMore; do { - hasMore = scanner.next(result); + if (raw) { + hasMore = scanner.nextRaw(result); + } else { + hasMore = scanner.next(result); + } if (result.isEmpty()) { break; } @@ -188,6 +195,10 @@ public class GlobalIndexChecker extends BaseRegionObserver { // skip this row as it is invalid // if there is no more row, then result will be an empty list } while (hasMore); + rowCount++; + if (rowCount == pageSize) { + return false; + } return hasMore; } catch (Throwable t) { ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t); @@ -196,6 +207,16 @@ public class GlobalIndexChecker extends BaseRegionObserver { } @Override + public boolean next(List<Cell> result) throws IOException { + return next(result, false); + } + + @Override + public boolean nextRaw(List<Cell> result) throws IOException { + return next(result, true); + } + + @Override public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { throw new IOException("next with scannerContext should not be called in Phoenix environment"); } @@ -233,27 +254,6 @@ public class GlobalIndexChecker extends BaseRegionObserver { return scanner.getMvccReadPoint(); } - @Override - public boolean nextRaw(List<Cell> result) throws IOException { - try { - boolean hasMore; - do { - hasMore = scanner.nextRaw(result); - if (result.isEmpty()) { - break; - } - if (verifyRowAndRepairIfNecessary(result)) { - break; - } - // skip this row as it is invalid - // if there is no more row, then result will be an empty list - } while (hasMore); - return hasMore; - } catch (Throwable t) { - ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t); - return false; // impossible - } - } private void deleteRowIfAgedEnough(byte[] indexRowKey, List<Cell> row, long ts, boolean specific) throws IOException { if 
((EnvironmentEdgeManager.currentTimeMillis() - ts) > ageThreshold) { @@ -270,9 +270,44 @@ public class GlobalIndexChecker extends BaseRegionObserver { } } + private PageFilter removePageFilterFromFilterList(FilterList filterList) { + Iterator<Filter> filterIterator = filterList.getFilters().iterator(); + while (filterIterator.hasNext()) { + Filter filter = filterIterator.next(); + if (filter instanceof PageFilter) { + filterIterator.remove(); + return (PageFilter) filter; + } else if (filter instanceof FilterList) { + PageFilter pageFilter = removePageFilterFromFilterList((FilterList) filter); + if (pageFilter != null) { + return pageFilter; + } + } + } + return null; + } + + // This method assumes that there is at most one instance of PageFilter in a scan + private PageFilter removePageFilter(Scan scan) { + Filter filter = scan.getFilter(); + if (filter != null) { + if (filter instanceof PageFilter) { + scan.setFilter(null); + return (PageFilter) filter; + } else if (filter instanceof FilterList) { + return removePageFilterFromFilterList((FilterList) filter); + } + } + return null; + } + private void repairIndexRows(byte[] indexRowKey, long ts, List<Cell> row) throws IOException { - // Build the data table row key from the index table row key if (buildIndexScan == null) { + PageFilter pageFilter = removePageFilter(scan); + if (pageFilter != null) { + pageSize = pageFilter.getPageSize(); + restartScanDueToPageFilterRemoval = true; + } buildIndexScan = new Scan(); indexScan = new Scan(scan); deleteRowScan = new Scan(); @@ -318,10 +353,20 @@ public class GlobalIndexChecker extends BaseRegionObserver { // Skip this unverified row (i.e., do not return it to the client). 
Just returning empty row is // sufficient to do that row.clear(); + if (restartScanDueToPageFilterRemoval) { + scanner.close(); + setStartRow(indexScan, indexRowKey, false); + scanner = region.getScanner(indexScan); + hasMore = true; + // Set restartScanDueToPageFilterRemoval to false as we do not restart the scan unnecessarily next time + restartScanDueToPageFilterRemoval = false; + } return; } // An index row has been built. Close the current scanner as the newly built row will not be visible to it scanner.close(); + // Set restartScanDueToPageFilterRemoval to false as we do not restart the scan unnecessarily next time + restartScanDueToPageFilterRemoval = false; if (code == RebuildReturnCode.NO_INDEX_ROW.getValue()) { // This means there exists a data table row for the data row key derived from this unverified index row // but the data table row does not point back to the index row. @@ -330,6 +375,7 @@ public class GlobalIndexChecker extends BaseRegionObserver { // Open a new scanner starting from the row after the current row setStartRow(indexScan, indexRowKey, false); scanner = region.getScanner(indexScan); + hasMore = true; // Skip this unverified row (i.e., do not return it to the client). Just returning empty row is // sufficient to do that row.clear(); return; @@ -339,7 +385,7 @@ public class GlobalIndexChecker extends BaseRegionObserver { // Open a new scanner starting from the current row setStartRow(indexScan, indexRowKey, true); scanner = region.getScanner(indexScan); - scanner.next(row); + hasMore = scanner.next(row); if (row.isEmpty()) { // This means the index row has been deleted before opening the new scanner. 
return; @@ -356,6 +402,7 @@ public class GlobalIndexChecker extends BaseRegionObserver { // so that it can be repaired scanner.close(); scanner = region.getScanner(indexScan); + hasMore = true; row.clear(); return; } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index bbc70fc..85a4bf3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -61,7 +61,9 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.ServerRpcController; @@ -934,30 +936,61 @@ public class IndexUtil { return false; } + private static boolean addEmptyColumnToFilter(Scan scan, byte[] emptyCF, byte[] emptyCQ, Filter filter, boolean addedEmptyColumn) { + if (filter instanceof EncodedQualifiersColumnProjectionFilter) { + ((EncodedQualifiersColumnProjectionFilter) filter).addTrackedColumn(ENCODED_EMPTY_COLUMN_NAME); + if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) { + scan.addColumn(emptyCF, emptyCQ); + return true; + } + } + else if (filter instanceof ColumnProjectionFilter) { + ((ColumnProjectionFilter) filter).addTrackedColumn(new ImmutableBytesPtr(emptyCF), new ImmutableBytesPtr(emptyCQ)); + if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) { + scan.addColumn(emptyCF, emptyCQ); + return true; + } + } + else if (filter instanceof MultiEncodedCQKeyValueComparisonFilter) { + ((MultiEncodedCQKeyValueComparisonFilter) 
filter).setMinQualifier(ENCODED_EMPTY_COLUMN_NAME); + } + else if (!addedEmptyColumn && filter instanceof FirstKeyOnlyFilter) { + scan.addColumn(emptyCF, emptyCQ); + return true; + } + return addedEmptyColumn; + } + + private static boolean addEmptyColumnToFilterList(Scan scan, byte[] emptyCF, byte[] emptyCQ, FilterList filterList, boolean addedEmptyColumn) { + Iterator<Filter> filterIterator = filterList.getFilters().iterator(); + while (filterIterator.hasNext()) { + Filter filter = filterIterator.next(); + if (filter instanceof FilterList) { + if (addEmptyColumnToFilterList(scan, emptyCF, emptyCQ, (FilterList) filter, addedEmptyColumn)) { + addedEmptyColumn = true; + } + } else { + if (addEmptyColumnToFilter(scan, emptyCF, emptyCQ, filter, addedEmptyColumn)) { + addedEmptyColumn = true; + } + } + } + return addedEmptyColumn; + } + public static void addEmptyColumnToScan(Scan scan, byte[] emptyCF, byte[] emptyCQ) { boolean addedEmptyColumn = false; - Iterator<Filter> iterator = ScanUtil.getFilterIterator(scan); - while (iterator.hasNext()) { - Filter filter = iterator.next(); - if (filter instanceof EncodedQualifiersColumnProjectionFilter) { - ((EncodedQualifiersColumnProjectionFilter) filter).addTrackedColumn(ENCODED_EMPTY_COLUMN_NAME); - if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) { - scan.addColumn(emptyCF, emptyCQ); + Filter filter = scan.getFilter(); + if (filter != null) { + if (filter instanceof FilterList) { + if (addEmptyColumnToFilterList(scan, emptyCF, emptyCQ, (FilterList) filter, addedEmptyColumn)) { + addedEmptyColumn = true; } - } - else if (filter instanceof ColumnProjectionFilter) { - ((ColumnProjectionFilter) filter).addTrackedColumn(new ImmutableBytesPtr(emptyCF), new ImmutableBytesPtr(emptyCQ)); - if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) { - scan.addColumn(emptyCF, emptyCQ); + } else { + if (addEmptyColumnToFilter(scan, emptyCF, emptyCQ, filter, addedEmptyColumn)) { + addedEmptyColumn = true; } } - else if (filter 
instanceof MultiEncodedCQKeyValueComparisonFilter) { - ((MultiEncodedCQKeyValueComparisonFilter) filter).setMinQualifier(ENCODED_EMPTY_COLUMN_NAME); - } - else if (!addedEmptyColumn && filter instanceof FirstKeyOnlyFilter) { - scan.addColumn(emptyCF, emptyCQ); - addedEmptyColumn = true; - } } if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) { scan.addColumn(emptyCF, emptyCQ);