Repository: phoenix Updated Branches: refs/heads/master fff2edb4d -> a55c03cc9
Phoenix-1263 Only cache guideposts on physical PTable Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a55c03cc Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a55c03cc Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a55c03cc Branch: refs/heads/master Commit: a55c03cc91aebdd9ce3d04fbd32d7500ad4f67ce Parents: fff2edb Author: Ramkrishna <ramkrishna.s.vasude...@intel.com> Authored: Wed Oct 1 12:30:08 2014 +0530 Committer: Ramkrishna <ramkrishna.s.vasude...@intel.com> Committed: Wed Oct 1 12:30:08 2014 +0530 ---------------------------------------------------------------------- .../end2end/BaseTenantSpecificTablesIT.java | 22 ++++- ...efaultParallelIteratorsRegionSplitterIT.java | 2 +- .../phoenix/end2end/GuidePostsLifeCycleIT.java | 4 +- .../org/apache/phoenix/end2end/KeyOnlyIT.java | 2 +- .../phoenix/end2end/MultiCfQueryExecIT.java | 2 +- ...ipRangeParallelIteratorRegionSplitterIT.java | 2 +- .../end2end/TenantSpecificTablesDMLIT.java | 96 +++++++++++++++----- .../coprocessor/BaseScannerRegionObserver.java | 2 +- .../coprocessor/MetaDataEndpointImpl.java | 4 +- .../UngroupedAggregateRegionObserver.java | 22 +++-- .../DefaultParallelIteratorRegionSplitter.java | 11 +-- ...ocalIndexParallelIteratorRegionSplitter.java | 8 +- .../ParallelIteratorRegionSplitterFactory.java | 6 +- .../phoenix/iterate/ParallelIterators.java | 25 ++++- ...SkipRangeParallelIteratorRegionSplitter.java | 18 ++-- .../apache/phoenix/schema/MetaDataClient.java | 25 +++-- 16 files changed, 179 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/a55c03cc/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java index 85d65e2..bcae7ed 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java @@ -20,10 +20,16 @@ package org.apache.phoenix.end2end; import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB; import java.sql.SQLException; +import java.util.Map; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.ReadOnlyProps; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.experimental.categories.Category; +import com.google.common.collect.Maps; + /** * Describe your class here. * @@ -35,9 +41,9 @@ import org.junit.experimental.categories.Category; public abstract class BaseTenantSpecificTablesIT extends BaseClientManagedTimeIT { protected static final String TENANT_ID = "ZZTop"; protected static final String TENANT_TYPE_ID = "abc"; - protected static final String PHOENIX_JDBC_TENANT_SPECIFIC_URL = getUrl() + ';' + TENANT_ID_ATTRIB + '=' + TENANT_ID; + protected static String PHOENIX_JDBC_TENANT_SPECIFIC_URL; protected static final String TENANT_ID2 = "Styx"; - protected static final String PHOENIX_JDBC_TENANT_SPECIFIC_URL2 = getUrl() + ';' + TENANT_ID_ATTRIB + '=' + TENANT_ID2; + protected static String PHOENIX_JDBC_TENANT_SPECIFIC_URL2; protected static final String PARENT_TABLE_NAME = "PARENT_TABLE"; protected static final String PARENT_TABLE_DDL = "CREATE TABLE " + PARENT_TABLE_NAME + " ( \n" + @@ -64,6 +70,7 @@ public abstract class BaseTenantSpecificTablesIT extends BaseClientManagedTimeIT " tenant_col VARCHAR) AS SELECT *\n" + " FROM " + PARENT_TABLE_NAME_NO_TENANT_TYPE_ID; + @Before public void createTables() throws SQLException { createTestTable(getUrl(), PARENT_TABLE_DDL, null, nextTimestamp()); @@ -71,4 +78,15 @@ public abstract class BaseTenantSpecificTablesIT extends BaseClientManagedTimeIT createTestTable(PHOENIX_JDBC_TENANT_SPECIFIC_URL, TENANT_TABLE_DDL, null, nextTimestamp()); createTestTable(PHOENIX_JDBC_TENANT_SPECIFIC_URL, TENANT_TABLE_DDL_NO_TENANT_TYPE_ID, null, nextTimestamp()); } + + @BeforeClass + @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class) + public static void doSetup() throws Exception { + Map<String,String> props = Maps.newHashMapWithExpectedSize(3); + // Must update config before starting server + props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + PHOENIX_JDBC_TENANT_SPECIFIC_URL = getUrl() + ';' + TENANT_ID_ATTRIB + '=' + TENANT_ID; + PHOENIX_JDBC_TENANT_SPECIFIC_URL2 = getUrl() + ';' + TENANT_ID_ATTRIB + '=' + TENANT_ID2; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a55c03cc/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java index a6ec835..e7a1044 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java @@ -79,7 +79,7 @@ public class DefaultParallelIteratorsRegionSplitterIT extends BaseParallelIterat final List<HRegionLocation> regions = pconn.getQueryServices().getAllTableRegions(tableRef.getTable().getPhysicalName().getBytes()); PhoenixStatement statement = new PhoenixStatement(pconn); StatementContext context = new StatementContext(statement, null, scan, new SequenceManager(statement)); - DefaultParallelIteratorRegionSplitter splitter = new DefaultParallelIteratorRegionSplitter(context, tableRef, HintNode.EMPTY_HINT_NODE) { + DefaultParallelIteratorRegionSplitter splitter = new DefaultParallelIteratorRegionSplitter(context, tableRef.getTable(), HintNode.EMPTY_HINT_NODE) { @Override protected List<HRegionLocation> getAllRegions() throws SQLException { return DefaultParallelIteratorRegionSplitter.filterRegions(regions, scan.getStartRow(), scan.getStopRow()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/a55c03cc/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java index 3cef492..ba9f961 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java @@ -89,7 +89,7 @@ public class GuidePostsLifeCycleIT extends BaseHBaseManagedTimeIT { tableRef.getTable().getPhysicalName().getBytes()); PhoenixStatement statement = new PhoenixStatement(pconn); StatementContext context = new StatementContext(statement, null, scan, new SequenceManager(statement)); - DefaultParallelIteratorRegionSplitter splitter = new DefaultParallelIteratorRegionSplitter(context, tableRef, + DefaultParallelIteratorRegionSplitter splitter = new DefaultParallelIteratorRegionSplitter(context, tableRef.getTable(), HintNode.EMPTY_HINT_NODE) { @Override protected List<HRegionLocation> getAllRegions() throws SQLException { @@ -158,7 +158,7 @@ public class GuidePostsLifeCycleIT extends BaseHBaseManagedTimeIT { conn.commit(); conn.close(); } - + protected static TableRef getTableRef(Connection conn) throws SQLException { PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); TableRef table = new TableRef(null, pconn.getMetaDataCache().getTable( http://git-wip-us.apache.org/repos/asf/phoenix/blob/a55c03cc/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java index f713fff..ed081d9 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java @@ -200,7 +200,7 @@ public class KeyOnlyIT extends BaseClientManagedTimeIT { tableRef.getTable().getPhysicalName().getBytes()); PhoenixStatement statement = new PhoenixStatement(pconn); StatementContext context = new StatementContext(statement, null, scan, new SequenceManager(statement)); - DefaultParallelIteratorRegionSplitter splitter = new DefaultParallelIteratorRegionSplitter(context, tableRef, + DefaultParallelIteratorRegionSplitter splitter = new DefaultParallelIteratorRegionSplitter(context, tableRef.getTable(), HintNode.EMPTY_HINT_NODE) { @Override protected List<HRegionLocation> getAllRegions() throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/a55c03cc/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java index f01d985..fbd1cf6 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java @@ -298,7 +298,7 @@ public class MultiCfQueryExecIT extends BaseClientManagedTimeIT { tableRef.getTable().getPhysicalName().getBytes()); PhoenixStatement statement = new PhoenixStatement(pconn); StatementContext context = new StatementContext(statement, null, scan, new SequenceManager(statement)); - DefaultParallelIteratorRegionSplitter splitter = new DefaultParallelIteratorRegionSplitter(context, tableRef, + DefaultParallelIteratorRegionSplitter splitter = new DefaultParallelIteratorRegionSplitter(context, tableRef.getTable(), HintNode.EMPTY_HINT_NODE) { @Override protected List<HRegionLocation> getAllRegions() throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/a55c03cc/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java index 31e3a3b..28bc011 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java @@ -348,7 +348,7 @@ public class SkipRangeParallelIteratorRegionSplitterIT extends BaseClientManaged PhoenixStatement statement = new PhoenixStatement(connection); StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement)); context.setScanRanges(scanRanges); - SkipRangeParallelIteratorRegionSplitter splitter = SkipRangeParallelIteratorRegionSplitter.getInstance(context, tableRef, HintNode.EMPTY_HINT_NODE); + SkipRangeParallelIteratorRegionSplitter splitter = SkipRangeParallelIteratorRegionSplitter.getInstance(context, tableRef.getTable(), HintNode.EMPTY_HINT_NODE); List<KeyRange> keyRanges = splitter.getSplits(); Collections.sort(keyRanges, new Comparator<KeyRange>() { @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/a55c03cc/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java index dba4264..e4c17f9 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java @@ -28,9 +28,24 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; import java.util.Properties; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.iterate.DefaultParallelIteratorRegionSplitter; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.parse.HintNode; +import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.TableNotFoundException; +import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.junit.Test; @@ -38,7 +53,7 @@ import org.junit.experimental.categories.Category; @Category(ClientManagedTimeTest.class) public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT { - + @Test public void testBasicUpsertSelect() throws Exception { Connection conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL); @@ -48,9 +63,8 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT { conn.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " (id, tenant_col) values (2, 'Viva Las Vegas')"); conn.commit(); conn.close(); - analyzeTable(conn, TENANT_TABLE_NAME); - conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL); + analyzeTable(conn, TENANT_TABLE_NAME); ResultSet rs = conn.createStatement().executeQuery("select tenant_col from " + TENANT_TABLE_NAME + " where id = 1"); assertTrue("Expected 1 row in result set", rs.next()); assertEquals("Cheap Sunglasses", rs.getString(1)); @@ -71,34 +85,35 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT { conn1.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " values ('me','" + TENANT_TYPE_ID + "',1,'Cheap Sunglasses')"); conn1.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " values ('you','" + TENANT_TYPE_ID +"',2,'Viva Las Vegas')"); conn1.commit(); - + conn1 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL); analyzeTable(conn1, TENANT_TABLE_NAME); conn2.setAutoCommit(true); conn2.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " values ('them','" + TENANT_TYPE_ID + "',1,'Long Hair')"); conn2.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " values ('us','" + TENANT_TYPE_ID + "',2,'Black Hat')"); - analyzeTable(conn2, TENANT_TABLE_NAME); - conn2.close(); + conn2.close(); conn1.close(); - conn1 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL); ResultSet rs = conn1.createStatement().executeQuery("select * from " + TENANT_TABLE_NAME + " where id = 1"); assertTrue("Expected 1 row in result set", rs.next()); assertEquals(1, rs.getInt(3)); assertEquals("Cheap Sunglasses", rs.getString(4)); assertFalse("Expected 1 row in result set", rs.next()); - conn2 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL2); + analyzeTable(conn2, TENANT_TABLE_NAME); + rs = conn2.createStatement().executeQuery("select count(*) from " + TENANT_TABLE_NAME ); rs = conn2.createStatement().executeQuery("select * from " + TENANT_TABLE_NAME + " where id = 2"); assertTrue("Expected 1 row in result set", rs.next()); assertEquals(2, rs.getInt(3)); assertEquals("Black Hat", rs.getString(4)); assertFalse("Expected 1 row in result set", rs.next()); conn2.close(); + conn1 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL); + analyzeTable(conn1, TENANT_TABLE_NAME); + conn1.close(); conn2 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL2); conn2.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " select * from " + TENANT_TABLE_NAME ); conn2.commit(); - analyzeTable(conn2, TENANT_TABLE_NAME); conn2.close(); conn2 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL2); @@ -115,10 +130,10 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT { conn2 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL2); conn2.setAutoCommit(true);; conn2.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " select 'all', tenant_type_id, id, 'Big ' || tenant_col from " + TENANT_TABLE_NAME ); - analyzeTable(conn2, TENANT_TABLE_NAME); conn2.close(); conn2 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL2); + analyzeTable(conn2, TENANT_TABLE_NAME); rs = conn2.createStatement().executeQuery("select * from " + TENANT_TABLE_NAME); assertTrue("Expected row in result set", rs.next()); assertEquals("all", rs.getString(1)); @@ -132,7 +147,17 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT { assertEquals("Big Black Hat", rs.getString(4)); assertFalse("Expected 2 rows total", rs.next()); conn2.close(); - + conn1 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL); + rs = conn1.createStatement().executeQuery("select * from " + TENANT_TABLE_NAME); + assertTrue("Expected row row in result set", rs.next()); + assertEquals(1, rs.getInt(3)); + assertEquals("Cheap Sunglasses", rs.getString(4)); + assertTrue("Expected 1 row in result set", rs.next()); + assertEquals(2, rs.getInt(3)); + assertEquals("Viva Las Vegas", rs.getString(4)); + conn1 = nextConnection(getUrl()); + List<KeyRange> splits = getSplits(conn1, new Scan()); + assertEquals(splits.size(), 5); } finally { conn1.close(); @@ -163,10 +188,10 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT { conn.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " (id, tenant_col) values (1, 'Cheap Sunglasses')"); conn.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " (id, tenant_col) values (2, 'Viva Las Vegas')"); conn.commit(); - analyzeTable(conn, TENANT_TABLE_NAME); conn.close(); conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL); + analyzeTable(conn, TENANT_TABLE_NAME); ResultSet rs = conn.createStatement().executeQuery("select tenant_col from " + TENANT_TABLE_NAME + " join foo on k=id"); assertTrue("Expected 1 row in result set", rs.next()); assertEquals("Cheap Sunglasses", rs.getString(1)); @@ -190,7 +215,6 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT { conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('AC/DC', 'abc', 1, 'Bon Scott')"); conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', '" + TENANT_TYPE_ID + "', 1, 'Billy Gibbons')"); conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', 'def', 1, 'Billy Gibbons')"); - analyzeTable(conn, PARENT_TABLE_NAME); conn.close(); conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL); @@ -200,6 +224,7 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT { assertFalse("Expected 1 row in result set", rs.next()); rs = conn.createStatement().executeQuery("select count(*) from " + TENANT_TABLE_NAME); + analyzeTable(conn, PARENT_TABLE_NAME); assertTrue("Expected 1 row in result set", rs.next()); assertEquals(1, rs.getInt(1)); assertFalse("Expected 1 row in result set", rs.next()); @@ -222,7 +247,6 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT { conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('AC/DC', 'abc', 1, 'Bon Scott')"); conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', '" + TENANT_TYPE_ID + "', 1, 'Billy Gibbons')"); conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', 'def', 1, 'Billy Gibbons')"); - analyzeTable(conn, PARENT_TABLE_NAME); conn.close(); conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL); @@ -238,6 +262,7 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT { conn.close(); conn = nextConnection(getUrl()); + analyzeTable(conn, PARENT_TABLE_NAME); rs = conn.createStatement().executeQuery("select count(*) from " + PARENT_TABLE_NAME); rs.next(); assertEquals(2, rs.getInt(1)); @@ -260,7 +285,6 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT { conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME_NO_TENANT_TYPE_ID + " (tenant_id, id, user) values ('AC/DC', 1, 'Bon Scott')"); conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME_NO_TENANT_TYPE_ID + " (tenant_id, id, user) values ('" + TENANT_ID + "', 1, 'Billy Gibbons')"); conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME_NO_TENANT_TYPE_ID + " (tenant_id, id, user) values ('" + TENANT_ID + "', 2, 'Billy Gibbons')"); - analyzeTable(conn, PARENT_TABLE_NAME_NO_TENANT_TYPE_ID); conn.close(); conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL); @@ -268,7 +292,6 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT { int count = conn.createStatement().executeUpdate("delete from " + TENANT_TABLE_NAME_NO_TENANT_TYPE_ID); assertEquals("Expected 2 rows have been deleted", 2, count); conn.close(); - conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL); ResultSet rs = conn.createStatement().executeQuery("select * from " + TENANT_TABLE_NAME_NO_TENANT_TYPE_ID); assertFalse("Expected no rows in result set", rs.next()); @@ -297,10 +320,10 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT { conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('AC/DC', 'abc', 1, 'Bon Scott')"); conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', '" + TENANT_TYPE_ID + "', 1, 'Billy Gibbons')"); conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', 'def', 1, 'Billy Gibbons')"); - analyzeTable(conn, PARENT_TABLE_NAME); conn.close(); conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL); + analyzeTable(conn, PARENT_TABLE_NAME); conn.createStatement().execute("delete from " + TENANT_TABLE_NAME); conn.commit(); conn.close(); @@ -328,7 +351,6 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT { conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME_NO_TENANT_TYPE_ID + " (tenant_id, id, user) values ('AC/DC', 1, 'Bon Scott')"); conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME_NO_TENANT_TYPE_ID + " (tenant_id, id, user) values ('" + TENANT_ID + "', 1, 'Billy Gibbons')"); conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME_NO_TENANT_TYPE_ID + " (tenant_id, id, user) values ('" + TENANT_ID + "', 2, 'Billy Gibbons')"); - analyzeTable(conn, PARENT_TABLE_NAME_NO_TENANT_TYPE_ID); conn.close(); conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL); @@ -336,6 +358,7 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT { conn.close(); conn = nextConnection(getUrl()); + analyzeTable(conn, PARENT_TABLE_NAME_NO_TENANT_TYPE_ID); ResultSet rs = conn.createStatement().executeQuery("select count(*) from " + PARENT_TABLE_NAME_NO_TENANT_TYPE_ID); rs.next(); assertEquals(3, rs.getInt(1)); @@ -358,11 +381,11 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT { conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('AC/DC', 'aaa', 1, 'Bon Scott')"); conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', '" + TENANT_TYPE_ID + "', 1, 'Billy Gibbons')"); conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', 'def', 2, 'Billy Gibbons')"); - analyzeTable(conn, PARENT_TABLE_NAME); conn.close(); conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL); conn.setAutoCommit(true); + analyzeTable(conn, TENANT_TABLE_NAME); int count = conn.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + "(id, user) select id+100, user from " + TENANT_TABLE_NAME); assertEquals("Expected 1 row to have been inserted", 1, count); conn.close(); @@ -393,11 +416,11 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT { conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('AC/DC', 'aaa', 1, 'Bon Scott')"); conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', '" + TENANT_TYPE_ID + "', 1, 'Billy Gibbons')"); conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', 'def', 2, 'Billy Gibbons')"); - analyzeTable(conn, PARENT_TABLE_NAME); conn.close(); conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL); conn.setAutoCommit(true); + analyzeTable(conn, TENANT_TABLE_NAME); int count = conn.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + "(id, user) select id+100, user from ANOTHER_TENANT_TABLE where id=2"); assertEquals("Expected 1 row to have been inserted", 1, count); conn.close(); @@ -442,10 +465,9 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT { conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL); conn.setAutoCommit(true); conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_type_id, id, user) values ('" + TENANT_TYPE_ID + "', 1, 'Billy Gibbons')"); - analyzeTable(conn, PARENT_TABLE_NAME); conn.close(); - conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL); + analyzeTable(conn, PARENT_TABLE_NAME); rs = conn.createStatement().executeQuery("select user from " + PARENT_TABLE_NAME); assertTrue(rs.next()); assertEquals(rs.getString(1),"Billy Gibbons"); @@ -489,4 +511,34 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT { assertFalse(rs.next()); conn.close(); } + private static List<KeyRange> getSplits(Connection conn, final Scan scan) throws SQLException { + TableRef tableRef = getTableRef(conn); + PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); + final List<HRegionLocation> regions = pconn.getQueryServices().getAllTableRegions( + tableRef.getTable().getPhysicalName().getBytes()); + PhoenixStatement statement = new PhoenixStatement(pconn); + StatementContext context = new StatementContext(statement, null, scan, new SequenceManager(statement)); + DefaultParallelIteratorRegionSplitter splitter = new DefaultParallelIteratorRegionSplitter(context, tableRef.getTable(), + HintNode.EMPTY_HINT_NODE) { + @Override + protected List<HRegionLocation> getAllRegions() throws SQLException { + return DefaultParallelIteratorRegionSplitter.filterRegions(regions, scan.getStartRow(), + scan.getStopRow()); + } + }; + List<KeyRange> keyRanges = splitter.getSplits(); + Collections.sort(keyRanges, new Comparator<KeyRange>() { + @Override + public int compare(KeyRange o1, KeyRange o2) { + return Bytes.compareTo(o1.getLowerRange(), o2.getLowerRange()); + } + }); + return keyRanges; + } + protected static TableRef getTableRef(Connection conn) throws SQLException { + PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); + TableRef table = new TableRef(null, pconn.getMetaDataCache().getTable( + new PTableKey(pconn.getTenantId(), PARENT_TABLE_NAME)), System.currentTimeMillis(), false); + return table; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a55c03cc/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 07b1495..d65beee 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -108,7 +108,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { abstract protected RegionScanner doPostScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws Throwable; @Override - public final RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, + public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException { if (isRegionObserverFor(scan)) { throwIfScanOutOfRegion(scan, c.getEnvironment().getRegion()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/a55c03cc/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index 1e3272b..4ca2f6d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -192,7 +192,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private static final KeyValue VIEW_INDEX_ID_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_INDEX_ID_BYTES); private static final KeyValue INDEX_TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_TYPE_BYTES); private static final KeyValue INDEX_DISABLE_TIMESTAMP_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES); + private static final KeyValue EMPTY_KEYVALUE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES); private static final List<KeyValue> TABLE_KV_COLUMNS = Arrays.<KeyValue>asList( + EMPTY_KEYVALUE_KV, TABLE_TYPE_KV, TABLE_SEQ_NUM_KV, COLUMN_COUNT_KV, @@ -679,7 +681,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } PName physicalTableName = physicalTables.isEmpty() ? PNameFactory.newName(SchemaUtil.getTableName( schemaName.getString(), tableName.getString())) : physicalTables.get(0); - PTableStats stats = updateStatsInternal(physicalTableName.getBytes(), columns); + PTableStats stats = tenantId == null ? updateStatsInternal(physicalTableName.getBytes(), columns) : null; return PTableImpl.makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp, tableSeqNum, pkName, saltBucketNum, columns, tableType == INDEX ? dataTableName : null, indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement, disableWAL, http://git-wip-us.apache.org/repos/asf/phoenix/blob/a55c03cc/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 508ae52..0bf2710 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -145,20 +145,28 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ } @Override + public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s) + throws IOException { + s = super.preScannerOpen(e, scan, s); + if (ScanUtil.isAnalyzeTable(scan)) { + // We are setting the start row and stop row such that it covers the entire region. As part + // of Phonenix-1263 we are storing the guideposts against the physical table rather than + // individual tenant specific tables. + scan.setStartRow(HConstants.EMPTY_START_ROW); + scan.setStopRow(HConstants.EMPTY_END_ROW); + } + return s; + } + + @Override protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException { int offset = 0; boolean isAnalyze = false; HRegion region = c.getEnvironment().getRegion(); - TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable(); StatisticsCollector stats = null; - if(scan.getAttribute(BaseScannerRegionObserver.ANALYZE_TABLE) != null && statsTable != null) { + if(ScanUtil.isAnalyzeTable(scan) && statsTable != null) { stats = new StatisticsCollector(statsTable, c.getEnvironment().getConfiguration()); isAnalyze = true; - // We are setting the start row and stop row such that it covers the entire region. As part - // of Phonenix-1263 we are storing the guideposts against the physical table rather than - // individual tenant specific tables. - scan.setStartRow(HConstants.EMPTY_START_ROW); - scan.setStopRow(HConstants.EMPTY_END_ROW); } if (ScanUtil.isLocalIndex(scan)) { /* http://git-wip-us.apache.org/repos/asf/phoenix/blob/a55c03cc/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java index 8fb85ae..063c22c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java @@ -31,7 +31,6 @@ import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.ColumnFamilyNotFoundException; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.ScanUtil; @@ -55,16 +54,16 @@ public class DefaultParallelIteratorRegionSplitter implements ParallelIteratorRe protected final long guidePostsDepth; protected final StatementContext context; - protected final TableRef tableRef; + protected final PTable table; private static final Logger logger = LoggerFactory.getLogger(DefaultParallelIteratorRegionSplitter.class); - public static DefaultParallelIteratorRegionSplitter getInstance(StatementContext context, TableRef table, HintNode hintNode) { + public static DefaultParallelIteratorRegionSplitter getInstance(StatementContext context, PTable table, HintNode hintNode) { return new DefaultParallelIteratorRegionSplitter(context, table, hintNode); } - protected DefaultParallelIteratorRegionSplitter(StatementContext context, TableRef table, HintNode hintNode) { + protected DefaultParallelIteratorRegionSplitter(StatementContext context, PTable table, HintNode hintNode) { this.context = context; - this.tableRef = table; + this.table = table; ReadOnlyProps props = context.getConnection().getQueryServices().getProps(); this.guidePostsDepth = props.getLong(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, QueryServicesOptions.DEFAULT_HISTOGRAM_BYTE_DEPTH); @@ -73,7 +72,6 @@ public class DefaultParallelIteratorRegionSplitter implements ParallelIteratorRe // Get the mapping between key range and the regions that contains them. protected List<HRegionLocation> getAllRegions() throws SQLException { Scan scan = context.getScan(); - PTable table = tableRef.getTable(); List<HRegionLocation> allTableRegions = context.getConnection().getQueryServices() .getAllTableRegions(table.getPhysicalName().getBytes()); // If we're not salting, then we've already intersected the minMaxRange with the scan range @@ -110,7 +108,6 @@ public class DefaultParallelIteratorRegionSplitter implements ParallelIteratorRe protected List<KeyRange> genKeyRanges(List<HRegionLocation> regions) { if (regions.isEmpty()) { return Collections.emptyList(); } Scan scan = context.getScan(); - PTable table = this.tableRef.getTable(); byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(table); List<byte[]> gps = Lists.newArrayList(); if (!ScanUtil.isAnalyzeTable(scan)) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/a55c03cc/phoenix-core/src/main/java/org/apache/phoenix/iterate/LocalIndexParallelIteratorRegionSplitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LocalIndexParallelIteratorRegionSplitter.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LocalIndexParallelIteratorRegionSplitter.java index 14da71e..fdc4c5a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LocalIndexParallelIteratorRegionSplitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LocalIndexParallelIteratorRegionSplitter.java @@ -23,21 +23,21 @@ import java.util.List; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.parse.HintNode; -import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.PTable; public class LocalIndexParallelIteratorRegionSplitter extends DefaultParallelIteratorRegionSplitter { - public static DefaultParallelIteratorRegionSplitter getInstance(StatementContext context, TableRef table, HintNode hintNode) { + public static DefaultParallelIteratorRegionSplitter getInstance(StatementContext context, PTable table, HintNode hintNode) { return new LocalIndexParallelIteratorRegionSplitter(context, table, hintNode); } - protected LocalIndexParallelIteratorRegionSplitter(StatementContext context, TableRef table, HintNode hintNode) { + protected LocalIndexParallelIteratorRegionSplitter(StatementContext context, PTable table, HintNode hintNode) { super(context,table,hintNode); } @Override protected List<HRegionLocation> getAllRegions() throws SQLException { - return context.getConnection().getQueryServices().getAllTableRegions(tableRef.getTable().getPhysicalName().getBytes()); + return context.getConnection().getQueryServices().getAllTableRegions(table.getPhysicalName().getBytes()); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a55c03cc/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorRegionSplitterFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorRegionSplitterFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorRegionSplitterFactory.java index d3c7d46..cc82725 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorRegionSplitterFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorRegionSplitterFactory.java @@ -21,7 +21,7 @@ import java.sql.SQLException; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.parse.HintNode; -import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; @@ -30,8 +30,8 @@ import org.apache.phoenix.schema.PTable.IndexType; */ public class ParallelIteratorRegionSplitterFactory { - public static ParallelIteratorRegionSplitter getSplitter(StatementContext context, TableRef table, HintNode hintNode) throws SQLException { - if(table.getTable().getIndexType() == IndexType.LOCAL) { + public static ParallelIteratorRegionSplitter getSplitter(StatementContext context, PTable table, HintNode hintNode) throws SQLException { + if(table.getIndexType() == IndexType.LOCAL) { return LocalIndexParallelIteratorRegionSplitter.getInstance(context, table, hintNode); } if (context.getScanRanges().useSkipScanFilter()) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/a55c03cc/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index a2dabe3..4da593f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.filter.ColumnProjectionFilter; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.job.JobManager.JobCallable; @@ -55,10 +56,12 @@ import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PColumnFamily; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTable.ViewType; +import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.SaltingUtil; import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; @@ -107,7 +110,25 @@ public class ParallelIterators extends ExplainTable implements ResultIterators { RowProjector projector, GroupBy groupBy, Integer limit, ParallelIteratorFactory iteratorFactory) throws SQLException { super(context, tableRef, groupBy); - this.splits = getSplits(context, tableRef, statement.getHint()); + MetaDataClient client = new MetaDataClient(context.getConnection()); + PTable physicalTable = tableRef.getTable(); + String physicalName = tableRef.getTable().getPhysicalName().getString(); + if ((physicalTable.getViewIndexId() == null) && (!physicalName.equals(physicalTable.getName().getString()))) { // tableRef is not for the physical table + String physicalSchemaName = SchemaUtil.getSchemaNameFromFullName(physicalName); + String physicalTableName = SchemaUtil.getTableNameFromFullName(physicalName); + // TODO: this will be an extra RPC to ensure we have the latest guideposts, but is almost always + // unnecessary. We should instead track when the last time an update cache was done for this + // for physical table and not do it again until some interval has passed (it's ok to use stale stats). + MetaDataMutationResult result = client.updateCache(null, /* use global tenant id to get physical table */ + physicalSchemaName, physicalTableName); + physicalTable = result.getTable(); + if(physicalTable == null) { + client = new MetaDataClient(context.getConnection()); + physicalTable = client.getConnection().getMetaDataCache() + .getTable(new PTableKey(null, physicalTableName)); + } + } + this.splits = getSplits(context, physicalTable, statement.getHint()); this.iteratorFactory = iteratorFactory; Scan scan = context.getScan(); PTable table = tableRef.getTable(); @@ -233,7 +254,7 @@ public class ParallelIterators extends ExplainTable implements ResultIterators { * @return the key ranges that should be scanned in parallel */ // exposed for tests - public static List<KeyRange> getSplits(StatementContext context, TableRef table, HintNode hintNode) throws SQLException { + public static List<KeyRange> getSplits(StatementContext context, PTable table, HintNode hintNode) throws SQLException { return ParallelIteratorRegionSplitterFactory.getSplitter(context, table, hintNode).getSplits(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a55c03cc/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java index 8312fe7..81f5af6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java @@ -21,16 +21,16 @@ import java.sql.SQLException; import java.util.List; import org.apache.hadoop.hbase.HRegionLocation; - -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.parse.HintNode; import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.SaltingUtil; -import org.apache.phoenix.schema.TableRef; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; /** @@ -38,17 +38,17 @@ import org.apache.phoenix.schema.TableRef; */ public class SkipRangeParallelIteratorRegionSplitter extends DefaultParallelIteratorRegionSplitter { - public static SkipRangeParallelIteratorRegionSplitter getInstance(StatementContext context, TableRef table, HintNode hintNode) { + public static SkipRangeParallelIteratorRegionSplitter getInstance(StatementContext context, PTable table, HintNode hintNode) { return new SkipRangeParallelIteratorRegionSplitter(context, table, hintNode); } - protected SkipRangeParallelIteratorRegionSplitter(StatementContext context, TableRef table, HintNode hintNode) { + protected SkipRangeParallelIteratorRegionSplitter(StatementContext context, PTable table, HintNode hintNode) { super(context, table, hintNode); } @Override protected List<HRegionLocation> getAllRegions() throws SQLException { - List<HRegionLocation> allTableRegions = context.getConnection().getQueryServices().getAllTableRegions(tableRef.getTable().getPhysicalName().getBytes()); + List<HRegionLocation> allTableRegions = context.getConnection().getQueryServices().getAllTableRegions(table.getPhysicalName().getBytes()); return filterRegions(allTableRegions, context.getScanRanges()); } @@ -66,7 +66,7 @@ public class SkipRangeParallelIteratorRegionSplitter extends DefaultParallelIter KeyRange minMaxRange = context.getMinMaxRange(); if (minMaxRange != null) { KeyRange range = KeyRange.getKeyRange(region.getRegionInfo().getStartKey(), region.getRegionInfo().getEndKey()); - if (tableRef.getTable().getBucketNum() != null) { + if (table.getBucketNum() != null) { // Add salt byte, as minMaxRange won't have it minMaxRange = SaltingUtil.addSaltByte(region.getRegionInfo().getStartKey(), minMaxRange); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a55c03cc/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 7f824e8..09f77ac 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -292,11 +292,20 @@ public class MetaDataClient { return updateCache(schemaName, tableName, false); } - private MetaDataMutationResult updateCache(String schemaName, String tableName, boolean alwaysHitServer) throws SQLException { // TODO: pass byte[] here + private MetaDataMutationResult updateCache(String schemaName, String tableName, boolean alwaysHitServer) throws SQLException { + return updateCache(connection.getTenantId(), schemaName, tableName, alwaysHitServer); + } + + public MetaDataMutationResult updateCache(PName tenantId, String schemaName, String tableName) throws SQLException { + return updateCache(tenantId, schemaName, tableName, false); + } + + private MetaDataMutationResult updateCache(PName tenantId, String schemaName, String tableName, + boolean alwaysHitServer) throws SQLException { // TODO: pass byte[] herez Long scn = connection.getSCN(); boolean systemTable = SYSTEM_CATALOG_SCHEMA.equals(schemaName); // System tables must always have a null tenantId - PName tenantId = systemTable ? null : connection.getTenantId(); + tenantId = systemTable ? null : tenantId; long clientTimeStamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn; PTable table = null; String fullTableName = SchemaUtil.getTableName(schemaName, tableName); @@ -481,10 +490,9 @@ public class MetaDataClient { Long scn = connection.getSCN(); // Always invalidate the cache long clientTS = connection.getSCN() == null ? HConstants.LATEST_TIMESTAMP : scn; - connection.getQueryServices().clearCacheForTable(tenantIdBytes, table.getSchemaName().getBytes(), - table.getTableName().getBytes(), clientTS); - // Clear the cache also. So that for cases like major compaction also we would be able to use the stats - updateCache(table.getSchemaName().getString(), table.getTableName().getString(), true); + connection.getQueryServices().clearCacheForTable(tenantIdBytes, + Bytes.toBytes(SchemaUtil.getSchemaNameFromFullName(physicalName.getString())), + Bytes.toBytes(SchemaUtil.getTableNameFromFullName(physicalName.getString())), clientTS); String query = "SELECT CURRENT_DATE(),"+ LAST_STATS_UPDATE_TIME + " FROM " + SYSTEM_CATALOG_SCHEMA + "." + SYSTEM_STATS_TABLE + " WHERE " + PHYSICAL_NAME + "='" + physicalName.getString() + "' AND " + COLUMN_FAMILY + " IS NULL AND " + REGION_NAME + " IS NULL"; @@ -511,8 +519,9 @@ public class MetaDataClient { // A single Cell will be returned with the count(*) - we decode that here long rowCount = PDataType.LONG.getCodec().decodeLong(tempPtr, SortOrder.getDefault()); // We need to update the stats table - connection.getQueryServices().clearCacheForTable(tenantIdBytes, table.getSchemaName().getBytes(), - table.getTableName().getBytes(), clientTS); + connection.getQueryServices().clearCacheForTable(tenantIdBytes, + Bytes.toBytes(SchemaUtil.getSchemaNameFromFullName(physicalName.getString())), + Bytes.toBytes(SchemaUtil.getTableNameFromFullName(physicalName.getString())), clientTS); return new MutationState(0, connection, rowCount); } else { return new MutationState(0, connection);