PHOENIX-2029 Queries are making two rpc calls for getTable
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/03a6ac00 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/03a6ac00 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/03a6ac00 Branch: refs/heads/json Commit: 03a6ac00286f9fbd0466b5739c4036ccb3ad6afb Parents: d1f7ded Author: Thomas D'Silva <twdsi...@gmail.com> Authored: Mon Jun 8 15:30:40 2015 -0700 Committer: Thomas D'Silva <tdsi...@salesforce.com> Committed: Wed Jun 17 11:21:43 2015 -0700 ---------------------------------------------------------------------- .../org/apache/phoenix/rpc/UpdateCacheIT.java | 139 +++++++++++++++++++ .../apache/phoenix/compile/QueryCompiler.java | 2 +- .../coprocessor/MetaDataEndpointImpl.java | 6 +- .../apache/phoenix/schema/MetaDataClient.java | 26 ++-- 4 files changed, 156 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/03a6ac00/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java new file mode 100644 index 0000000..c657e41 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java @@ -0,0 +1,139 @@ +package org.apache.phoenix.rpc; + +import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA; +import static org.apache.phoenix.util.TestUtil.MUTABLE_INDEX_DATA_TABLE; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isNull; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.math.BigDecimal; +import java.sql.Connection; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Map; +import java.util.Properties; + +import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; +import org.apache.phoenix.end2end.Shadower; +import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.MetaDataClient; +import org.apache.phoenix.schema.PName; +import org.apache.phoenix.schema.types.PVarchar; +import org.apache.phoenix.util.DateUtil; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +import com.google.common.collect.Maps; + +/** + * Verifies the number of rpcs calls from {@link MetaDataClient} updateCache() + * for transactional and non-transactional tables. + */ +public class UpdateCacheIT extends BaseHBaseManagedTimeIT { + + public static final int NUM_MILLIS_IN_DAY = 86400000; + + @Before + public void setUp() throws SQLException { + ensureTableCreated(getUrl(), MUTABLE_INDEX_DATA_TABLE); + } + + @BeforeClass + @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class) + public static void doSetup() throws Exception { + Map<String,String> props = Maps.newHashMapWithExpectedSize(3); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + public static void validateRowKeyColumns(ResultSet rs, int i) throws SQLException { + assertTrue(rs.next()); + assertEquals(rs.getString(1), "varchar" + String.valueOf(i)); + assertEquals(rs.getString(2), "char" + String.valueOf(i)); + assertEquals(rs.getInt(3), i); + assertEquals(rs.getInt(4), i); + assertEquals(rs.getBigDecimal(5), new BigDecimal(i*0.5d)); + Date date = new Date(DateUtil.parseDate("2015-01-01 00:00:00").getTime() + (i - 1) * NUM_MILLIS_IN_DAY); + assertEquals(rs.getDate(6), date); + } + + public static void setRowKeyColumns(PreparedStatement stmt, int i) throws SQLException { + // insert row + stmt.setString(1, "varchar" + String.valueOf(i)); + stmt.setString(2, "char" + String.valueOf(i)); + stmt.setInt(3, i); + stmt.setLong(4, i); + stmt.setBigDecimal(5, new BigDecimal(i*0.5d)); + Date date = new Date(DateUtil.parseDate("2015-01-01 00:00:00").getTime() + (i - 1) * NUM_MILLIS_IN_DAY); + stmt.setDate(6, date); + } + + @Test + public void testUpdateCache() throws Exception { + String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + MUTABLE_INDEX_DATA_TABLE; + String selectSql = "SELECT * FROM "+fullTableName; + // use a spyed ConnectionQueryServices so we can verify calls to getTable + ConnectionQueryServices connectionQueryServices = Mockito.spy(driver.getConnectionQueryServices(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))); + Properties props = new Properties(); + props.putAll(PhoenixEmbeddedDriver.DEFFAULT_PROPS.asMap()); + Connection conn = connectionQueryServices.connect(getUrl(), props); + try { + conn.setAutoCommit(false); + ResultSet rs = conn.createStatement().executeQuery(selectSql); + assertFalse(rs.next()); + reset(connectionQueryServices); + + String upsert = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)"; + PreparedStatement stmt = conn.prepareStatement(upsert); + // upsert three rows + setRowKeyColumns(stmt, 1); + stmt.execute(); + setRowKeyColumns(stmt, 2); + stmt.execute(); + setRowKeyColumns(stmt, 3); + stmt.execute(); + conn.commit(); + // verify only one rpc to getTable occurs after commit is called + verify(connectionQueryServices, times(1)).getTable((PName)isNull(), eq(PVarchar.INSTANCE.toBytes(INDEX_DATA_SCHEMA)), eq(PVarchar.INSTANCE.toBytes(MUTABLE_INDEX_DATA_TABLE)), anyLong(), anyLong()); + reset(connectionQueryServices); + + rs = conn.createStatement().executeQuery(selectSql); + validateRowKeyColumns(rs, 1); + validateRowKeyColumns(rs, 2); + validateRowKeyColumns(rs, 3); + assertFalse(rs.next()); + + rs = conn.createStatement().executeQuery(selectSql); + validateRowKeyColumns(rs, 1); + validateRowKeyColumns(rs, 2); + validateRowKeyColumns(rs, 3); + assertFalse(rs.next()); + + rs = conn.createStatement().executeQuery(selectSql); + validateRowKeyColumns(rs, 1); + validateRowKeyColumns(rs, 2); + validateRowKeyColumns(rs, 3); + assertFalse(rs.next()); + conn.commit(); + // there should be one rpc to getTable per query + verify(connectionQueryServices, times(3)).getTable((PName)isNull(), eq(PVarchar.INSTANCE.toBytes(INDEX_DATA_SCHEMA)), eq(PVarchar.INSTANCE.toBytes(MUTABLE_INDEX_DATA_TABLE)), anyLong(), anyLong()); + } + finally { + conn.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/03a6ac00/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java index e877e03..94ff075 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java @@ -529,7 +529,7 @@ public class QueryCompiler { // Don't pass groupBy when building where clause expression, because we do not want to wrap these // expressions as group by key expressions since they're pre, not post filtered. if (innerPlan == null && !tableRef.equals(resolver.getTables().get(0))) { - context.setResolver(FromCompiler.getResolverForQuery(select, this.statement.getConnection())); + context.setResolver(FromCompiler.getResolver(context.getConnection(), tableRef, select.getUdfParseNodes())); } Set<SubqueryParseNode> subqueries = Sets.<SubqueryParseNode> newHashSet(); Expression where = WhereCompiler.compile(context, select, viewWhere, subqueries); http://git-wip-us.apache.org/repos/asf/phoenix/blob/03a6ac00/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index 39a4956..1d578f5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -1794,10 +1794,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso if (columnToDelete.isViewReferenced()) { // Disallow deletion of column referenced in WHERE clause of view return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), table, columnToDelete); } - // Look for columnToDelete in any indexes. If found as PK - // column, get lock and drop the index. If found as covered - // column, delete from index (do this client side?). - // In either case, invalidate index if the column is in it + // Look for columnToDelete in any indexes. If found as PK column, get lock and drop the index and then invalidate it + // Covered columns are deleted from the index by the client PhoenixConnection connection = table.getIndexes().isEmpty() ? null : QueryUtil.getConnection(env.getConfiguration()).unwrap(PhoenixConnection.class); for (PTable index : table.getIndexes()) { try { http://git-wip-us.apache.org/repos/asf/phoenix/blob/03a6ac00/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index fcdb651..75678fd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -436,9 +436,9 @@ public class MetaDataClient { // timestamp, we can handle this such that we don't ask the // server again. if (table != null) { - // Ensures that table in result is set to table found in our cache. - result.setTable(table); if (code == MutationCode.TABLE_ALREADY_EXISTS) { + // Ensures that table in result is set to table found in our cache. + result.setTable(table); // Although this table is up-to-date, the parent table may not be. // In this case, we update the parent table which may in turn pull // in indexes to add to this table. @@ -2692,18 +2692,20 @@ public class MetaDataClient { dropColumnMutations(table, tableColumnsToDrop, tableMetaData); for (PTable index : table.getIndexes()) { + IndexMaintainer indexMaintainer = index.getIndexMaintainer(table, connection); + // get the columns required for the index pk + Set<ColumnReference> indexColumns = indexMaintainer.getIndexedColumns(); + // get the covered columns + Set<ColumnReference> coveredColumns = indexMaintainer.getCoverededColumns(); List<PColumn> indexColumnsToDrop = Lists.newArrayListWithExpectedSize(columnRefs.size()); for(PColumn columnToDrop : tableColumnsToDrop) { - String indexColumnName = IndexUtil.getIndexColumnName(columnToDrop); - try { - PColumn indexColumn = index.getColumn(indexColumnName); - if (SchemaUtil.isPKColumn(indexColumn)) { - indexesToDrop.add(new TableRef(index)); - } else { - indexColumnsToDrop.add(indexColumn); - columnsToDrop.add(new ColumnRef(tableRef, columnToDrop.getPosition())); - } - } catch (ColumnNotFoundException e) { + ColumnReference columnToDropRef = new ColumnReference(columnToDrop.getFamilyName().getBytes(), columnToDrop.getName().getBytes()); + if (indexColumns.contains(columnToDropRef)) { + indexesToDrop.add(new TableRef(index)); + } + else if (coveredColumns.contains(columnToDropRef)) { + String indexColumnName = IndexUtil.getIndexColumnName(columnToDrop); + indexColumnsToDrop.add(index.getColumn(indexColumnName)); } } if(!indexColumnsToDrop.isEmpty()) {