PHOENIX-4332 Indexes should inherit guide post width of the base data table
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7bea88dd Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7bea88dd Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7bea88dd Branch: refs/heads/4.x-HBase-1.2 Commit: 7bea88dd81b95d162aa9b79f4c788967d523acda Parents: 32beb70 Author: Samarth Jain <sama...@apache.org> Authored: Wed Nov 1 23:24:52 2017 -0700 Committer: James Taylor <jtay...@salesforce.com> Committed: Wed Nov 15 10:02:14 2017 -0800 ---------------------------------------------------------------------- ...mnEncodedImmutableNonTxStatsCollectorIT.java | 1 + ...olumnEncodedImmutableTxStatsCollectorIT.java | 1 + ...lumnEncodedMutableNonTxStatsCollectorIT.java | 1 + .../ColumnEncodedMutableTxStatsCollectorIT.java | 1 + ...mnEncodedImmutableNonTxStatsCollectorIT.java | 1 + ...olumnEncodedImmutableTxStatsCollectorIT.java | 1 + .../phoenix/end2end/StatsCollectorIT.java | 734 ---------------- ...SysTableNamespaceMappedStatsCollectorIT.java | 1 + .../phoenix/schema/stats/StatsCollectorIT.java | 832 +++++++++++++++++++ .../stats/DefaultStatisticsCollector.java | 58 +- 10 files changed, 895 insertions(+), 736 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/7bea88dd/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedImmutableNonTxStatsCollectorIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedImmutableNonTxStatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedImmutableNonTxStatsCollectorIT.java index d5d8442..eb01e89 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedImmutableNonTxStatsCollectorIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedImmutableNonTxStatsCollectorIT.java @@ -20,6 +20,7 @@ package org.apache.phoenix.end2end; import java.util.Arrays; import java.util.Collection; +import org.apache.phoenix.schema.stats.StatsCollectorIT; import org.junit.runners.Parameterized.Parameters; public class ColumnEncodedImmutableNonTxStatsCollectorIT extends StatsCollectorIT { http://git-wip-us.apache.org/repos/asf/phoenix/blob/7bea88dd/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedImmutableTxStatsCollectorIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedImmutableTxStatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedImmutableTxStatsCollectorIT.java index 23b1654..4e90d70 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedImmutableTxStatsCollectorIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedImmutableTxStatsCollectorIT.java @@ -20,6 +20,7 @@ package org.apache.phoenix.end2end; import java.util.Arrays; import java.util.Collection; +import org.apache.phoenix.schema.stats.StatsCollectorIT; import org.junit.runners.Parameterized.Parameters; public class ColumnEncodedImmutableTxStatsCollectorIT extends StatsCollectorIT { http://git-wip-us.apache.org/repos/asf/phoenix/blob/7bea88dd/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedMutableNonTxStatsCollectorIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedMutableNonTxStatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedMutableNonTxStatsCollectorIT.java index 24869a2..2a560db 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedMutableNonTxStatsCollectorIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedMutableNonTxStatsCollectorIT.java @@ -20,6 +20,7 @@ package org.apache.phoenix.end2end; import java.util.Arrays; import java.util.Collection; +import org.apache.phoenix.schema.stats.StatsCollectorIT; import org.junit.runners.Parameterized.Parameters; public class ColumnEncodedMutableNonTxStatsCollectorIT extends StatsCollectorIT { http://git-wip-us.apache.org/repos/asf/phoenix/blob/7bea88dd/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedMutableTxStatsCollectorIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedMutableTxStatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedMutableTxStatsCollectorIT.java index eea591d..01fa2b5 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedMutableTxStatsCollectorIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedMutableTxStatsCollectorIT.java @@ -20,6 +20,7 @@ package org.apache.phoenix.end2end; import java.util.Arrays; import java.util.Collection; +import org.apache.phoenix.schema.stats.StatsCollectorIT; import org.junit.runners.Parameterized.Parameters; public class ColumnEncodedMutableTxStatsCollectorIT extends StatsCollectorIT { http://git-wip-us.apache.org/repos/asf/phoenix/blob/7bea88dd/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonColumnEncodedImmutableNonTxStatsCollectorIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonColumnEncodedImmutableNonTxStatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonColumnEncodedImmutableNonTxStatsCollectorIT.java index fe70030..27c6dc2 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonColumnEncodedImmutableNonTxStatsCollectorIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonColumnEncodedImmutableNonTxStatsCollectorIT.java @@ -20,6 +20,7 @@ package org.apache.phoenix.end2end; import java.util.Arrays; import java.util.Collection; +import org.apache.phoenix.schema.stats.StatsCollectorIT; import org.junit.runners.Parameterized.Parameters; public class NonColumnEncodedImmutableNonTxStatsCollectorIT extends StatsCollectorIT { http://git-wip-us.apache.org/repos/asf/phoenix/blob/7bea88dd/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonColumnEncodedImmutableTxStatsCollectorIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonColumnEncodedImmutableTxStatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonColumnEncodedImmutableTxStatsCollectorIT.java index 10a846a..0cec31a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonColumnEncodedImmutableTxStatsCollectorIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonColumnEncodedImmutableTxStatsCollectorIT.java @@ -20,6 +20,7 @@ package org.apache.phoenix.end2end; import java.util.Arrays; import java.util.Collection; +import org.apache.phoenix.schema.stats.StatsCollectorIT; import org.junit.runners.Parameterized.Parameters; public class NonColumnEncodedImmutableTxStatsCollectorIT extends StatsCollectorIT { http://git-wip-us.apache.org/repos/asf/phoenix/blob/7bea88dd/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java deleted file mode 100644 index da8e78d..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java +++ /dev/null @@ -1,734 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.end2end; - -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE; -import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; -import static org.apache.phoenix.util.TestUtil.getAllSplits; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.sql.Array; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Random; - -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; -import org.apache.phoenix.query.ConnectionQueryServices; -import org.apache.phoenix.query.KeyRange; -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.PTableImpl; -import org.apache.phoenix.schema.PTableKey; -import org.apache.phoenix.schema.stats.GuidePostsInfo; -import org.apache.phoenix.schema.stats.GuidePostsKey; -import org.apache.phoenix.schema.stats.StatisticsUtil; -import org.apache.phoenix.util.PropertiesUtil; -import org.apache.phoenix.util.QueryUtil; -import org.apache.phoenix.util.ReadOnlyProps; -import org.apache.phoenix.util.SchemaUtil; -import org.apache.phoenix.util.TestUtil; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import com.google.common.collect.Maps; - -@RunWith(Parameterized.class) -public abstract class StatsCollectorIT extends BaseUniqueNamesOwnClusterIT { - private final String tableDDLOptions; - private final boolean columnEncoded; - private String tableName; - private String schemaName; - private String fullTableName; - private String physicalTableName; - private final boolean userTableNamespaceMapped; - private final boolean mutable; - - protected StatsCollectorIT(boolean mutable, boolean transactional, boolean userTableNamespaceMapped, boolean columnEncoded) { - StringBuilder sb = new StringBuilder(); - if (transactional) { - sb.append("TRANSACTIONAL=true"); - } - if (!columnEncoded) { - if (sb.length()>0) { - sb.append(","); - } - sb.append("COLUMN_ENCODED_BYTES=0"); - } else { - if (sb.length()>0) { - sb.append(","); - } - sb.append("COLUMN_ENCODED_BYTES=4"); - } - if (!mutable) { - if (sb.length()>0) { - sb.append(","); - } - sb.append("IMMUTABLE_ROWS=true"); - if (!columnEncoded) { - sb.append(",IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN); - } - } - this.tableDDLOptions = sb.toString(); - this.userTableNamespaceMapped = userTableNamespaceMapped; - this.columnEncoded = columnEncoded; - this.mutable = mutable; - } - - @BeforeClass - public static void doSetup() throws Exception { - // enable name space mapping at global level on both client and server side - Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7); - serverProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true"); - serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20)); - Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2); - clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true"); - clientProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20)); - setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); - } - - @Before - public void generateTableNames() throws SQLException { - schemaName = generateUniqueName(); - if (userTableNamespaceMapped) { - try (Connection conn = getConnection()) { - conn.createStatement().execute("CREATE SCHEMA " + schemaName); - } - } - tableName = "T_" + generateUniqueName(); - fullTableName = SchemaUtil.getTableName(schemaName, tableName); - physicalTableName = SchemaUtil.getPhysicalHBaseTableName(schemaName, tableName, userTableNamespaceMapped).getString(); - } - - private Connection getConnection() throws SQLException { - return getConnection(Integer.MAX_VALUE); - } - - private Connection getConnection(Integer statsUpdateFreq) throws SQLException { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - props.setProperty(QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB, Boolean.TRUE.toString()); - props.setProperty(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.TRUE.toString()); - props.setProperty(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Integer.toString(statsUpdateFreq)); - // enable/disable namespace mapping at connection level - props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(userTableNamespaceMapped)); - return DriverManager.getConnection(getUrl(), props); - } - - @Test - public void testUpdateEmptyStats() throws Exception { - Connection conn = getConnection(); - conn.setAutoCommit(true); - conn.createStatement().execute( - "CREATE TABLE " + fullTableName +" ( k CHAR(1) PRIMARY KEY )" + tableDDLOptions); - conn.createStatement().execute("UPDATE STATISTICS " + fullTableName); - ResultSet rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + fullTableName); - String explainPlan = QueryUtil.getExplainPlan(rs); - assertEquals( - "CLIENT 1-CHUNK 0 ROWS 20 BYTES PARALLEL 1-WAY FULL SCAN OVER " + physicalTableName + "\n" + - " SERVER FILTER BY FIRST KEY ONLY", - explainPlan); - conn.close(); - } - - @Test - public void testSomeUpdateEmptyStats() throws Exception { - Connection conn = getConnection(); - conn.setAutoCommit(true); - conn.createStatement().execute( - "CREATE TABLE " + fullTableName +" ( k VARCHAR PRIMARY KEY, a.v1 VARCHAR, b.v2 VARCHAR ) " + tableDDLOptions + (tableDDLOptions.isEmpty() ? "" : ",") + "SALT_BUCKETS = 3"); - conn.createStatement().execute("UPSERT INTO " + fullTableName + "(k,v1) VALUES('a','123456789')"); - conn.createStatement().execute("UPDATE STATISTICS " + fullTableName); - - ResultSet rs; - String explainPlan; - rs = conn.createStatement().executeQuery("EXPLAIN SELECT v2 FROM " + fullTableName + " WHERE v2='foo'"); - explainPlan = QueryUtil.getExplainPlan(rs); - // if we are using the ONE_CELL_PER_COLUMN_FAMILY storage scheme, we will have the single kv even though there are no values for col family v2 - String stats = columnEncoded && !mutable ? "4-CHUNK 1 ROWS 38 BYTES" : "3-CHUNK 0 ROWS 20 BYTES"; - assertEquals( - "CLIENT " + stats + " PARALLEL 3-WAY FULL SCAN OVER " + physicalTableName + "\n" + - " SERVER FILTER BY B.V2 = 'foo'\n" + - "CLIENT MERGE SORT", - explainPlan); - rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + fullTableName); - explainPlan = QueryUtil.getExplainPlan(rs); - assertEquals( - "CLIENT 4-CHUNK 1 ROWS " + (columnEncoded ? "28" : "34") + " BYTES PARALLEL 3-WAY FULL SCAN OVER " + physicalTableName + "\n" + - "CLIENT MERGE SORT", - explainPlan); - rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + fullTableName + " WHERE k = 'a'"); - explainPlan = QueryUtil.getExplainPlan(rs); - assertEquals( - "CLIENT 1-CHUNK 1 ROWS " + (columnEncoded ? "204" : "202") + " BYTES PARALLEL 1-WAY POINT LOOKUP ON 1 KEY OVER " + physicalTableName + "\n" + - "CLIENT MERGE SORT", - explainPlan); - - conn.close(); - } - - @Test - public void testUpdateStats() throws SQLException, IOException, - InterruptedException { - Connection conn; - PreparedStatement stmt; - ResultSet rs; - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - conn = getConnection(); - conn.createStatement().execute( - "CREATE TABLE " + fullTableName +" ( k VARCHAR, a_string_array VARCHAR(100) ARRAY[4], b_string_array VARCHAR(100) ARRAY[4] \n" - + " CONSTRAINT pk PRIMARY KEY (k, b_string_array DESC))" - + tableDDLOptions ); - String[] s; - Array array; - conn = upsertValues(props, fullTableName); - // CAll the update statistics query here. If already major compaction has run this will not get executed. - stmt = conn.prepareStatement("UPDATE STATISTICS " + fullTableName); - stmt.execute(); - stmt = upsertStmt(conn, fullTableName); - stmt.setString(1, "z"); - s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" }; - array = conn.createArrayOf("VARCHAR", s); - stmt.setArray(2, array); - s = new String[] { "zya", "def", "ghi", "jkll", null, null, null, "xxx" }; - array = conn.createArrayOf("VARCHAR", s); - stmt.setArray(3, array); - stmt.execute(); - stmt = conn.prepareStatement("UPDATE STATISTICS " + fullTableName); - stmt.execute(); - rs = conn.createStatement().executeQuery("SELECT k FROM " + fullTableName); - assertTrue(rs.next()); - conn.close(); - } - - private void testNoDuplicatesAfterUpdateStats(String splitKey) throws Throwable { - Connection conn = getConnection(); - PreparedStatement stmt; - ResultSet rs; - conn.createStatement() - .execute("CREATE TABLE " + fullTableName - + " ( k VARCHAR, c1.a bigint,c2.b bigint CONSTRAINT pk PRIMARY KEY (k))"+ tableDDLOptions - + (splitKey != null ? " split on (" + splitKey + ")" : "") ); - conn.createStatement().execute("upsert into " + fullTableName + " values ('abc',1,3)"); - conn.createStatement().execute("upsert into " + fullTableName + " values ('def',2,4)"); - conn.commit(); - conn.createStatement().execute("UPDATE STATISTICS " + fullTableName); - rs = conn.createStatement().executeQuery("SELECT k FROM " + fullTableName + " order by k desc"); - assertTrue(rs.next()); - assertEquals("def", rs.getString(1)); - assertTrue(rs.next()); - assertEquals("abc", rs.getString(1)); - assertTrue(!rs.next()); - conn.close(); - } - - @Test - public void testNoDuplicatesAfterUpdateStatsWithSplits() throws Throwable { - testNoDuplicatesAfterUpdateStats("'abc','def'"); - } - - @Test - public void testNoDuplicatesAfterUpdateStatsWithDesc() throws Throwable { - testNoDuplicatesAfterUpdateStats(null); - } - - @Test - public void testUpdateStatsWithMultipleTables() throws Throwable { - String fullTableName2 = SchemaUtil.getTableName(schemaName, "T_" + generateUniqueName()); - Connection conn; - PreparedStatement stmt; - ResultSet rs; - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - conn = getConnection(); - conn.createStatement().execute( - "CREATE TABLE " + fullTableName +" ( k VARCHAR, a_string_array VARCHAR(100) ARRAY[4], b_string_array VARCHAR(100) ARRAY[4] \n" - + " CONSTRAINT pk PRIMARY KEY (k, b_string_array DESC))" + tableDDLOptions ); - conn.createStatement().execute( - "CREATE TABLE " + fullTableName2 +" ( k VARCHAR, a_string_array VARCHAR(100) ARRAY[4], b_string_array VARCHAR(100) ARRAY[4] \n" - + " CONSTRAINT pk PRIMARY KEY (k, b_string_array DESC))" + tableDDLOptions ); - String[] s; - Array array; - conn = upsertValues(props, fullTableName); - conn = upsertValues(props, fullTableName2); - // CAll the update statistics query here - stmt = conn.prepareStatement("UPDATE STATISTICS "+fullTableName); - stmt.execute(); - stmt = conn.prepareStatement("UPDATE STATISTICS "+fullTableName2); - stmt.execute(); - stmt = upsertStmt(conn, fullTableName); - stmt.setString(1, "z"); - s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" }; - array = conn.createArrayOf("VARCHAR", s); - stmt.setArray(2, array); - s = new String[] { "zya", "def", "ghi", "jkll", null, null, null, "xxx" }; - array = conn.createArrayOf("VARCHAR", s); - stmt.setArray(3, array); - stmt.execute(); - stmt = upsertStmt(conn, fullTableName2); - stmt.setString(1, "z"); - s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" }; - array = conn.createArrayOf("VARCHAR", s); - stmt.setArray(2, array); - s = new String[] { "zya", "def", "ghi", "jkll", null, null, null, "xxx" }; - array = conn.createArrayOf("VARCHAR", s); - stmt.setArray(3, array); - stmt.execute(); - conn.close(); - conn = getConnection(); - // This analyze would not work - stmt = conn.prepareStatement("UPDATE STATISTICS "+fullTableName2); - stmt.execute(); - rs = conn.createStatement().executeQuery("SELECT k FROM "+fullTableName2); - assertTrue(rs.next()); - conn.close(); - } - - private Connection upsertValues(Properties props, String tableName) throws SQLException, IOException, - InterruptedException { - Connection conn; - PreparedStatement stmt; - conn = getConnection(); - stmt = upsertStmt(conn, tableName); - stmt.setString(1, "a"); - String[] s = new String[] { "abc", "def", "ghi", "jkll", null, null, "xxx" }; - Array array = conn.createArrayOf("VARCHAR", s); - stmt.setArray(2, array); - s = new String[] { "abc", "def", "ghi", "jkll", null, null, null, "xxx" }; - array = conn.createArrayOf("VARCHAR", s); - stmt.setArray(3, array); - stmt.execute(); - conn.commit(); - stmt = upsertStmt(conn, tableName); - stmt.setString(1, "b"); - s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" }; - array = conn.createArrayOf("VARCHAR", s); - stmt.setArray(2, array); - s = new String[] { "zya", "def", "ghi", "jkll", null, null, null, "xxx" }; - array = conn.createArrayOf("VARCHAR", s); - stmt.setArray(3, array); - stmt.execute(); - conn.commit(); - stmt = upsertStmt(conn, tableName); - stmt.setString(1, "c"); - s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" }; - array = conn.createArrayOf("VARCHAR", s); - stmt.setArray(2, array); - s = new String[] { "zya", "def", "ghi", "jkll", null, null, null, "xxx" }; - array = conn.createArrayOf("VARCHAR", s); - stmt.setArray(3, array); - stmt.execute(); - conn.commit(); - stmt = upsertStmt(conn, tableName); - stmt.setString(1, "d"); - s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" }; - array = conn.createArrayOf("VARCHAR", s); - stmt.setArray(2, array); - s = new String[] { "zya", "def", "ghi", "jkll", null, null, null, "xxx" }; - array = conn.createArrayOf("VARCHAR", s); - stmt.setArray(3, array); - stmt.execute(); - conn.commit(); - stmt = upsertStmt(conn, tableName); - stmt.setString(1, "b"); - s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" }; - array = conn.createArrayOf("VARCHAR", s); - stmt.setArray(2, array); - s = new String[] { "zya", "def", "ghi", "jkll", null, null, null, "xxx" }; - array = conn.createArrayOf("VARCHAR", s); - stmt.setArray(3, array); - stmt.execute(); - conn.commit(); - stmt = upsertStmt(conn, tableName); - stmt.setString(1, "e"); - s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" }; - array = conn.createArrayOf("VARCHAR", s); - stmt.setArray(2, array); - s = new String[] { "zya", "def", "ghi", "jkll", null, null, null, "xxx" }; - array = conn.createArrayOf("VARCHAR", s); - stmt.setArray(3, array); - stmt.execute(); - conn.commit(); - return conn; - } - - private PreparedStatement upsertStmt(Connection conn, String tableName) throws SQLException { - PreparedStatement stmt; - stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)"); - return stmt; - } - - private void compactTable(Connection conn, String tableName) throws Exception { - TestUtil.doMajorCompaction(conn, tableName); - } - - @Test - @Ignore //TODO remove this once https://issues.apache.org/jira/browse/TEPHRA-208 is fixed - public void testCompactUpdatesStats() throws Exception { - testCompactUpdatesStats(0, fullTableName); - } - - @Test - @Ignore //TODO remove this once https://issues.apache.org/jira/browse/TEPHRA-208 is fixed - public void testCompactUpdatesStatsWithMinStatsUpdateFreq() throws Exception { - testCompactUpdatesStats(QueryServicesOptions.DEFAULT_STATS_UPDATE_FREQ_MS, fullTableName); - } - - private static void invalidateStats(Connection conn, String tableName) throws SQLException { - PTable ptable = conn.unwrap(PhoenixConnection.class) - .getMetaDataCache().getTableRef(new PTableKey(null, tableName)) - .getTable(); - byte[] name = ptable.getPhysicalName().getBytes(); - conn.unwrap(PhoenixConnection.class).getQueryServices().invalidateStats(new GuidePostsKey(name, SchemaUtil.getEmptyColumnFamily(ptable))); - } - - private void testCompactUpdatesStats(Integer statsUpdateFreq, String tableName) throws Exception { - int nRows = 10; - Connection conn = getConnection(statsUpdateFreq); - PreparedStatement stmt; - conn.createStatement().execute("CREATE TABLE " + tableName + "(k CHAR(1) PRIMARY KEY, v INTEGER, w INTEGER) " - + (!tableDDLOptions.isEmpty() ? tableDDLOptions + "," : "") - + HColumnDescriptor.KEEP_DELETED_CELLS + "=" + Boolean.FALSE); - stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)"); - for (int i = 0; i < nRows; i++) { - stmt.setString(1, Character.toString((char) ('a' + i))); - stmt.setInt(2, i); - stmt.setInt(3, i); - stmt.executeUpdate(); - } - conn.commit(); - - compactTable(conn, physicalTableName); - - if (statsUpdateFreq != 0) { - invalidateStats(conn, tableName); - } else { - // Confirm that when we have a non zero STATS_UPDATE_FREQ_MS_ATTRIB, after we run - // UPDATATE STATISTICS, the new statistics are faulted in as expected. - List<KeyRange>keyRanges = getAllSplits(conn, tableName); - assertNotEquals(nRows+1, keyRanges.size()); - // If we've set MIN_STATS_UPDATE_FREQ_MS_ATTRIB, an UPDATE STATISTICS will invalidate the cache - // and forcing the new stats to be pulled over. - int rowCount = conn.createStatement().executeUpdate("UPDATE STATISTICS " + tableName); - assertEquals(10, rowCount); - } - List<KeyRange>keyRanges = getAllSplits(conn, tableName); - assertEquals(nRows+1, keyRanges.size()); - - int nDeletedRows = conn.createStatement().executeUpdate("DELETE FROM " + tableName + " WHERE V < " + nRows / 2); - conn.commit(); - assertEquals(5, nDeletedRows); - - Scan scan = new Scan(); - scan.setRaw(true); - PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class); - try (HTableInterface htable = phxConn.getQueryServices().getTable(Bytes.toBytes(tableName))) { - ResultScanner scanner = htable.getScanner(scan); - Result result; - while ((result = scanner.next())!=null) { - System.out.println(result); - } - } - - compactTable(conn, physicalTableName); - - scan = new Scan(); - scan.setRaw(true); - phxConn = conn.unwrap(PhoenixConnection.class); - try (HTableInterface htable = phxConn.getQueryServices().getTable(Bytes.toBytes(tableName))) { - ResultScanner scanner = htable.getScanner(scan); - Result result; - while ((result = scanner.next())!=null) { - System.out.println(result); - } - } - - if (statsUpdateFreq != 0) { - invalidateStats(conn, tableName); - } else { - assertEquals(nRows+1, keyRanges.size()); - // If we've set STATS_UPDATE_FREQ_MS_ATTRIB, an UPDATE STATISTICS will invalidate the cache - // and force us to pull over the new stats - int rowCount = conn.createStatement().executeUpdate("UPDATE STATISTICS " + tableName); - assertEquals(5, rowCount); - } - keyRanges = getAllSplits(conn, tableName); - assertEquals(nRows/2+1, keyRanges.size()); - ResultSet rs = conn.createStatement().executeQuery("SELECT SUM(GUIDE_POSTS_ROW_COUNT) FROM " - + "\""+ SYSTEM_CATALOG_SCHEMA + "\".\"" + SYSTEM_STATS_TABLE + "\"" + " WHERE PHYSICAL_NAME='" + physicalTableName + "'"); - rs.next(); - assertEquals(nRows - nDeletedRows, rs.getLong(1)); - } - - @Test - public void testWithMultiCF() throws Exception { - int nRows = 20; - Connection conn = getConnection(0); - PreparedStatement stmt; - conn.createStatement().execute( - "CREATE TABLE " + fullTableName - + "(k VARCHAR PRIMARY KEY, a.v INTEGER, b.v INTEGER, c.v INTEGER NULL, d.v INTEGER NULL) " - + tableDDLOptions ); - stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?, ?, ?, ?)"); - byte[] val = new byte[250]; - for (int i = 0; i < nRows; i++) { - stmt.setString(1, Character.toString((char)('a' + i)) + Bytes.toString(val)); - stmt.setInt(2, i); - stmt.setInt(3, i); - stmt.setInt(4, i); - stmt.setInt(5, i); - stmt.executeUpdate(); - } - conn.commit(); - stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k, c.v, d.v) VALUES(?,?,?)"); - for (int i = 0; i < 5; i++) { - stmt.setString(1, Character.toString((char)('a' + 'z' + i)) + Bytes.toString(val)); - stmt.setInt(2, i); - stmt.setInt(3, i); - stmt.executeUpdate(); - } - conn.commit(); - - ResultSet rs; - TestUtil.analyzeTable(conn, fullTableName); - List<KeyRange> keyRanges = getAllSplits(conn, fullTableName); - assertEquals(26, keyRanges.size()); - rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + fullTableName); - assertEquals("CLIENT 26-CHUNK 25 ROWS " + (columnEncoded ? ( mutable ? "12530" : "13902" ) : "12420") + " BYTES PARALLEL 1-WAY FULL SCAN OVER " + physicalTableName, - QueryUtil.getExplainPlan(rs)); - - ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); - List<HRegionLocation> regions = services.getAllTableRegions(Bytes.toBytes(physicalTableName)); - assertEquals(1, regions.size()); - - TestUtil.analyzeTable(conn, fullTableName); - String query = "UPDATE STATISTICS " + fullTableName + " SET \"" - + QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB + "\"=" + Long.toString(1000); - conn.createStatement().execute(query); - keyRanges = getAllSplits(conn, fullTableName); - boolean oneCellPerColFamliyStorageScheme = !mutable && columnEncoded; - assertEquals(oneCellPerColFamliyStorageScheme ? 13 : 12, keyRanges.size()); - - rs = conn - .createStatement() - .executeQuery( - "SELECT COLUMN_FAMILY,SUM(GUIDE_POSTS_ROW_COUNT),SUM(GUIDE_POSTS_WIDTH),COUNT(*) from \"SYSTEM\".STATS where PHYSICAL_NAME = '" - + physicalTableName + "' GROUP BY COLUMN_FAMILY ORDER BY COLUMN_FAMILY"); - - assertTrue(rs.next()); - assertEquals("A", rs.getString(1)); - assertEquals(24, rs.getInt(2)); - assertEquals(columnEncoded ? ( mutable ? 12252 : 13624 ) : 12144, rs.getInt(3)); - assertEquals(oneCellPerColFamliyStorageScheme ? 12 : 11, rs.getInt(4)); - - assertTrue(rs.next()); - assertEquals("B", rs.getString(1)); - assertEquals(oneCellPerColFamliyStorageScheme ? 24 : 20, rs.getInt(2)); - assertEquals(columnEncoded ? ( mutable ? 5600 : 6972 ) : 5540, rs.getInt(3)); - assertEquals(oneCellPerColFamliyStorageScheme ? 6 : 5, rs.getInt(4)); - - assertTrue(rs.next()); - assertEquals("C", rs.getString(1)); - assertEquals(24, rs.getInt(2)); - assertEquals(columnEncoded ? ( mutable ? 6724 : 6988 ) : 6652, rs.getInt(3)); - assertEquals(6, rs.getInt(4)); - - assertTrue(rs.next()); - assertEquals("D", rs.getString(1)); - assertEquals(24, rs.getInt(2)); - assertEquals(columnEncoded ? ( mutable ? 6724 : 6988 ) : 6652, rs.getInt(3)); - assertEquals(6, rs.getInt(4)); - - assertFalse(rs.next()); - - // Disable stats - conn.createStatement().execute("ALTER TABLE " + fullTableName + - " SET " + PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH + "=0"); - TestUtil.analyzeTable(conn, fullTableName); - // Assert that there are no more guideposts - rs = conn.createStatement().executeQuery("SELECT count(1) FROM " + PhoenixDatabaseMetaData.SYSTEM_STATS_NAME + - " WHERE " + PhoenixDatabaseMetaData.PHYSICAL_NAME + "='" + physicalTableName + "' AND " + PhoenixDatabaseMetaData.COLUMN_FAMILY + " IS NOT NULL"); - assertTrue(rs.next()); - assertEquals(0, rs.getLong(1)); - assertFalse(rs.next()); - rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + fullTableName); - assertEquals("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER " + physicalTableName, - QueryUtil.getExplainPlan(rs)); - } - - @Test - public void testRowCountAndByteCounts() throws SQLException { - Connection conn = getConnection(); - String ddl = "CREATE TABLE " + fullTableName + " (t_id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n" - + "k2 INTEGER NOT NULL,\n" + "C3.k3 INTEGER,\n" + "C2.v1 VARCHAR,\n" - + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2)) " + tableDDLOptions + " split on ('e','j','o')"; - conn.createStatement().execute(ddl); - String[] strings = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", - "s", "t", "u", "v", "w", "x", "y", "z" }; - for (int i = 0; i < 26; i++) { - conn.createStatement().execute( - "UPSERT INTO " + fullTableName + " values('" + strings[i] + "'," + i + "," + (i + 1) + "," - + (i + 2) + ",'" + strings[25 - i] + "')"); - } - conn.commit(); - ResultSet rs; - String query = "UPDATE STATISTICS " + fullTableName + " SET \"" - + QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB + "\"=" + Long.toString(20); - conn.createStatement().execute(query); - Random r = new Random(); - int count = 0; - while (count < 4) { - int startIndex = r.nextInt(strings.length); - int endIndex = r.nextInt(strings.length - startIndex) + startIndex; - long rows = endIndex - startIndex; - long c2Bytes = rows * (columnEncoded ? ( mutable ? 37 : 48 ) : 35); - String physicalTableName = SchemaUtil.getPhysicalTableName(Bytes.toBytes(fullTableName), userTableNamespaceMapped).toString(); - rs = conn.createStatement().executeQuery( - "SELECT COLUMN_FAMILY,SUM(GUIDE_POSTS_ROW_COUNT),SUM(GUIDE_POSTS_WIDTH) from \"SYSTEM\".STATS where PHYSICAL_NAME = '" - + physicalTableName + "' AND GUIDE_POST_KEY>= cast('" + strings[startIndex] - + "' as varbinary) AND GUIDE_POST_KEY<cast('" + strings[endIndex] - + "' as varbinary) and COLUMN_FAMILY='C2' group by COLUMN_FAMILY"); - if (startIndex < endIndex) { - assertTrue(rs.next()); - assertEquals("C2", rs.getString(1)); - assertEquals(rows, rs.getLong(2)); - assertEquals(c2Bytes, rs.getLong(3)); - count++; - } - } - } - - @Test - public void testRowCountWhenNumKVsExceedCompactionScannerThreshold() throws Exception { - String tableName = generateUniqueName(); - StringBuilder sb = new StringBuilder(200); - sb.append("CREATE TABLE " + tableName + "(PK1 VARCHAR NOT NULL, "); - int numRows = 10; - try (Connection conn = DriverManager.getConnection(getUrl())) { - int compactionScannerKVThreshold = - conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration() - .getInt(HConstants.COMPACTION_KV_MAX, - HConstants.COMPACTION_KV_MAX_DEFAULT); - int numKvColumns = compactionScannerKVThreshold * 2; - for (int i = 1; i <= numKvColumns; i++) { - sb.append("KV" + i + " VARCHAR"); - if (i < numKvColumns) { - sb.append(", "); - } - } - sb.append(" CONSTRAINT PK PRIMARY KEY (PK1))"); - String ddl = sb.toString(); - conn.createStatement().execute(ddl); - sb = new StringBuilder(200); - sb.append("UPSERT INTO " + tableName + " VALUES ("); - for (int i = 1; i <= numKvColumns + 1; i++) { - sb.append("?"); - if (i < numKvColumns + 1) { - sb.append(", "); - } - } - sb.append(")"); - String dml = sb.toString(); - PreparedStatement stmt = conn.prepareStatement(dml); - String keyValue = "KVVVVVV"; - for (int j = 1; j <= numRows; j++) { - for (int i = 1; i <= numKvColumns + 1; i++) { - if (i == 1) { - stmt.setString(1, "" + j); - } else { - stmt.setString(i, keyValue); - } - } - stmt.executeUpdate(); - } - conn.commit(); - conn.createStatement().execute("UPDATE STATISTICS " + tableName); - String q = "SELECT SUM(GUIDE_POSTS_ROW_COUNT) FROM SYSTEM.STATS WHERE PHYSICAL_NAME = '" + tableName + "'"; - ResultSet rs = conn.createStatement().executeQuery(q); - rs.next(); - assertEquals("Number of expected rows in stats table after update stats didn't match!", numRows, rs.getInt(1)); - conn.createStatement().executeUpdate("DELETE FROM SYSTEM.STATS WHERE PHYSICAL_NAME = '" + tableName + "'"); - conn.commit(); - TestUtil.doMajorCompaction(conn, tableName); - q = "SELECT SUM(GUIDE_POSTS_ROW_COUNT) FROM SYSTEM.STATS WHERE PHYSICAL_NAME = '" + tableName + "'"; - rs = conn.createStatement().executeQuery(q); - rs.next(); - assertEquals("Number of expected rows in stats table after major compaction didn't match", numRows, rs.getInt(1)); - } - } - - @Test - public void testEmptyGuidePostGeneratedWhenDataSizeLessThanGPWidth() throws Exception { - String tableName = generateUniqueName(); - try (Connection conn = DriverManager.getConnection(getUrl())) { - long guidePostWidth = 20000000; - conn.createStatement() - .execute("CREATE TABLE " + tableName - + " ( k INTEGER, c1.a bigint,c2.b bigint CONSTRAINT pk PRIMARY KEY (k)) GUIDE_POSTS_WIDTH=" - + guidePostWidth + ", SALT_BUCKETS = 4"); - conn.createStatement().execute("upsert into " + tableName + " values (100,1,3)"); - conn.createStatement().execute("upsert into " + tableName + " values (101,2,4)"); - conn.commit(); - conn.createStatement().execute("UPDATE STATISTICS " + tableName); - ConnectionQueryServices queryServices = - conn.unwrap(PhoenixConnection.class).getQueryServices(); - try (HTableInterface statsHTable = - queryServices.getTable( - SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES, - queryServices.getProps()).getName())) { - GuidePostsInfo gps = - StatisticsUtil.readStatistics(statsHTable, - new GuidePostsKey(Bytes.toBytes(tableName), Bytes.toBytes("C1")), - HConstants.LATEST_TIMESTAMP); - assertTrue(gps.isEmptyGuidePost()); - assertEquals(guidePostWidth, gps.getByteCounts()[0]); - assertTrue(gps.getGuidePostTimestamps()[0] > 0); - gps = - StatisticsUtil.readStatistics(statsHTable, - new GuidePostsKey(Bytes.toBytes(tableName), Bytes.toBytes("C2")), - HConstants.LATEST_TIMESTAMP); - assertTrue(gps.isEmptyGuidePost()); - assertEquals(guidePostWidth, gps.getByteCounts()[0]); - assertTrue(gps.getGuidePostTimestamps()[0] > 0); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/7bea88dd/phoenix-core/src/it/java/org/apache/phoenix/end2end/SysTableNamespaceMappedStatsCollectorIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SysTableNamespaceMappedStatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SysTableNamespaceMappedStatsCollectorIT.java index ea5f32f..4830189 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SysTableNamespaceMappedStatsCollectorIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SysTableNamespaceMappedStatsCollectorIT.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.Map; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.stats.StatsCollectorIT; import org.apache.phoenix.util.ReadOnlyProps; import org.junit.BeforeClass; import org.junit.runners.Parameterized.Parameters; http://git-wip-us.apache.org/repos/asf/phoenix/blob/7bea88dd/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/StatsCollectorIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/StatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/StatsCollectorIT.java new file mode 100644 index 0000000..c424f45 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/StatsCollectorIT.java @@ -0,0 +1,832 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.schema.stats; + +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.apache.phoenix.util.TestUtil.getAllSplits; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.sql.Array; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; + +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessor.MetaDataRegionObserver; +import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; +import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableImpl; +import org.apache.phoenix.schema.PTableKey; +import org.apache.phoenix.util.MetaDataUtil; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.TestUtil; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import com.google.common.collect.Maps; + +@RunWith(Parameterized.class) +public abstract class StatsCollectorIT extends BaseUniqueNamesOwnClusterIT { + private final String tableDDLOptions; + private final boolean columnEncoded; + private String tableName; + private String schemaName; + private String fullTableName; + private String physicalTableName; + private final boolean userTableNamespaceMapped; + private final boolean mutable; + private static final int defaultGuidePostWidth = 20; + + protected StatsCollectorIT(boolean mutable, boolean transactional, boolean userTableNamespaceMapped, boolean columnEncoded) { + StringBuilder sb = new StringBuilder(); + if (transactional) { + sb.append("TRANSACTIONAL=true"); + } + if (!columnEncoded) { + if (sb.length()>0) { + sb.append(","); + } + sb.append("COLUMN_ENCODED_BYTES=0"); + } else { + if (sb.length()>0) { + sb.append(","); + } + sb.append("COLUMN_ENCODED_BYTES=4"); + } + if (!mutable) { + if (sb.length()>0) { + sb.append(","); + } + sb.append("IMMUTABLE_ROWS=true"); + if (!columnEncoded) { + sb.append(",IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN); + } + } + this.tableDDLOptions = sb.toString(); + this.userTableNamespaceMapped = userTableNamespaceMapped; + this.columnEncoded = columnEncoded; + this.mutable = mutable; + } + + @BeforeClass + public static void doSetup() throws Exception { + // enable name space mapping at global level on both client and server side + Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7); + serverProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true"); + serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(defaultGuidePostWidth)); + Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2); + clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true"); + clientProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(defaultGuidePostWidth)); + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); + } + + @Before + public void generateTableNames() throws SQLException { + schemaName = generateUniqueName(); + if (userTableNamespaceMapped) { + try (Connection conn = getConnection()) { + conn.createStatement().execute("CREATE SCHEMA " + schemaName); + } + } + tableName = "T_" + generateUniqueName(); + fullTableName = SchemaUtil.getTableName(schemaName, tableName); + physicalTableName = SchemaUtil.getPhysicalHBaseTableName(schemaName, tableName, userTableNamespaceMapped).getString(); + } + + private Connection getConnection() throws SQLException { + return getConnection(Integer.MAX_VALUE); + } + + private Connection getConnection(Integer statsUpdateFreq) throws SQLException { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB, Boolean.TRUE.toString()); + props.setProperty(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.TRUE.toString()); + props.setProperty(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Integer.toString(statsUpdateFreq)); + // enable/disable namespace mapping at connection level + props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(userTableNamespaceMapped)); + return DriverManager.getConnection(getUrl(), props); + } + + @Test + public void testUpdateEmptyStats() throws Exception { + Connection conn = getConnection(); + conn.setAutoCommit(true); + conn.createStatement().execute( + "CREATE TABLE " + fullTableName +" ( k CHAR(1) PRIMARY KEY )" + tableDDLOptions); + conn.createStatement().execute("UPDATE STATISTICS " + fullTableName); + ResultSet rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + fullTableName); + String explainPlan = QueryUtil.getExplainPlan(rs); + assertEquals( + "CLIENT 1-CHUNK 0 ROWS 20 BYTES PARALLEL 1-WAY FULL SCAN OVER " + physicalTableName + "\n" + + " SERVER FILTER BY FIRST KEY ONLY", + explainPlan); + conn.close(); + } + + @Test + public void testSomeUpdateEmptyStats() throws Exception { + Connection conn = getConnection(); + conn.setAutoCommit(true); + conn.createStatement().execute( + "CREATE TABLE " + fullTableName +" ( k VARCHAR PRIMARY KEY, a.v1 VARCHAR, b.v2 VARCHAR ) " + tableDDLOptions + (tableDDLOptions.isEmpty() ? "" : ",") + "SALT_BUCKETS = 3"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + "(k,v1) VALUES('a','123456789')"); + conn.createStatement().execute("UPDATE STATISTICS " + fullTableName); + + ResultSet rs; + String explainPlan; + rs = conn.createStatement().executeQuery("EXPLAIN SELECT v2 FROM " + fullTableName + " WHERE v2='foo'"); + explainPlan = QueryUtil.getExplainPlan(rs); + // if we are using the ONE_CELL_PER_COLUMN_FAMILY storage scheme, we will have the single kv even though there are no values for col family v2 + String stats = columnEncoded && !mutable ? "4-CHUNK 1 ROWS 38 BYTES" : "3-CHUNK 0 ROWS 20 BYTES"; + assertEquals( + "CLIENT " + stats + " PARALLEL 3-WAY FULL SCAN OVER " + physicalTableName + "\n" + + " SERVER FILTER BY B.V2 = 'foo'\n" + + "CLIENT MERGE SORT", + explainPlan); + rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + fullTableName); + explainPlan = QueryUtil.getExplainPlan(rs); + assertEquals( + "CLIENT 4-CHUNK 1 ROWS " + (columnEncoded ? "28" : "34") + " BYTES PARALLEL 3-WAY FULL SCAN OVER " + physicalTableName + "\n" + + "CLIENT MERGE SORT", + explainPlan); + rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + fullTableName + " WHERE k = 'a'"); + explainPlan = QueryUtil.getExplainPlan(rs); + assertEquals( + "CLIENT 1-CHUNK 1 ROWS " + (columnEncoded ? "204" : "202") + " BYTES PARALLEL 1-WAY POINT LOOKUP ON 1 KEY OVER " + physicalTableName + "\n" + + "CLIENT MERGE SORT", + explainPlan); + + conn.close(); + } + + @Test + public void testUpdateStats() throws SQLException, IOException, + InterruptedException { + Connection conn; + PreparedStatement stmt; + ResultSet rs; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + conn = getConnection(); + conn.createStatement().execute( + "CREATE TABLE " + fullTableName +" ( k VARCHAR, a_string_array VARCHAR(100) ARRAY[4], b_string_array VARCHAR(100) ARRAY[4] \n" + + " CONSTRAINT pk PRIMARY KEY (k, b_string_array DESC))" + + tableDDLOptions ); + String[] s; + Array array; + conn = upsertValues(props, fullTableName); + // CAll the update statistics query here. If already major compaction has run this will not get executed. + stmt = conn.prepareStatement("UPDATE STATISTICS " + fullTableName); + stmt.execute(); + stmt = upsertStmt(conn, fullTableName); + stmt.setString(1, "z"); + s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" }; + array = conn.createArrayOf("VARCHAR", s); + stmt.setArray(2, array); + s = new String[] { "zya", "def", "ghi", "jkll", null, null, null, "xxx" }; + array = conn.createArrayOf("VARCHAR", s); + stmt.setArray(3, array); + stmt.execute(); + stmt = conn.prepareStatement("UPDATE STATISTICS " + fullTableName); + stmt.execute(); + rs = conn.createStatement().executeQuery("SELECT k FROM " + fullTableName); + assertTrue(rs.next()); + conn.close(); + } + + private void testNoDuplicatesAfterUpdateStats(String splitKey) throws Throwable { + Connection conn = getConnection(); + PreparedStatement stmt; + ResultSet rs; + conn.createStatement() + .execute("CREATE TABLE " + fullTableName + + " ( k VARCHAR, c1.a bigint,c2.b bigint CONSTRAINT pk PRIMARY KEY (k))"+ tableDDLOptions + + (splitKey != null ? " split on (" + splitKey + ")" : "") ); + conn.createStatement().execute("upsert into " + fullTableName + " values ('abc',1,3)"); + conn.createStatement().execute("upsert into " + fullTableName + " values ('def',2,4)"); + conn.commit(); + conn.createStatement().execute("UPDATE STATISTICS " + fullTableName); + rs = conn.createStatement().executeQuery("SELECT k FROM " + fullTableName + " order by k desc"); + assertTrue(rs.next()); + assertEquals("def", rs.getString(1)); + assertTrue(rs.next()); + assertEquals("abc", rs.getString(1)); + assertTrue(!rs.next()); + conn.close(); + } + + @Test + public void testNoDuplicatesAfterUpdateStatsWithSplits() throws Throwable { + testNoDuplicatesAfterUpdateStats("'abc','def'"); + } + + @Test + public void testNoDuplicatesAfterUpdateStatsWithDesc() throws Throwable { + testNoDuplicatesAfterUpdateStats(null); + } + + @Test + public void testUpdateStatsWithMultipleTables() throws Throwable { + String fullTableName2 = SchemaUtil.getTableName(schemaName, "T_" + generateUniqueName()); + Connection conn; + PreparedStatement stmt; + ResultSet rs; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + conn = getConnection(); + conn.createStatement().execute( + "CREATE TABLE " + fullTableName +" ( k VARCHAR, a_string_array VARCHAR(100) ARRAY[4], b_string_array VARCHAR(100) ARRAY[4] \n" + + " CONSTRAINT pk PRIMARY KEY (k, b_string_array DESC))" + tableDDLOptions ); + conn.createStatement().execute( + "CREATE TABLE " + fullTableName2 +" ( k VARCHAR, a_string_array VARCHAR(100) ARRAY[4], b_string_array VARCHAR(100) ARRAY[4] \n" + + " CONSTRAINT pk PRIMARY KEY (k, b_string_array DESC))" + tableDDLOptions ); + String[] s; + Array array; + conn = upsertValues(props, fullTableName); + conn = upsertValues(props, fullTableName2); + // CAll the update statistics query here + stmt = conn.prepareStatement("UPDATE STATISTICS "+fullTableName); + stmt.execute(); + stmt = conn.prepareStatement("UPDATE STATISTICS "+fullTableName2); + stmt.execute(); + stmt = upsertStmt(conn, fullTableName); + stmt.setString(1, "z"); + s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" }; + array = conn.createArrayOf("VARCHAR", s); + stmt.setArray(2, array); + s = new String[] { "zya", "def", "ghi", "jkll", null, null, null, "xxx" }; + array = conn.createArrayOf("VARCHAR", s); + stmt.setArray(3, array); + stmt.execute(); + stmt = upsertStmt(conn, fullTableName2); + stmt.setString(1, "z"); + s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" }; + array = conn.createArrayOf("VARCHAR", s); + stmt.setArray(2, array); + s = new String[] { "zya", "def", "ghi", "jkll", null, null, null, "xxx" }; + array = conn.createArrayOf("VARCHAR", s); + stmt.setArray(3, array); + stmt.execute(); + conn.close(); + conn = getConnection(); + // This analyze would not work + stmt = conn.prepareStatement("UPDATE STATISTICS "+fullTableName2); + stmt.execute(); + rs = conn.createStatement().executeQuery("SELECT k FROM "+fullTableName2); + assertTrue(rs.next()); + conn.close(); + } + + private Connection upsertValues(Properties props, String tableName) throws SQLException, IOException, + InterruptedException { + Connection conn; + PreparedStatement stmt; + conn = getConnection(); + stmt = upsertStmt(conn, tableName); + stmt.setString(1, "a"); + String[] s = new String[] { "abc", "def", "ghi", "jkll", null, null, "xxx" }; + Array array = conn.createArrayOf("VARCHAR", s); + stmt.setArray(2, array); + s = new String[] { "abc", "def", "ghi", "jkll", null, null, null, "xxx" }; + array = conn.createArrayOf("VARCHAR", s); + stmt.setArray(3, array); + stmt.execute(); + conn.commit(); + stmt = upsertStmt(conn, tableName); + stmt.setString(1, "b"); + s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" }; + array = conn.createArrayOf("VARCHAR", s); + stmt.setArray(2, array); + s = new String[] { "zya", "def", "ghi", "jkll", null, null, null, "xxx" }; + array = conn.createArrayOf("VARCHAR", s); + stmt.setArray(3, array); + stmt.execute(); + conn.commit(); + stmt = upsertStmt(conn, tableName); + stmt.setString(1, "c"); + s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" }; + array = conn.createArrayOf("VARCHAR", s); + stmt.setArray(2, array); + s = new String[] { "zya", "def", "ghi", "jkll", null, null, null, "xxx" }; + array = conn.createArrayOf("VARCHAR", s); + stmt.setArray(3, array); + stmt.execute(); + conn.commit(); + stmt = upsertStmt(conn, tableName); + stmt.setString(1, "d"); + s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" }; + array = conn.createArrayOf("VARCHAR", s); + stmt.setArray(2, array); + s = new String[] { "zya", "def", "ghi", "jkll", null, null, null, "xxx" }; + array = conn.createArrayOf("VARCHAR", s); + stmt.setArray(3, array); + stmt.execute(); + conn.commit(); + stmt = upsertStmt(conn, tableName); + stmt.setString(1, "b"); + s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" }; + array = conn.createArrayOf("VARCHAR", s); + stmt.setArray(2, array); + s = new String[] { "zya", "def", "ghi", "jkll", null, null, null, "xxx" }; + array = conn.createArrayOf("VARCHAR", s); + stmt.setArray(3, array); + stmt.execute(); + conn.commit(); + stmt = upsertStmt(conn, tableName); + stmt.setString(1, "e"); + s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" }; + array = conn.createArrayOf("VARCHAR", s); + stmt.setArray(2, array); + s = new String[] { "zya", "def", "ghi", "jkll", null, null, null, "xxx" }; + array = conn.createArrayOf("VARCHAR", s); + stmt.setArray(3, array); + stmt.execute(); + conn.commit(); + return conn; + } + + private PreparedStatement upsertStmt(Connection conn, String tableName) throws SQLException { + PreparedStatement stmt; + stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)"); + return stmt; + } + + private void compactTable(Connection conn, String tableName) throws Exception { + TestUtil.doMajorCompaction(conn, tableName); + } + + @Test + @Ignore //TODO remove this once https://issues.apache.org/jira/browse/TEPHRA-208 is fixed + public void testCompactUpdatesStats() throws Exception { + testCompactUpdatesStats(0, fullTableName); + } + + @Test + @Ignore //TODO remove this once https://issues.apache.org/jira/browse/TEPHRA-208 is fixed + public void testCompactUpdatesStatsWithMinStatsUpdateFreq() throws Exception { + testCompactUpdatesStats(QueryServicesOptions.DEFAULT_STATS_UPDATE_FREQ_MS, fullTableName); + } + + private static void invalidateStats(Connection conn, String tableName) throws SQLException { + PTable ptable = conn.unwrap(PhoenixConnection.class) + .getMetaDataCache().getTableRef(new PTableKey(null, tableName)) + .getTable(); + byte[] name = ptable.getPhysicalName().getBytes(); + conn.unwrap(PhoenixConnection.class).getQueryServices().invalidateStats(new GuidePostsKey(name, SchemaUtil.getEmptyColumnFamily(ptable))); + } + + private void testCompactUpdatesStats(Integer statsUpdateFreq, String tableName) throws Exception { + int nRows = 10; + Connection conn = getConnection(statsUpdateFreq); + PreparedStatement stmt; + conn.createStatement().execute("CREATE TABLE " + tableName + "(k CHAR(1) PRIMARY KEY, v INTEGER, w INTEGER) " + + (!tableDDLOptions.isEmpty() ? tableDDLOptions + "," : "") + + HColumnDescriptor.KEEP_DELETED_CELLS + "=" + Boolean.FALSE); + stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)"); + for (int i = 0; i < nRows; i++) { + stmt.setString(1, Character.toString((char) ('a' + i))); + stmt.setInt(2, i); + stmt.setInt(3, i); + stmt.executeUpdate(); + } + conn.commit(); + + compactTable(conn, physicalTableName); + + if (statsUpdateFreq != 0) { + invalidateStats(conn, tableName); + } else { + // Confirm that when we have a non zero STATS_UPDATE_FREQ_MS_ATTRIB, after we run + // UPDATATE STATISTICS, the new statistics are faulted in as expected. + List<KeyRange>keyRanges = getAllSplits(conn, tableName); + assertNotEquals(nRows+1, keyRanges.size()); + // If we've set MIN_STATS_UPDATE_FREQ_MS_ATTRIB, an UPDATE STATISTICS will invalidate the cache + // and forcing the new stats to be pulled over. + int rowCount = conn.createStatement().executeUpdate("UPDATE STATISTICS " + tableName); + assertEquals(10, rowCount); + } + List<KeyRange>keyRanges = getAllSplits(conn, tableName); + assertEquals(nRows+1, keyRanges.size()); + + int nDeletedRows = conn.createStatement().executeUpdate("DELETE FROM " + tableName + " WHERE V < " + nRows / 2); + conn.commit(); + assertEquals(5, nDeletedRows); + + Scan scan = new Scan(); + scan.setRaw(true); + PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class); + try (HTableInterface htable = phxConn.getQueryServices().getTable(Bytes.toBytes(tableName))) { + ResultScanner scanner = htable.getScanner(scan); + Result result; + while ((result = scanner.next())!=null) { + System.out.println(result); + } + } + + compactTable(conn, physicalTableName); + + scan = new Scan(); + scan.setRaw(true); + phxConn = conn.unwrap(PhoenixConnection.class); + try (HTableInterface htable = phxConn.getQueryServices().getTable(Bytes.toBytes(tableName))) { + ResultScanner scanner = htable.getScanner(scan); + Result result; + while ((result = scanner.next())!=null) { + System.out.println(result); + } + } + + if (statsUpdateFreq != 0) { + invalidateStats(conn, tableName); + } else { + assertEquals(nRows+1, keyRanges.size()); + // If we've set STATS_UPDATE_FREQ_MS_ATTRIB, an UPDATE STATISTICS will invalidate the cache + // and force us to pull over the new stats + int rowCount = conn.createStatement().executeUpdate("UPDATE STATISTICS " + tableName); + assertEquals(5, rowCount); + } + keyRanges = getAllSplits(conn, tableName); + assertEquals(nRows/2+1, keyRanges.size()); + ResultSet rs = conn.createStatement().executeQuery("SELECT SUM(GUIDE_POSTS_ROW_COUNT) FROM " + + "\""+ SYSTEM_CATALOG_SCHEMA + "\".\"" + SYSTEM_STATS_TABLE + "\"" + " WHERE PHYSICAL_NAME='" + physicalTableName + "'"); + rs.next(); + assertEquals(nRows - nDeletedRows, rs.getLong(1)); + } + + @Test + public void testWithMultiCF() throws Exception { + int nRows = 20; + Connection conn = getConnection(0); + PreparedStatement stmt; + conn.createStatement().execute( + "CREATE TABLE " + fullTableName + + "(k VARCHAR PRIMARY KEY, a.v INTEGER, b.v INTEGER, c.v INTEGER NULL, d.v INTEGER NULL) " + + tableDDLOptions ); + stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?, ?, ?, ?)"); + byte[] val = new byte[250]; + for (int i = 0; i < nRows; i++) { + stmt.setString(1, Character.toString((char)('a' + i)) + Bytes.toString(val)); + stmt.setInt(2, i); + stmt.setInt(3, i); + stmt.setInt(4, i); + stmt.setInt(5, i); + stmt.executeUpdate(); + } + conn.commit(); + stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k, c.v, d.v) VALUES(?,?,?)"); + for (int i = 0; i < 5; i++) { + stmt.setString(1, Character.toString((char)('a' + 'z' + i)) + Bytes.toString(val)); + stmt.setInt(2, i); + stmt.setInt(3, i); + stmt.executeUpdate(); + } + conn.commit(); + + ResultSet rs; + TestUtil.analyzeTable(conn, fullTableName); + List<KeyRange> keyRanges = getAllSplits(conn, fullTableName); + assertEquals(26, keyRanges.size()); + rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + fullTableName); + assertEquals("CLIENT 26-CHUNK 25 ROWS " + (columnEncoded ? ( mutable ? "12530" : "13902" ) : "12420") + " BYTES PARALLEL 1-WAY FULL SCAN OVER " + physicalTableName, + QueryUtil.getExplainPlan(rs)); + + ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); + List<HRegionLocation> regions = services.getAllTableRegions(Bytes.toBytes(physicalTableName)); + assertEquals(1, regions.size()); + + TestUtil.analyzeTable(conn, fullTableName); + String query = "UPDATE STATISTICS " + fullTableName + " SET \"" + + QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB + "\"=" + Long.toString(1000); + conn.createStatement().execute(query); + keyRanges = getAllSplits(conn, fullTableName); + boolean oneCellPerColFamliyStorageScheme = !mutable && columnEncoded; + assertEquals(oneCellPerColFamliyStorageScheme ? 13 : 12, keyRanges.size()); + + rs = conn + .createStatement() + .executeQuery( + "SELECT COLUMN_FAMILY,SUM(GUIDE_POSTS_ROW_COUNT),SUM(GUIDE_POSTS_WIDTH),COUNT(*) from \"SYSTEM\".STATS where PHYSICAL_NAME = '" + + physicalTableName + "' GROUP BY COLUMN_FAMILY ORDER BY COLUMN_FAMILY"); + + assertTrue(rs.next()); + assertEquals("A", rs.getString(1)); + assertEquals(24, rs.getInt(2)); + assertEquals(columnEncoded ? ( mutable ? 12252 : 13624 ) : 12144, rs.getInt(3)); + assertEquals(oneCellPerColFamliyStorageScheme ? 12 : 11, rs.getInt(4)); + + assertTrue(rs.next()); + assertEquals("B", rs.getString(1)); + assertEquals(oneCellPerColFamliyStorageScheme ? 24 : 20, rs.getInt(2)); + assertEquals(columnEncoded ? ( mutable ? 5600 : 6972 ) : 5540, rs.getInt(3)); + assertEquals(oneCellPerColFamliyStorageScheme ? 6 : 5, rs.getInt(4)); + + assertTrue(rs.next()); + assertEquals("C", rs.getString(1)); + assertEquals(24, rs.getInt(2)); + assertEquals(columnEncoded ? ( mutable ? 6724 : 6988 ) : 6652, rs.getInt(3)); + assertEquals(6, rs.getInt(4)); + + assertTrue(rs.next()); + assertEquals("D", rs.getString(1)); + assertEquals(24, rs.getInt(2)); + assertEquals(columnEncoded ? ( mutable ? 6724 : 6988 ) : 6652, rs.getInt(3)); + assertEquals(6, rs.getInt(4)); + + assertFalse(rs.next()); + + // Disable stats + conn.createStatement().execute("ALTER TABLE " + fullTableName + + " SET " + PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH + "=0"); + TestUtil.analyzeTable(conn, fullTableName); + // Assert that there are no more guideposts + rs = conn.createStatement().executeQuery("SELECT count(1) FROM " + PhoenixDatabaseMetaData.SYSTEM_STATS_NAME + + " WHERE " + PhoenixDatabaseMetaData.PHYSICAL_NAME + "='" + physicalTableName + "' AND " + PhoenixDatabaseMetaData.COLUMN_FAMILY + " IS NOT NULL"); + assertTrue(rs.next()); + assertEquals(0, rs.getLong(1)); + assertFalse(rs.next()); + rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + fullTableName); + assertEquals("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER " + physicalTableName, + QueryUtil.getExplainPlan(rs)); + } + + @Test + public void testRowCountAndByteCounts() throws SQLException { + Connection conn = getConnection(); + String ddl = "CREATE TABLE " + fullTableName + " (t_id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n" + + "k2 INTEGER NOT NULL,\n" + "C3.k3 INTEGER,\n" + "C2.v1 VARCHAR,\n" + + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2)) " + tableDDLOptions + " split on ('e','j','o')"; + conn.createStatement().execute(ddl); + String[] strings = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", + "s", "t", "u", "v", "w", "x", "y", "z" }; + for (int i = 0; i < 26; i++) { + conn.createStatement().execute( + "UPSERT INTO " + fullTableName + " values('" + strings[i] + "'," + i + "," + (i + 1) + "," + + (i + 2) + ",'" + strings[25 - i] + "')"); + } + conn.commit(); + ResultSet rs; + String query = "UPDATE STATISTICS " + fullTableName + " SET \"" + + QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB + "\"=" + Long.toString(20); + conn.createStatement().execute(query); + Random r = new Random(); + int count = 0; + while (count < 4) { + int startIndex = r.nextInt(strings.length); + int endIndex = r.nextInt(strings.length - startIndex) + startIndex; + long rows = endIndex - startIndex; + long c2Bytes = rows * (columnEncoded ? ( mutable ? 37 : 48 ) : 35); + String physicalTableName = SchemaUtil.getPhysicalTableName(Bytes.toBytes(fullTableName), userTableNamespaceMapped).toString(); + rs = conn.createStatement().executeQuery( + "SELECT COLUMN_FAMILY,SUM(GUIDE_POSTS_ROW_COUNT),SUM(GUIDE_POSTS_WIDTH) from \"SYSTEM\".STATS where PHYSICAL_NAME = '" + + physicalTableName + "' AND GUIDE_POST_KEY>= cast('" + strings[startIndex] + + "' as varbinary) AND GUIDE_POST_KEY<cast('" + strings[endIndex] + + "' as varbinary) and COLUMN_FAMILY='C2' group by COLUMN_FAMILY"); + if (startIndex < endIndex) { + assertTrue(rs.next()); + assertEquals("C2", rs.getString(1)); + assertEquals(rows, rs.getLong(2)); + assertEquals(c2Bytes, rs.getLong(3)); + count++; + } + } + } + + @Test + public void testRowCountWhenNumKVsExceedCompactionScannerThreshold() throws Exception { + String tableName = generateUniqueName(); + StringBuilder sb = new StringBuilder(200); + sb.append("CREATE TABLE " + tableName + "(PK1 VARCHAR NOT NULL, "); + int numRows = 10; + try (Connection conn = DriverManager.getConnection(getUrl())) { + int compactionScannerKVThreshold = + conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration() + .getInt(HConstants.COMPACTION_KV_MAX, + HConstants.COMPACTION_KV_MAX_DEFAULT); + int numKvColumns = compactionScannerKVThreshold * 2; + for (int i = 1; i <= numKvColumns; i++) { + sb.append("KV" + i + " VARCHAR"); + if (i < numKvColumns) { + sb.append(", "); + } + } + sb.append(" CONSTRAINT PK PRIMARY KEY (PK1))"); + String ddl = sb.toString(); + conn.createStatement().execute(ddl); + sb = new StringBuilder(200); + sb.append("UPSERT INTO " + tableName + " VALUES ("); + for (int i = 1; i <= numKvColumns + 1; i++) { + sb.append("?"); + if (i < numKvColumns + 1) { + sb.append(", "); + } + } + sb.append(")"); + String dml = sb.toString(); + PreparedStatement stmt = conn.prepareStatement(dml); + String keyValue = "KVVVVVV"; + for (int j = 1; j <= numRows; j++) { + for (int i = 1; i <= numKvColumns + 1; i++) { + if (i == 1) { + stmt.setString(1, "" + j); + } else { + stmt.setString(i, keyValue); + } + } + stmt.executeUpdate(); + } + conn.commit(); + conn.createStatement().execute("UPDATE STATISTICS " + tableName); + String q = "SELECT SUM(GUIDE_POSTS_ROW_COUNT) FROM SYSTEM.STATS WHERE PHYSICAL_NAME = '" + tableName + "'"; + ResultSet rs = conn.createStatement().executeQuery(q); + rs.next(); + assertEquals("Number of expected rows in stats table after update stats didn't match!", numRows, rs.getInt(1)); + conn.createStatement().executeUpdate("DELETE FROM SYSTEM.STATS WHERE PHYSICAL_NAME = '" + tableName + "'"); + conn.commit(); + TestUtil.doMajorCompaction(conn, tableName); + q = "SELECT SUM(GUIDE_POSTS_ROW_COUNT) FROM SYSTEM.STATS WHERE PHYSICAL_NAME = '" + tableName + "'"; + rs = conn.createStatement().executeQuery(q); + rs.next(); + assertEquals("Number of expected rows in stats table after major compaction didn't match", numRows, rs.getInt(1)); + } + } + + @Test + public void testEmptyGuidePostGeneratedWhenDataSizeLessThanGPWidth() throws Exception { + String tableName = generateUniqueName(); + try (Connection conn = DriverManager.getConnection(getUrl())) { + long guidePostWidth = 20000000; + conn.createStatement() + .execute("CREATE TABLE " + tableName + + " ( k INTEGER, c1.a bigint,c2.b bigint CONSTRAINT pk PRIMARY KEY (k)) GUIDE_POSTS_WIDTH=" + + guidePostWidth + ", SALT_BUCKETS = 4"); + conn.createStatement().execute("upsert into " + tableName + " values (100,1,3)"); + conn.createStatement().execute("upsert into " + tableName + " values (101,2,4)"); + conn.commit(); + conn.createStatement().execute("UPDATE STATISTICS " + tableName); + ConnectionQueryServices queryServices = + conn.unwrap(PhoenixConnection.class).getQueryServices(); + try (HTableInterface statsHTable = + queryServices.getTable( + SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES, + queryServices.getProps()).getName())) { + GuidePostsInfo gps = + StatisticsUtil.readStatistics(statsHTable, + new GuidePostsKey(Bytes.toBytes(tableName), Bytes.toBytes("C1")), + HConstants.LATEST_TIMESTAMP); + assertTrue(gps.isEmptyGuidePost()); + assertEquals(guidePostWidth, gps.getByteCounts()[0]); + assertTrue(gps.getGuidePostTimestamps()[0] > 0); + gps = + StatisticsUtil.readStatistics(statsHTable, + new GuidePostsKey(Bytes.toBytes(tableName), Bytes.toBytes("C2")), + HConstants.LATEST_TIMESTAMP); + assertTrue(gps.isEmptyGuidePost()); + assertEquals(guidePostWidth, gps.getByteCounts()[0]); + assertTrue(gps.getGuidePostTimestamps()[0] > 0); + } + } + } + + @Test + public void testGuidePostWidthUsedInDefaultStatsCollector() throws Exception { + String baseTable = generateUniqueName(); + try (Connection conn = DriverManager.getConnection(getUrl())) { + String ddl = + "CREATE TABLE " + baseTable + + " (k INTEGER PRIMARY KEY, a bigint, b bigint, c bigint) " + + tableDDLOptions; + BaseTest.createTestTable(getUrl(), ddl, null, null); + conn.createStatement().execute("upsert into " + baseTable + " values (100,1,1,1)"); + conn.createStatement().execute("upsert into " + baseTable + " values (101,2,2,2)"); + conn.createStatement().execute("upsert into " + baseTable + " values (102,3,3,3)"); + conn.createStatement().execute("upsert into " + baseTable + " values (103,4,4,4)"); + conn.createStatement().execute("upsert into " + baseTable + " values (104,5,5,5)"); + conn.createStatement().execute("upsert into " + baseTable + " values (105,6,6,6)"); + conn.createStatement().execute("upsert into " + baseTable + " values (106,7,7,7)"); + conn.createStatement().execute("upsert into " + baseTable + " values (107,8,8,8)"); + conn.createStatement().execute("upsert into " + baseTable + " values (108,9,9,9)"); + conn.createStatement().execute("upsert into " + baseTable + " values (109,10,10,10)"); + conn.commit(); + DefaultStatisticsCollector statsCollector = getDefaultStatsCollectorForTable(baseTable); + statsCollector.init(); + assertEquals(defaultGuidePostWidth, statsCollector.getGuidePostDepth()); + + // ok let's create a global index now and see what guide post width is used for it + String globalIndex = "GI_" + generateUniqueName(); + ddl = "CREATE INDEX " + globalIndex + " ON " + baseTable + " (a) INCLUDE (b) "; + conn.createStatement().execute(ddl); + statsCollector = getDefaultStatsCollectorForTable(globalIndex); + statsCollector.init(); + assertEquals(defaultGuidePostWidth, statsCollector.getGuidePostDepth()); + + // let's check out local index too + String localIndex = "LI_" + generateUniqueName(); + ddl = "CREATE LOCAL INDEX " + localIndex + " ON " + baseTable + " (b) INCLUDE (c) "; + conn.createStatement().execute(ddl); + // local indexes reside on the same table as base data table + statsCollector = getDefaultStatsCollectorForTable(baseTable); + statsCollector.init(); + assertEquals(defaultGuidePostWidth, statsCollector.getGuidePostDepth()); + + // now let's create a view and an index on it and see what guide post width is used for + // it + String view = "V_" + generateUniqueName(); + ddl = "CREATE VIEW " + view + " AS SELECT * FROM " + baseTable; + conn.createStatement().execute(ddl); + String viewIndex = "VI_" + generateUniqueName(); + ddl = "CREATE INDEX " + viewIndex + " ON " + view + " (b)"; + conn.createStatement().execute(ddl); + String viewIndexTableName = MetaDataUtil.getViewIndexTableName(baseTable); + statsCollector = getDefaultStatsCollectorForTable(viewIndexTableName); + statsCollector.init(); + assertEquals(defaultGuidePostWidth, statsCollector.getGuidePostDepth()); + /* + * Fantastic! Now let's change the guide post width of the base table. This should + * change the guide post width we are using in DefaultStatisticsCollector for all + * indexes too. + */ + long newGpWidth = 500; + conn.createStatement() + .execute("ALTER TABLE " + baseTable + " SET GUIDE_POSTS_WIDTH=" + newGpWidth); + + // base table + statsCollector = getDefaultStatsCollectorForTable(baseTable); + statsCollector.init(); + assertEquals(newGpWidth, statsCollector.getGuidePostDepth()); + + // global index table + statsCollector = getDefaultStatsCollectorForTable(globalIndex); + statsCollector.init(); + assertEquals(newGpWidth, statsCollector.getGuidePostDepth()); + + // view index table + statsCollector = getDefaultStatsCollectorForTable(viewIndexTableName); + statsCollector.init(); + assertEquals(newGpWidth, statsCollector.getGuidePostDepth()); + } + } + + private DefaultStatisticsCollector getDefaultStatsCollectorForTable(String tableName) + throws Exception { + RegionCoprocessorEnvironment env = getRegionEnvrionment(tableName); + return (DefaultStatisticsCollector) StatisticsCollectorFactory + .createStatisticsCollector(env, tableName, System.currentTimeMillis(), null, null); + } + + private RegionCoprocessorEnvironment getRegionEnvrionment(String tableName) + throws IOException, InterruptedException { + return (RegionCoprocessorEnvironment) getUtility() + .getRSForFirstRegionInTable(TableName.valueOf(tableName)) + .getOnlineRegions(TableName.valueOf(tableName)).get(0).getCoprocessorHost() + .findCoprocessorEnvironment(UngroupedAggregateRegionObserver.class.getName()); + } +}