PHOENIX-1251 Salted queries with range scan become full table scans Conflicts: phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
Conflicts: phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorRegionSplitterFactory.java phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java phoenix-core/src/main/java/org/apache/phoenix/query/StatsManagerImpl.java phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/88c6abb0 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/88c6abb0 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/88c6abb0 Branch: refs/heads/3.0 Commit: 88c6abb038d83a261be4a7fdc5388a20a8513a23 Parents: ff47a95 Author: James Taylor <jtay...@salesforce.com> Authored: Wed Oct 1 08:49:04 2014 -0700 Committer: James Taylor <jtay...@salesforce.com> Committed: Fri Oct 3 18:20:54 2014 -0700 ---------------------------------------------------------------------- .../BaseParallelIteratorsRegionSplitterIT.java | 90 ---- .../end2end/BaseTenantSpecificViewIndexIT.java | 7 +- .../org/apache/phoenix/end2end/BaseViewIT.java | 5 +- ...efaultParallelIteratorsRegionSplitterIT.java | 163 ------- .../org/apache/phoenix/end2end/DeleteIT.java | 1 + .../phoenix/end2end/GuidePostsLifeCycleIT.java | 168 ------- .../org/apache/phoenix/end2end/InListIT.java | 5 +- .../org/apache/phoenix/end2end/KeyOnlyIT.java | 57 +-- .../phoenix/end2end/MultiCfQueryExecIT.java | 73 +-- .../phoenix/end2end/ParallelIteratorsIT.java | 172 +++++++ .../org/apache/phoenix/end2end/QueryPlanIT.java | 202 -------- ...ipRangeParallelIteratorRegionSplitterIT.java | 109 ++++- .../end2end/SkipScanAfterManualSplitIT.java | 30 +- .../apache/phoenix/end2end/StatsManagerIT.java | 198 -------- .../end2end/TenantSpecificTablesDMLIT.java | 55 +-- .../phoenix/end2end/index/SaltedIndexIT.java | 4 +- .../apache/phoenix/cache/ServerCacheClient.java | 12 +- .../org/apache/phoenix/compile/QueryPlan.java | 2 + .../org/apache/phoenix/compile/ScanRanges.java | 370 ++++++++++++--- .../phoenix/compile/StatementContext.java | 46 +- .../apache/phoenix/compile/WhereOptimizer.java | 39 +- .../coprocessor/MetaDataEndpointImpl.java | 85 ++-- .../apache/phoenix/execute/AggregatePlan.java | 4 +- .../apache/phoenix/execute/BaseQueryPlan.java | 216 +++++++++ .../apache/phoenix/execute/BasicQueryPlan.java | 211 --------- .../phoenix/execute/DegenerateQueryPlan.java | 2 +- .../apache/phoenix/execute/HashJoinPlan.java | 13 +- .../org/apache/phoenix/execute/ScanPlan.java | 6 +- .../apache/phoenix/filter/SkipScanFilter.java | 29 +- .../phoenix/index/PhoenixIndexBuilder.java | 1 - .../DefaultParallelIteratorRegionSplitter.java | 174 ------- .../apache/phoenix/iterate/ExplainTable.java | 9 +- .../ParallelIteratorRegionSplitterFactory.java | 38 -- .../phoenix/iterate/ParallelIterators.java | 456 ++++++++++++++----- ...SkipRangeParallelIteratorRegionSplitter.java | 83 ---- .../apache/phoenix/jdbc/PhoenixStatement.java | 5 + .../java/org/apache/phoenix/query/KeyRange.java | 8 + .../org/apache/phoenix/query/StatsManager.java | 59 --- .../apache/phoenix/query/StatsManagerImpl.java | 214 --------- .../schema/stat/StatisticsCollector.java | 47 +- .../phoenix/schema/stat/StatisticsTable.java | 18 +- .../java/org/apache/phoenix/util/ScanUtil.java | 66 ++- .../compile/ScanRangesIntersectTest.java | 105 +++++ .../apache/phoenix/compile/ScanRangesTest.java | 2 +- .../phoenix/compile/WhereCompilerTest.java | 13 +- .../phoenix/compile/WhereOptimizerTest.java | 5 +- .../query/BaseConnectionlessQueryTest.java | 3 +- .../org/apache/phoenix/query/QueryPlanTest.java | 179 ++++++++ .../java/org/apache/phoenix/util/TestUtil.java | 41 ++ .../phoenix/pig/hadoop/PhoenixInputFormat.java | 25 +- 50 files changed, 1749 insertions(+), 2176 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseParallelIteratorsRegionSplitterIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseParallelIteratorsRegionSplitterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseParallelIteratorsRegionSplitterIT.java deleted file mode 100644 index 514b36e..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseParallelIteratorsRegionSplitterIT.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.end2end; - -import static org.apache.phoenix.util.TestUtil.STABLE_NAME; -import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.Map; -import java.util.Properties; - -import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.schema.PTableKey; -import org.apache.phoenix.schema.TableRef; -import org.apache.phoenix.util.PhoenixRuntime; -import org.apache.phoenix.util.PropertiesUtil; -import org.apache.phoenix.util.ReadOnlyProps; -import org.junit.BeforeClass; -import org.junit.experimental.categories.Category; - -import com.google.common.collect.Maps; - -@Category(ClientManagedTimeTest.class) -public class BaseParallelIteratorsRegionSplitterIT extends BaseClientManagedTimeIT { - - protected static final byte[] KMIN = new byte[] {'!'}; - protected static final byte[] KMIN2 = new byte[] {'.'}; - protected static final byte[] K1 = new byte[] {'a'}; - protected static final byte[] K3 = new byte[] {'c'}; - protected static final byte[] K4 = new byte[] {'d'}; - protected static final byte[] K5 = new byte[] {'e'}; - protected static final byte[] K6 = new byte[] {'f'}; - protected static final byte[] K9 = new byte[] {'i'}; - protected static final byte[] K11 = new byte[] {'k'}; - protected static final byte[] K12 = new byte[] {'l'}; - protected static final byte[] KMAX = new byte[] {'~'}; - protected static final byte[] KMAX2 = new byte[] {'z'}; - - @BeforeClass - @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class) - public static void doSetup() throws Exception { - Map<String,String> props = Maps.newHashMapWithExpectedSize(3); - // Must update config before starting server - setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); - } - - protected void initTableValues(long ts) throws Exception { - byte[][] splits = new byte[][] {K3,K4,K9,K11}; - ensureTableCreated(getUrl(),STABLE_NAME,splits, ts-2); - String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts; - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(url, props); - PreparedStatement stmt = conn.prepareStatement( - "upsert into " + STABLE_NAME + " VALUES (?, ?)"); - stmt.setString(1, new String(KMIN)); - stmt.setInt(2, 1); - stmt.execute(); - stmt.setString(1, new String(KMAX)); - stmt.setInt(2, 2); - stmt.execute(); - conn.commit(); - conn.close(); - } - - protected static TableRef getTableRef(Connection conn, long ts) throws SQLException { - PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); - TableRef table = new TableRef(null, pconn.getMetaDataCache().getTable( - new PTableKey(pconn.getTenantId(), STABLE_NAME)), ts, false); - return table; - } - -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java index cda44c5..c6a6a7e 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java @@ -102,10 +102,11 @@ public class BaseTenantSpecificViewIndexIT extends BaseHBaseManagedTimeIT { conn.createStatement().execute("UPSERT INTO v(k2,v1,v2) VALUES (-1, 'blah', 'superblah')"); // sanity check that we can upsert after index is there conn.commit(); ResultSet rs = conn.createStatement().executeQuery("EXPLAIN SELECT k1, k2, v2 FROM v WHERE v2='" + valuePrefix + "v2-1'"); - assertEquals(saltBuckets == null ? + String expected = saltBuckets == null ? "CLIENT PARALLEL 1-WAY RANGE SCAN OVER _IDX_T ['" + tenantId + "',-32768,'" + valuePrefix + "v2-1']" : - "CLIENT PARALLEL 3-WAY SKIP SCAN ON 3 KEYS OVER _IDX_T [0,'" + tenantId + "',-32768,'" + valuePrefix + "v2-1'] - [2,'" + tenantId + "',-32768,'" + valuePrefix + "v2-1']\n" + - "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs)); + "CLIENT PARALLEL 3-WAY RANGE SCAN OVER _IDX_T [0,'" + tenantId + "',-32768,'" + valuePrefix + "v2-1']\n" + + "CLIENT MERGE SORT"; + assertEquals(expected, QueryUtil.getExplainPlan(rs)); } private Connection createTenantConnection(String tenantId) throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java index 0a5c197..e68c82e 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java @@ -110,9 +110,10 @@ public class BaseViewIT extends BaseHBaseManagedTimeIT { assertEquals("bar", rs.getString(4)); assertFalse(rs.next()); rs = conn.createStatement().executeQuery("EXPLAIN " + query); + assertEquals(saltBuckets == null ? "CLIENT PARALLEL 1-WAY RANGE SCAN OVER _IDX_T [" + Short.MIN_VALUE + ",51]" - : "CLIENT PARALLEL " + saltBuckets + "-WAY SKIP SCAN ON 3 KEYS OVER _IDX_T [0," + Short.MIN_VALUE + ",51] - [2," + Short.MIN_VALUE + ",51]\nCLIENT MERGE SORT", + : "CLIENT PARALLEL " + saltBuckets + "-WAY RANGE SCAN OVER _IDX_T [0," + Short.MIN_VALUE + ",51]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs)); conn.createStatement().execute("CREATE INDEX i2 on v(s)"); @@ -126,7 +127,7 @@ public class BaseViewIT extends BaseHBaseManagedTimeIT { rs = conn.createStatement().executeQuery("EXPLAIN " + query); assertEquals(saltBuckets == null ? "CLIENT PARALLEL 1-WAY RANGE SCAN OVER _IDX_T [" + (Short.MIN_VALUE+1) + ",'foo']" - : "CLIENT PARALLEL " + saltBuckets + "-WAY SKIP SCAN ON 3 KEYS OVER _IDX_T [0," + (Short.MIN_VALUE+1) + ",'foo'] - [2," + (Short.MIN_VALUE+1) + ",'foo']\nCLIENT MERGE SORT", + : "CLIENT PARALLEL " + saltBuckets + "-WAY RANGE SCAN OVER _IDX_T [0," + (Short.MIN_VALUE+1) + ",'foo']\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs)); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java deleted file mode 100644 index e7a1044..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.end2end; - -import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; -import static org.junit.Assert.assertEquals; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.compile.SequenceManager; -import org.apache.phoenix.compile.StatementContext; -import org.apache.phoenix.iterate.DefaultParallelIteratorRegionSplitter; -import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.jdbc.PhoenixStatement; -import org.apache.phoenix.parse.HintNode; -import org.apache.phoenix.query.KeyRange; -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.schema.PDataType; -import org.apache.phoenix.schema.TableRef; -import org.apache.phoenix.util.PhoenixRuntime; -import org.apache.phoenix.util.PropertiesUtil; -import org.apache.phoenix.util.ReadOnlyProps; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import com.google.common.collect.Maps; - - -/** - * Tests for {@link DefaultParallelIteratorRegionSplitter}. - * - * - * @since 0.1 - */ - -@Category(ClientManagedTimeTest.class) -public class DefaultParallelIteratorsRegionSplitterIT extends BaseParallelIteratorsRegionSplitterIT { - - @BeforeClass - @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class) - public static void doSetup() throws Exception { - Map<String,String> props = Maps.newHashMapWithExpectedSize(3); - // Must update config before starting server - props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l)); - setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); - } - private static List<KeyRange> getSplits(Connection conn, long ts, final Scan scan) - throws SQLException { - TableRef tableRef = getTableRef(conn, ts); - PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); - final List<HRegionLocation> regions = pconn.getQueryServices().getAllTableRegions(tableRef.getTable().getPhysicalName().getBytes()); - PhoenixStatement statement = new PhoenixStatement(pconn); - StatementContext context = new StatementContext(statement, null, scan, new SequenceManager(statement)); - DefaultParallelIteratorRegionSplitter splitter = new DefaultParallelIteratorRegionSplitter(context, tableRef.getTable(), HintNode.EMPTY_HINT_NODE) { - @Override - protected List<HRegionLocation> getAllRegions() throws SQLException { - return DefaultParallelIteratorRegionSplitter.filterRegions(regions, scan.getStartRow(), scan.getStopRow()); - } - }; - List<KeyRange> keyRanges = splitter.getSplits(); - Collections.sort(keyRanges, new Comparator<KeyRange>() { - @Override - public int compare(KeyRange o1, KeyRange o2) { - return Bytes.compareTo(o1.getLowerRange(),o2.getLowerRange()); - } - }); - return keyRanges; - } - - @Test - public void testGetSplits() throws Exception { - long ts = nextTimestamp(); - initTableValues(ts); - String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts + 2; - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(url, props); - PreparedStatement stmt = conn.prepareStatement("ANALYZE STABLE"); - stmt.execute(); - Scan scan = new Scan(); - - // number of regions > target query concurrency - conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery(); - scan.setStartRow(K1); - scan.setStopRow(K12); - List<KeyRange> keyRanges = getSplits(conn, ts, scan); - assertEquals("Unexpected number of splits: " + keyRanges, 7, keyRanges.size()); - assertEquals(newKeyRange(KeyRange.UNBOUND, KMIN), keyRanges.get(0)); - assertEquals(newKeyRange(KMIN, K3), keyRanges.get(1)); - assertEquals(newKeyRange(K3, K4), keyRanges.get(2)); - assertEquals(newKeyRange(K4, K9), keyRanges.get(3)); - assertEquals(newKeyRange(K9, K11), keyRanges.get(4)); - assertEquals(newKeyRange(K11, KMAX), keyRanges.get(5)); - assertEquals(newKeyRange(KMAX, KeyRange.UNBOUND), keyRanges.get(6)); - - scan.setStartRow(K3); - scan.setStopRow(K6); - keyRanges = getSplits(conn, ts, scan); - assertEquals("Unexpected number of splits: " + keyRanges, 2, keyRanges.size()); - // note that we get a single split from R2 due to small key space - assertEquals(newKeyRange(K3, K4), keyRanges.get(0)); - assertEquals(newKeyRange(K4, K9), keyRanges.get(1)); - - scan.setStartRow(K5); - scan.setStopRow(K6); - keyRanges = getSplits(conn, ts, scan); - assertEquals("Unexpected number of splits: " + keyRanges, 1, keyRanges.size()); - assertEquals(newKeyRange(K4, K9), keyRanges.get(0)); - conn.close(); - } - - @Test - public void testGetLowerUnboundSplits() throws Throwable { - long ts = nextTimestamp(); - initTableValues(ts); - String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB; - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(url, props); - PreparedStatement stmt = conn.prepareStatement("ANALYZE STABLE"); - stmt.execute(); - // The query would use all the split points here - conn.createStatement().executeQuery("SELECT * FROM STABLE"); - conn.close(); - Scan scan = new Scan(); - scan.setStartRow(HConstants.EMPTY_START_ROW); - scan.setStopRow(K1); - List<KeyRange> keyRanges = getSplits(conn, ts, scan); - assertEquals("Unexpected number of splits: " + keyRanges, 2, keyRanges.size()); - assertEquals(newKeyRange(KeyRange.UNBOUND, KMIN), keyRanges.get(0)); - assertEquals(newKeyRange(KMIN, K3), keyRanges.get(1)); - } - - private static KeyRange newKeyRange(byte[] lowerRange, byte[] upperRange) { - return PDataType.CHAR.getKeyRange(lowerRange, true, upperRange, false); - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java index 4d41141..a2dd942 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java @@ -235,6 +235,7 @@ public class DeleteIT extends BaseHBaseManagedTimeIT { testDeleteAllFromTableWithIndex(true, false); } + //@Ignore // TODO: JT to look at: SkipScanFilter:151 assert for skip_hint > current_key is failing @Test public void testDeleteAllFromTableWithIndexNoAutoCommitNoSalting() throws SQLException { testDeleteAllFromTableWithIndex(false,false); http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java deleted file mode 100644 index ba9f961..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.end2end; - -import static org.apache.phoenix.util.TestUtil.STABLE_NAME; -import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; -import static org.junit.Assert.assertEquals; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.compile.SequenceManager; -import org.apache.phoenix.compile.StatementContext; -import org.apache.phoenix.iterate.DefaultParallelIteratorRegionSplitter; -import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.jdbc.PhoenixStatement; -import org.apache.phoenix.parse.HintNode; -import org.apache.phoenix.query.KeyRange; -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.schema.PTableKey; -import org.apache.phoenix.schema.TableRef; -import org.apache.phoenix.util.PhoenixRuntime; -import org.apache.phoenix.util.PropertiesUtil; -import org.apache.phoenix.util.ReadOnlyProps; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import com.google.common.collect.Maps; - -@Category(HBaseManagedTimeTest.class) -public class GuidePostsLifeCycleIT extends BaseHBaseManagedTimeIT { - - @BeforeClass - @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class) - public static void doSetup() throws Exception { - Map<String,String> props = Maps.newHashMapWithExpectedSize(3); - // Must update config before starting server - props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l)); - props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(20)); - props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(20)); - setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); - } - - protected static final byte[] KMIN = new byte[] {'!'}; - protected static final byte[] KMIN2 = new byte[] {'.'}; - protected static final byte[] K1 = new byte[] {'a'}; - protected static final byte[] K3 = new byte[] {'c'}; - protected static final byte[] K4 = new byte[] {'d'}; - protected static final byte[] K5 = new byte[] {'e'}; - protected static final byte[] K6 = new byte[] {'f'}; - protected static final byte[] K9 = new byte[] {'i'}; - protected static final byte[] K11 = new byte[] {'k'}; - protected static final byte[] K12 = new byte[] {'l'}; - protected static final byte[] KMAX = new byte[] {'~'}; - protected static final byte[] KMAX2 = new byte[] {'z'}; - protected static final byte[] KR = new byte[] { 'r' }; - protected static final byte[] KP = new byte[] { 'p' }; - - private static List<KeyRange> getSplits(Connection conn, final Scan scan) throws SQLException { - TableRef tableRef = getTableRef(conn); - PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); - final List<HRegionLocation> regions = pconn.getQueryServices().getAllTableRegions( - tableRef.getTable().getPhysicalName().getBytes()); - PhoenixStatement statement = new PhoenixStatement(pconn); - StatementContext context = new StatementContext(statement, null, scan, new SequenceManager(statement)); - DefaultParallelIteratorRegionSplitter splitter = new DefaultParallelIteratorRegionSplitter(context, tableRef.getTable(), - HintNode.EMPTY_HINT_NODE) { - @Override - protected List<HRegionLocation> getAllRegions() throws SQLException { - return DefaultParallelIteratorRegionSplitter.filterRegions(regions, scan.getStartRow(), - scan.getStopRow()); - } - }; - List<KeyRange> keyRanges = splitter.getSplits(); - Collections.sort(keyRanges, new Comparator<KeyRange>() { - @Override - public int compare(KeyRange o1, KeyRange o2) { - return Bytes.compareTo(o1.getLowerRange(), o2.getLowerRange()); - } - }); - return keyRanges; - } - - // This test ensures that as we keep adding new records the splits gets updated - @Test - public void testGuidePostsLifeCycle() throws Exception { - byte[][] splits = new byte[][] { K3, K9, KR }; - ensureTableCreated(getUrl(), STABLE_NAME, splits); - String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB; - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(url, props); - PreparedStatement stmt = conn.prepareStatement("ANALYZE STABLE"); - stmt.execute(); - Scan scan = new Scan(); - List<KeyRange> keyRanges = getSplits(conn, scan); - assertEquals(4, keyRanges.size()); - upsert(new byte[][] { KMIN, K4, K11 }); - stmt = conn.prepareStatement("ANALYZE STABLE"); - stmt.execute(); - conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery(); - keyRanges = getSplits(conn, scan); - assertEquals(7, keyRanges.size()); - upsert(new byte[][] { KMIN2, K5, K12 }); - stmt = conn.prepareStatement("ANALYZE STABLE"); - stmt.execute(); - conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery(); - keyRanges = getSplits(conn, scan); - assertEquals(10, keyRanges.size()); - upsert(new byte[][] { K1, K6, KP }); - stmt = conn.prepareStatement("ANALYZE STABLE"); - stmt.execute(); - conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery(); - keyRanges = getSplits(conn, scan); - assertEquals(13, keyRanges.size()); - conn.close(); - } - - protected void upsert( byte[][] val) throws Exception { - String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB; - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(url, props); - PreparedStatement stmt = conn.prepareStatement("upsert into " + STABLE_NAME + " VALUES (?, ?)"); - stmt.setString(1, new String(val[0])); - stmt.setInt(2, 1); - stmt.execute(); - stmt.setString(1, new String(val[1])); - stmt.setInt(2, 2); - stmt.execute(); - stmt.setString(1, new String(val[2])); - stmt.setInt(2, 3); - stmt.execute(); - conn.commit(); - conn.close(); - } - - protected static TableRef getTableRef(Connection conn) throws SQLException { - PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); - TableRef table = new TableRef(null, pconn.getMetaDataCache().getTable( - new PTableKey(pconn.getTenantId(), STABLE_NAME)), System.currentTimeMillis(), false); - return table; - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java index dc60b69..920891b 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java @@ -165,10 +165,7 @@ public class InListIT extends BaseHBaseManagedTimeIT { private static final List<PDataType> INTEGER_TYPES = Arrays.asList(PDataType.INTEGER, PDataType.LONG); private static final List<Integer> SALT_BUCKET_NUMBERS = Arrays.asList(0, 4); - // we should be including the RANGE_SCAN hint here, but a bug with ParallelIterators causes tests to fail - // see the relevant JIRA here: https://issues.apache.org/jira/browse/PHOENIX-1251 - private static final List<String> HINTS = Arrays.asList("", "/*+ SKIP_SCAN */"); -// private static final List<String> HINTS = Arrays.asList("", "/*+ SKIP_SCAN */", "/*+ RANGE_SCAN */"); + private static final List<String> HINTS = Arrays.asList("", "/*+ SKIP_SCAN */", "/*+ RANGE_SCAN */"); /** * Tests the given where clause against the given upserts by comparing against the list of http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java index ed081d9..4dee5d8 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java @@ -19,35 +19,22 @@ package org.apache.phoenix.end2end; import static org.apache.phoenix.util.TestUtil.KEYONLY_NAME; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.apache.phoenix.util.TestUtil.analyzeTable; +import static org.apache.phoenix.util.TestUtil.getAllSplits; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Properties; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.compile.SequenceManager; -import org.apache.phoenix.compile.StatementContext; -import org.apache.phoenix.iterate.DefaultParallelIteratorRegionSplitter; -import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.jdbc.PhoenixStatement; -import org.apache.phoenix.parse.HintNode; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.schema.PTableKey; -import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; @@ -88,8 +75,7 @@ public class KeyOnlyIT extends BaseClientManagedTimeIT { assertEquals(3, rs.getInt(1)); assertEquals(4, rs.getInt(2)); assertFalse(rs.next()); - Scan scan = new Scan(); - List<KeyRange> splits = getSplits(conn5, ts, scan); + List<KeyRange> splits = getAllSplits(conn5, "KEYONLY"); assertEquals(3, splits.size()); conn5.close(); @@ -180,41 +166,4 @@ public class KeyOnlyIT extends BaseClientManagedTimeIT { conn.commit(); conn.close(); } - - private void analyzeTable(Connection conn, String tableName) throws IOException, SQLException { - String query = "ANALYZE " + tableName; - conn.createStatement().execute(query); - } - - private static TableRef getTableRef(Connection conn, long ts) throws SQLException { - PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); - TableRef table = new TableRef(null, pconn.getMetaDataCache().getTable( - new PTableKey(pconn.getTenantId(), KEYONLY_NAME)), ts, false); - return table; - } - - private static List<KeyRange> getSplits(Connection conn, long ts, final Scan scan) throws SQLException { - TableRef tableRef = getTableRef(conn, ts); - PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); - final List<HRegionLocation> regions = pconn.getQueryServices().getAllTableRegions( - tableRef.getTable().getPhysicalName().getBytes()); - PhoenixStatement statement = new PhoenixStatement(pconn); - StatementContext context = new StatementContext(statement, null, scan, new SequenceManager(statement)); - DefaultParallelIteratorRegionSplitter splitter = new DefaultParallelIteratorRegionSplitter(context, tableRef.getTable(), - HintNode.EMPTY_HINT_NODE) { - @Override - protected List<HRegionLocation> getAllRegions() throws SQLException { - return DefaultParallelIteratorRegionSplitter.filterRegions(regions, scan.getStartRow(), - scan.getStopRow()); - } - }; - List<KeyRange> keyRanges = splitter.getSplits(); - Collections.sort(keyRanges, new Comparator<KeyRange>() { - @Override - public int compare(KeyRange o1, KeyRange o2) { - return Bytes.compareTo(o1.getLowerRange(), o2.getLowerRange()); - } - }); - return keyRanges; - } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java index fbd1cf6..9f313ae 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java @@ -18,36 +18,23 @@ package org.apache.phoenix.end2end; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.apache.phoenix.util.TestUtil.analyzeTable; +import static org.apache.phoenix.util.TestUtil.getAllSplits; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import java.io.IOException; import java.math.BigDecimal; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Properties; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.compile.SequenceManager; -import org.apache.phoenix.compile.StatementContext; -import org.apache.phoenix.iterate.DefaultParallelIteratorRegionSplitter; -import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.jdbc.PhoenixStatement; -import org.apache.phoenix.parse.HintNode; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.schema.PTableKey; -import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; @@ -106,11 +93,6 @@ public class MultiCfQueryExecIT extends BaseClientManagedTimeIT { stmt.execute(); } - private void analyzeTable(Connection conn, String tableName) throws IOException, SQLException { - String query = "ANALYZE " + tableName; - conn.createStatement().execute(query); - } - @Test public void testConstantCount() throws Exception { long ts = nextTimestamp(); @@ -174,14 +156,14 @@ public class MultiCfQueryExecIT extends BaseClientManagedTimeIT { } @Test - public void testCFToDisambiguate1() throws Exception { + public void testGuidePostsForMultiCFs() throws Exception { long ts = nextTimestamp(); + initTableValues(ts); String query = "SELECT F.RESPONSE_TIME,G.RESPONSE_TIME from multi_cf where F.RESPONSE_TIME = 2222"; String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 5); // Run query at timestamp 5 Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = DriverManager.getConnection(url, props); try { - initTableValues(ts); analyzeTable(conn, "MULTI_CF"); PreparedStatement statement = conn.prepareStatement(query); ResultSet rs = statement.executeQuery(); @@ -189,16 +171,13 @@ public class MultiCfQueryExecIT extends BaseClientManagedTimeIT { assertEquals(2222, rs.getLong(1)); assertEquals(22222, rs.getLong(2)); assertFalse(rs.next()); - Scan scan = new Scan(); - // See if F has splits in it - scan.addFamily(Bytes.toBytes("E")); - List<KeyRange> splits = getSplits(conn, ts, scan); + // Use E column family. Since the column family with the empty key value (the first one, A) + // is always added to the scan, we never really use other guideposts (but this may change). + List<KeyRange> splits = getAllSplits(conn, "MULTI_CF", "e.cpu_utilization IS NOT NULL"); + // Since the E column family is not populated, it won't have as many splits assertEquals(3, splits.size()); - scan = new Scan(); - // See if G has splits in it - scan.addFamily(Bytes.toBytes("G")); - splits = getSplits(conn, ts, scan); - // We get splits from different CF + // Same as above for G column family. + splits = getAllSplits(conn, "MULTI_CF", "g.response_time IS NOT NULL"); assertEquals(3, splits.size()); } finally { conn.close(); @@ -283,36 +262,4 @@ public class MultiCfQueryExecIT extends BaseClientManagedTimeIT { conn.close(); } } - - private static TableRef getTableRef(Connection conn, long ts) throws SQLException { - PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); - TableRef table = new TableRef(null, pconn.getMetaDataCache().getTable( - new PTableKey(pconn.getTenantId(), "MULTI_CF")), ts, false); - return table; - } - - private static List<KeyRange> getSplits(Connection conn, long ts, final Scan scan) throws SQLException { - TableRef tableRef = getTableRef(conn, ts); - PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); - final List<HRegionLocation> regions = pconn.getQueryServices().getAllTableRegions( - tableRef.getTable().getPhysicalName().getBytes()); - PhoenixStatement statement = new PhoenixStatement(pconn); - StatementContext context = new StatementContext(statement, null, scan, new SequenceManager(statement)); - DefaultParallelIteratorRegionSplitter splitter = new DefaultParallelIteratorRegionSplitter(context, tableRef.getTable(), - HintNode.EMPTY_HINT_NODE) { - @Override - protected List<HRegionLocation> getAllRegions() throws SQLException { - return DefaultParallelIteratorRegionSplitter.filterRegions(regions, scan.getStartRow(), - scan.getStopRow()); - } - }; - List<KeyRange> keyRanges = splitter.getSplits(); - Collections.sort(keyRanges, new Comparator<KeyRange>() { - @Override - public int compare(KeyRange o1, KeyRange o2) { - return Bytes.compareTo(o1.getLowerRange(), o2.getLowerRange()); - } - }); - return keyRanges; - } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelIteratorsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelIteratorsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelIteratorsIT.java new file mode 100644 index 0000000..97ca828 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelIteratorsIT.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.util.TestUtil.STABLE_NAME; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.apache.phoenix.util.TestUtil.getAllSplits; +import static org.apache.phoenix.util.TestUtil.getSplits; +import static org.junit.Assert.assertEquals; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.util.List; +import java.util.Map; + +import org.apache.phoenix.jdbc.PhoenixPreparedStatement; +import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.PDataType; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Maps; + +@Category(HBaseManagedTimeTest.class) +public class ParallelIteratorsIT extends BaseHBaseManagedTimeIT { + + protected static final byte[] KMIN = new byte[] {'!'}; + protected static final byte[] KMIN2 = new byte[] {'.'}; + protected static final byte[] K1 = new byte[] {'a'}; + protected static final byte[] K3 = new byte[] {'c'}; + protected static final byte[] K4 = new byte[] {'d'}; + protected static final byte[] K5 = new byte[] {'e'}; + protected static final byte[] K6 = new byte[] {'f'}; + protected static final byte[] K9 = new byte[] {'i'}; + protected static final byte[] K11 = new byte[] {'k'}; + protected static final byte[] K12 = new byte[] {'l'}; + protected static final byte[] KMAX = new byte[] {'~'}; + protected static final byte[] KMAX2 = new byte[] {'z'}; + protected static final byte[] KR = new byte[] { 'r' }; + protected static final byte[] KP = new byte[] { 'p' }; + + @BeforeClass + @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class) + public static void doSetup() throws Exception { + Map<String,String> props = Maps.newHashMapWithExpectedSize(3); + // Must update config before starting server + props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l)); + props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Test + public void testGetSplits() throws Exception { + Connection conn = DriverManager.getConnection(getUrl(), TEST_PROPERTIES); + initTableValues(conn); + + PreparedStatement stmt = conn.prepareStatement("ANALYZE STABLE"); + stmt.execute(); + + // number of regions > target query concurrency + PhoenixPreparedStatement pstmt; + List<KeyRange> keyRanges; + + pstmt = conn.prepareStatement("SELECT COUNT(*) FROM STABLE").unwrap(PhoenixPreparedStatement.class); + pstmt.execute(); + keyRanges = getAllSplits(conn); + assertEquals("Unexpected number of splits: " + keyRanges, 7, keyRanges.size()); + assertEquals(newKeyRange(KeyRange.UNBOUND, KMIN), keyRanges.get(0)); + assertEquals(newKeyRange(KMIN, K3), keyRanges.get(1)); + assertEquals(newKeyRange(K3, K4), keyRanges.get(2)); + assertEquals(newKeyRange(K4, K9), keyRanges.get(3)); + assertEquals(newKeyRange(K9, K11), keyRanges.get(4)); + assertEquals(newKeyRange(K11, KMAX), keyRanges.get(5)); + assertEquals(newKeyRange(KMAX, KeyRange.UNBOUND), keyRanges.get(6)); + + keyRanges = getSplits(conn, K3, K6); + assertEquals("Unexpected number of splits: " + keyRanges, 2, keyRanges.size()); + assertEquals(newKeyRange(K3, K4), keyRanges.get(0)); + assertEquals(newKeyRange(K4, K6), keyRanges.get(1)); + + keyRanges = getSplits(conn, K5, K6); + assertEquals("Unexpected number of splits: " + keyRanges, 1, keyRanges.size()); + assertEquals(newKeyRange(K5, K6), keyRanges.get(0)); + + keyRanges = getSplits(conn, null, K1); + assertEquals("Unexpected number of splits: " + keyRanges, 2, keyRanges.size()); + assertEquals(newKeyRange(KeyRange.UNBOUND, KMIN), keyRanges.get(0)); + assertEquals(newKeyRange(KMIN, K1), keyRanges.get(1)); + conn.close(); + } + + @Test + public void testGuidePostsLifeCycle() throws Exception { + Connection conn = DriverManager.getConnection(getUrl(), TEST_PROPERTIES); + byte[][] splits = new byte[][] { K3, K9, KR }; + ensureTableCreated(getUrl(), STABLE_NAME, splits); + + PreparedStatement stmt = conn.prepareStatement("ANALYZE STABLE"); + stmt.execute(); + List<KeyRange> keyRanges = getAllSplits(conn); + assertEquals(4, keyRanges.size()); + upsert(conn, new byte[][] { KMIN, K4, K11 }); + stmt = conn.prepareStatement("ANALYZE STABLE"); + stmt.execute(); + conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery(); + keyRanges = getAllSplits(conn); + assertEquals(7, keyRanges.size()); + upsert(conn, new byte[][] { KMIN2, K5, K12 }); + stmt = conn.prepareStatement("ANALYZE STABLE"); + stmt.execute(); + conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery(); + keyRanges = getAllSplits(conn); + assertEquals(10, keyRanges.size()); + upsert(conn, new byte[][] { K1, K6, KP }); + stmt = conn.prepareStatement("ANALYZE STABLE"); + stmt.execute(); + conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery(); + keyRanges = getAllSplits(conn); + assertEquals(13, keyRanges.size()); + conn.close(); + } + + private static void upsert(Connection conn, byte[][] val) throws Exception { + PreparedStatement stmt = conn.prepareStatement("upsert into " + STABLE_NAME + " VALUES (?, ?)"); + stmt.setString(1, new String(val[0])); + stmt.setInt(2, 1); + stmt.execute(); + stmt.setString(1, new String(val[1])); + stmt.setInt(2, 2); + stmt.execute(); + stmt.setString(1, new String(val[2])); + stmt.setInt(2, 3); + stmt.execute(); + conn.commit(); + } + + private static KeyRange newKeyRange(byte[] lowerRange, byte[] upperRange) { + return PDataType.CHAR.getKeyRange(lowerRange, true, upperRange, false); + } + + private static void initTableValues(Connection conn) throws Exception { + byte[][] splits = new byte[][] {K3,K4,K9,K11}; + ensureTableCreated(getUrl(),STABLE_NAME,splits); + PreparedStatement stmt = conn.prepareStatement("upsert into " + STABLE_NAME + " VALUES (?, ?)"); + stmt.setString(1, new String(KMIN)); + stmt.setInt(2, 1); + stmt.execute(); + stmt.setString(1, new String(KMAX)); + stmt.setInt(2, 2); + stmt.execute(); + conn.commit(); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryPlanIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryPlanIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryPlanIT.java deleted file mode 100644 index 320ba72..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryPlanIT.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.end2end; - -import static org.apache.phoenix.util.TestUtil.ATABLE_NAME; -import static org.apache.phoenix.util.TestUtil.PTSDB3_NAME; -import static org.apache.phoenix.util.TestUtil.PTSDB_NAME; -import static org.junit.Assert.assertEquals; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.Statement; -import java.util.Map; -import java.util.Properties; - -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.util.QueryUtil; -import org.apache.phoenix.util.ReadOnlyProps; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import com.google.common.collect.Maps; - -@Category(HBaseManagedTimeTest.class) -public class QueryPlanIT extends BaseHBaseManagedTimeIT { - - @BeforeClass - @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class) - public static void doSetup() throws Exception { - Map<String,String> props = Maps.newHashMapWithExpectedSize(1); - // Override date format so we don't have a bunch of zeros - props.put(QueryServices.DATE_FORMAT_ATTRIB, "yyyy-MM-dd"); - // Must update config before starting server - setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); - } - - @Test - public void testExplainPlan() throws Exception { - ensureTableCreated(getUrl(), ATABLE_NAME, getDefaultSplits(getOrganizationId())); - ensureTableCreated(getUrl(), PTSDB_NAME, getDefaultSplits(getOrganizationId())); - ensureTableCreated(getUrl(), PTSDB3_NAME, getDefaultSplits(getOrganizationId())); - String[] queryPlans = new String[] { - - "SELECT host FROM PTSDB3 WHERE host IN ('na1', 'na2','na3')", - "CLIENT PARALLEL 1-WAY SKIP SCAN ON 3 KEYS OVER PTSDB3 [~'na3'] - [~'na1']\n" + - " SERVER FILTER BY FIRST KEY ONLY", - - "SELECT host FROM PTSDB WHERE inst IS NULL AND host IS NOT NULL AND date >= to_date('2013-01-01')", - "CLIENT PARALLEL 1-WAY RANGE SCAN OVER PTSDB [null,not null]\n" + - " SERVER FILTER BY FIRST KEY ONLY AND DATE >= '2013-01-01 00:00:00.000'", - - // Since inst IS NOT NULL is unbounded, we won't continue optimizing - "SELECT host FROM PTSDB WHERE inst IS NOT NULL AND host IS NULL AND date >= to_date('2013-01-01')", - "CLIENT PARALLEL 4-WAY RANGE SCAN OVER PTSDB [not null]\n" + - " SERVER FILTER BY FIRST KEY ONLY AND (HOST IS NULL AND DATE >= '2013-01-01 00:00:00.000')", - - "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' AND entity_id = '000000000000002' AND x_integer = 2 AND a_integer < 5 ", - "CLIENT PARALLEL 1-WAY POINT LOOKUP ON 1 KEY OVER ATABLE\n" + - " SERVER FILTER BY (X_INTEGER = 2 AND A_INTEGER < 5)", - - "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' AND entity_id > '000000000000002' AND entity_id < '000000000000008' AND (organization_id,entity_id) >= ('000000000000001','000000000000005') ", - "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001','000000000000005'] - ['000000000000001','000000000000008']", - - "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' AND entity_id > '000000000000002' AND entity_id < '000000000000008' AND (organization_id,entity_id) <= ('000000000000001','000000000000005') ", - "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001','000000000000003'] - ['000000000000001','000000000000006']", - - "SELECT a_string,b_string FROM atable WHERE organization_id > '000000000000001' AND entity_id > '000000000000002' AND entity_id < '000000000000008' AND (organization_id,entity_id) >= ('000000000000003','000000000000005') ", - "CLIENT PARALLEL 4-WAY RANGE SCAN OVER ATABLE ['000000000000003','000000000000005'] - [*]\n" + - " SERVER FILTER BY (ENTITY_ID > '000000000000002' AND ENTITY_ID < '000000000000008')", - - "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' AND entity_id >= '000000000000002' AND entity_id < '000000000000008' AND (organization_id,entity_id) >= ('000000000000000','000000000000005') ", - "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001','000000000000002'] - ['000000000000001','000000000000008']", - - "SELECT * FROM atable", - "CLIENT PARALLEL 4-WAY FULL SCAN OVER ATABLE", - - "SELECT inst,host FROM PTSDB WHERE REGEXP_SUBSTR(INST, '[^-]+', 1) IN ('na1', 'na2','na3')", // REVIEW: should this use skip scan given the regexpr_substr - "CLIENT PARALLEL 1-WAY SKIP SCAN ON 3 RANGES OVER PTSDB ['na1'] - ['na4']\n" + - " SERVER FILTER BY FIRST KEY ONLY AND REGEXP_SUBSTR(INST, '[^-]+', 1) IN ('na1','na2','na3')", - - "SELECT inst,host FROM PTSDB WHERE inst IN ('na1', 'na2','na3') AND host IN ('a','b') AND date >= to_date('2013-01-01') AND date < to_date('2013-01-02')", - "CLIENT PARALLEL 1-WAY SKIP SCAN ON 6 RANGES OVER PTSDB ['na1','a','2013-01-01'] - ['na3','b','2013-01-02']\n" + - " SERVER FILTER BY FIRST KEY ONLY", - - "SELECT inst,host FROM PTSDB WHERE inst LIKE 'na%' AND host IN ('a','b') AND date >= to_date('2013-01-01') AND date < to_date('2013-01-02')", - "CLIENT PARALLEL 1-WAY SKIP SCAN ON 2 RANGES OVER PTSDB ['na','a','2013-01-01'] - ['nb','b','2013-01-02']\n" + - " SERVER FILTER BY FIRST KEY ONLY", - - "SELECT count(*) FROM atable", - "CLIENT PARALLEL 4-WAY FULL SCAN OVER ATABLE\n" + - " SERVER FILTER BY FIRST KEY ONLY\n" + - " SERVER AGGREGATE INTO SINGLE ROW", - - "SELECT count(*) FROM atable WHERE organization_id='000000000000001' AND SUBSTR(entity_id,1,3) > '002' AND SUBSTR(entity_id,1,3) <= '003'", - "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001','003 '] - ['000000000000001','004 ']\n" + - " SERVER FILTER BY FIRST KEY ONLY\n" + - " SERVER AGGREGATE INTO SINGLE ROW", - - "SELECT a_string FROM atable WHERE organization_id='000000000000001' AND SUBSTR(entity_id,1,3) > '002' AND SUBSTR(entity_id,1,3) <= '003'", - "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001','003 '] - ['000000000000001','004 ']", - - "SELECT count(1) FROM atable GROUP BY a_string", - "CLIENT PARALLEL 4-WAY FULL SCAN OVER ATABLE\n" + - " SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING]\n" + - "CLIENT MERGE SORT", - - "SELECT count(1) FROM atable GROUP BY a_string LIMIT 5", - "CLIENT PARALLEL 4-WAY FULL SCAN OVER ATABLE\n" + - " SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING]\n" + - "CLIENT MERGE SORT\n" + - "CLIENT 5 ROW LIMIT", - - "SELECT a_string FROM atable ORDER BY a_string DESC LIMIT 3", - "CLIENT PARALLEL 4-WAY FULL SCAN OVER ATABLE\n" + - " SERVER TOP 3 ROWS SORTED BY [A_STRING DESC]\n" + - "CLIENT MERGE SORT", - - "SELECT count(1) FROM atable GROUP BY a_string,b_string HAVING max(a_string) = 'a'", - "CLIENT PARALLEL 4-WAY FULL SCAN OVER ATABLE\n" + - " SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING, B_STRING]\n" + - "CLIENT MERGE SORT\n" + - "CLIENT FILTER BY MAX(A_STRING) = 'a'", - - "SELECT count(1) FROM atable WHERE a_integer = 1 GROUP BY ROUND(a_time,'HOUR',2),entity_id HAVING max(a_string) = 'a'", - "CLIENT PARALLEL 4-WAY FULL SCAN OVER ATABLE\n" + - " SERVER FILTER BY A_INTEGER = 1\n" + - " SERVER AGGREGATE INTO DISTINCT ROWS BY [ENTITY_ID, ROUND(A_TIME)]\n" + - "CLIENT MERGE SORT\n" + - "CLIENT FILTER BY MAX(A_STRING) = 'a'", - - "SELECT count(1) FROM atable WHERE a_integer = 1 GROUP BY a_string,b_string HAVING max(a_string) = 'a' ORDER BY b_string", - "CLIENT PARALLEL 4-WAY FULL SCAN OVER ATABLE\n" + - " SERVER FILTER BY A_INTEGER = 1\n" + - " SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING, B_STRING]\n" + - "CLIENT MERGE SORT\n" + - "CLIENT FILTER BY MAX(A_STRING) = 'a'\n" + - "CLIENT SORTED BY [B_STRING]", - - "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' AND entity_id != '000000000000002' AND x_integer = 2 AND a_integer < 5 LIMIT 10", - "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001']\n" + - " SERVER FILTER BY (ENTITY_ID != '000000000000002' AND X_INTEGER = 2 AND A_INTEGER < 5)\n" + - " SERVER 10 ROW LIMIT\n" + - "CLIENT 10 ROW LIMIT", - - "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' ORDER BY a_string ASC NULLS FIRST LIMIT 10", - "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001']\n" + - " SERVER TOP 10 ROWS SORTED BY [A_STRING]\n" + - "CLIENT MERGE SORT", - - "SELECT max(a_integer) FROM atable WHERE organization_id = '000000000000001' GROUP BY organization_id,entity_id,ROUND(a_date,'HOUR') ORDER BY entity_id NULLS LAST LIMIT 10", - "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001']\n" + - " SERVER AGGREGATE INTO DISTINCT ROWS BY [ORGANIZATION_ID, ENTITY_ID, ROUND(A_DATE)]\n" + - "CLIENT MERGE SORT\n" + - "CLIENT TOP 10 ROWS SORTED BY [ENTITY_ID NULLS LAST]", - - "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' ORDER BY a_string DESC NULLS LAST LIMIT 10", - "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001']\n" + - " SERVER TOP 10 ROWS SORTED BY [A_STRING DESC NULLS LAST]\n" + - "CLIENT MERGE SORT", - - "SELECT a_string,b_string FROM atable WHERE organization_id IN ('000000000000001', '000000000000005')", - "CLIENT PARALLEL 1-WAY SKIP SCAN ON 2 KEYS OVER ATABLE ['000000000000001'] - ['000000000000005']", - - "SELECT a_string,b_string FROM atable WHERE organization_id IN ('00D000000000001', '00D000000000005') AND entity_id IN('00E00000000000X','00E00000000000Z')", - "CLIENT PARALLEL 1-WAY POINT LOOKUP ON 4 KEYS OVER ATABLE", - }; - for (int i = 0; i < queryPlans.length; i+=2) { - String query = queryPlans[i]; - String plan = queryPlans[i+1]; - Properties props = new Properties(); - Connection conn = DriverManager.getConnection(getUrl(), props); - try { - Statement statement = conn.createStatement(); - ResultSet rs = statement.executeQuery("EXPLAIN " + query); - // TODO: figure out a way of verifying that query isn't run during explain execution - assertEquals(query, plan, QueryUtil.getExplainPlan(rs)); - } catch (Exception e) { - throw new Exception(query + ": "+ e.getMessage(), e); - } finally { - conn.close(); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java index 28bc011..3d057ae 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java @@ -22,12 +22,12 @@ import static org.junit.Assert.assertEquals; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.ParameterMetaData; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Properties; @@ -36,14 +36,23 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.compile.ColumnResolver; +import org.apache.phoenix.compile.ExplainPlan; +import org.apache.phoenix.compile.GroupByCompiler.GroupBy; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.compile.SequenceManager; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.filter.SkipScanFilter; -import org.apache.phoenix.iterate.SkipRangeParallelIteratorRegionSplitter; +import org.apache.phoenix.iterate.ParallelIterators; +import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.iterate.SpoolingResultIterator; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixParameterMetaData; import org.apache.phoenix.jdbc.PhoenixStatement; -import org.apache.phoenix.parse.HintNode; +import org.apache.phoenix.parse.FilterableStatement; +import org.apache.phoenix.parse.SelectStatement; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.ColumnRef; import org.apache.phoenix.schema.PDataType; @@ -58,6 +67,7 @@ import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.ScanUtil; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -71,6 +81,11 @@ import com.google.common.collect.Maps; /** * Tests for {@link SkipRangeParallelIteratorRegionSplitter}. + * TODO: Change this to be a connectionless test (ParallelIteratorsTest) with the ability to specify split points. + * -- On Connectionless, remember the split points of a table and use those when it says + * -- getRegionLocations + * -- Then drive this from a query plus getting the query plan and confirming the ranges + * -- from the plan. */ @RunWith(Parameterized.class) @Category(ClientManagedTimeTest.class) @@ -96,6 +111,7 @@ public class SkipRangeParallelIteratorRegionSplitterIT extends BaseClientManaged } @Test + @Ignore public void testGetSplitsWithSkipScanFilter() throws Exception { byte[][] splits = new byte[][] {Ka1A, Ka1B, Ka1E, Ka1G, Ka1I, Ka2A}; long ts = nextTimestamp(); @@ -106,9 +122,6 @@ public class SkipRangeParallelIteratorRegionSplitterIT extends BaseClientManaged PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); initTableValues(); - PreparedStatement stmt = conn.prepareStatement("ANALYZE "+TABLE_NAME); - stmt.execute(); - conn.close(); TableRef tableRef = new TableRef(null,pconn.getMetaDataCache().getTable(new PTableKey(pconn.getTenantId(), TABLE_NAME)),ts, false); List<HRegionLocation> regions = pconn.getQueryServices().getAllTableRegions(tableRef.getTable().getPhysicalName().getBytes()); List<KeyRange> ranges = getSplits(tableRef, scan, regions, scanRanges); @@ -322,7 +335,7 @@ public class SkipRangeParallelIteratorRegionSplitterIT extends BaseClientManaged setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } - private static List<KeyRange> getSplits(TableRef tableRef, final Scan scan, final List<HRegionLocation> regions, + private static List<KeyRange> getSplits(final TableRef tableRef, final Scan scan, final List<HRegionLocation> regions, final ScanRanges scanRanges) throws SQLException { final List<TableRef> tableRefs = Collections.singletonList(tableRef); ColumnResolver resolver = new ColumnResolver() { @@ -345,17 +358,83 @@ public class SkipRangeParallelIteratorRegionSplitterIT extends BaseClientManaged }; PhoenixConnection connection = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class); - PhoenixStatement statement = new PhoenixStatement(connection); - StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement)); + final PhoenixStatement statement = new PhoenixStatement(connection); + final StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement)); context.setScanRanges(scanRanges); - SkipRangeParallelIteratorRegionSplitter splitter = SkipRangeParallelIteratorRegionSplitter.getInstance(context, tableRef.getTable(), HintNode.EMPTY_HINT_NODE); - List<KeyRange> keyRanges = splitter.getSplits(); - Collections.sort(keyRanges, new Comparator<KeyRange>() { + ParallelIterators parallelIterators = new ParallelIterators(new QueryPlan() { + + @Override + public StatementContext getContext() { + return context; + } + + @Override + public ParameterMetaData getParameterMetaData() { + return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA; + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + return ExplainPlan.EMPTY_PLAN; + } + + @Override + public ResultIterator iterator() throws SQLException { + return ResultIterator.EMPTY_ITERATOR; + } + @Override - public int compare(KeyRange o1, KeyRange o2) { - return Bytes.compareTo(o1.getLowerRange(),o2.getLowerRange()); + public long getEstimatedSize() { + return 0; } - }); + + @Override + public TableRef getTableRef() { + return tableRef; + } + + @Override + public RowProjector getProjector() { + return RowProjector.EMPTY_PROJECTOR; + } + + @Override + public Integer getLimit() { + return null; + } + + @Override + public OrderBy getOrderBy() { + return OrderBy.EMPTY_ORDER_BY; + } + + @Override + public GroupBy getGroupBy() { + return GroupBy.EMPTY_GROUP_BY; + } + + @Override + public List<KeyRange> getSplits() { + return null; + } + + @Override + public FilterableStatement getStatement() { + return SelectStatement.SELECT_ONE; + } + + @Override + public boolean isDegenerate() { + return false; + } + + @Override + public boolean isRowKeyOrdered() { + return true; + } + + }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices())); + List<KeyRange> keyRanges = parallelIterators.getSplits(); return keyRanges; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java index 6f6c9a7..a07ad0e 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java @@ -128,12 +128,13 @@ public class SkipScanAfterManualSplitIT extends BaseHBaseManagedTimeIT { assertEquals(nRegions, nInitialRegions); int nRows = 2; - String query = "SELECT /*+ NO_INTRA_REGION_PARALLELIZATION */ count(*) FROM S WHERE a IN ('tl','jt')"; + String query = "SELECT /*+ NO_INTRA_REGION_PARALLELIZATION */ count(*) FROM S WHERE a IN ('tl','jt',' a',' b',' c',' d')"; ResultSet rs1 = conn.createStatement().executeQuery(query); assertTrue(rs1.next()); nRegions = services.getAllTableRegions(TABLE_NAME_BYTES).size(); // Region cache has been updated, as there are more regions now assertNotEquals(nRegions, nInitialRegions); + /* if (nRows != rs1.getInt(1)) { // Run the same query again and it always passes now // (as region cache is up-to-date) @@ -141,6 +142,7 @@ public class SkipScanAfterManualSplitIT extends BaseHBaseManagedTimeIT { assertTrue(r2.next()); assertEquals(nRows, r2.getInt(1)); } + */ assertEquals(nRows, rs1.getInt(1)); } finally { admin.close(); @@ -346,4 +348,30 @@ public class SkipScanAfterManualSplitIT extends BaseHBaseManagedTimeIT { assertFalse(rs.next()); } + @Test + public void testMinMaxRangeIntersection() throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + + PreparedStatement stmt = conn.prepareStatement("create table splits_test " + + "(pk1 UNSIGNED_TINYINT NOT NULL, pk2 UNSIGNED_TINYINT NOT NULL, kv VARCHAR " + + "CONSTRAINT pk PRIMARY KEY (pk1, pk2)) SALT_BUCKETS=4 SPLIT ON (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"); + // Split each salt bucket into multiple regions + stmt.setBytes(1, new byte[] {0, 1, 1}); + stmt.setBytes(2, new byte[] {0, 2, 1}); + stmt.setBytes(3, new byte[] {0, 3, 1}); + stmt.setBytes(4, new byte[] {1, 1, 1}); + stmt.setBytes(5, new byte[] {1, 2, 1}); + stmt.setBytes(6, new byte[] {1, 3, 1}); + stmt.setBytes(7, new byte[] {2, 1, 1}); + stmt.setBytes(8, new byte[] {2, 2, 1}); + stmt.setBytes(9, new byte[] {2, 3, 1}); + stmt.setBytes(10, new byte[] {3, 1, 1}); + stmt.setBytes(11, new byte[] {3, 2, 1}); + stmt.setBytes(12, new byte[] {3, 3, 1}); + stmt.execute(); + + // Use a query with a RVC in a non equality expression + ResultSet rs = conn.createStatement().executeQuery("select count(kv) from splits_test where pk1 <= 3 and (pk1,PK2) >= (3, 1)"); + assertTrue(rs.next()); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsManagerIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsManagerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsManagerIT.java deleted file mode 100644 index b13379b..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsManagerIT.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.end2end; - -import static org.apache.phoenix.util.TestUtil.STABLE_NAME; -import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.util.Properties; - -import org.apache.phoenix.query.ConnectionQueryServices; -import org.apache.phoenix.query.StatsManager; -import org.apache.phoenix.query.StatsManagerImpl; -import org.apache.phoenix.schema.TableRef; -import org.apache.phoenix.util.PhoenixRuntime; -import org.apache.phoenix.util.PropertiesUtil; -import org.apache.phoenix.util.TimeKeeper; -import org.junit.Test; -import org.junit.experimental.categories.Category; - - -/** - * - * Test for stats manager, which is a client-side process that caches the - * first and last key of a given table. The {@link #testStatsManager()} - * test must be the only test here, as it relies on state that is only - * cleared between test runs. - * - */ - -@Category(ClientManagedTimeTest.class) -public class StatsManagerIT extends BaseParallelIteratorsRegionSplitterIT { - - private static class ManualTimeKeeper implements TimeKeeper { - private long currentTime = 0; - @Override - public long getCurrentTime() { - return currentTime; - } - - public void setCurrentTime(long currentTime) { - this.currentTime = currentTime; - } - } - - private static interface ChangeDetector { - boolean isChanged(); - } - - private boolean waitForAsyncChange(ChangeDetector detector, long maxWaitTimeMs) throws Exception { - long startTime = System.currentTimeMillis(); - do { - if (detector.isChanged()) { - return true; - } - try { - Thread.sleep(500); - } catch (InterruptedException e) { - throw e; - } - } while (System.currentTimeMillis() - startTime < maxWaitTimeMs); - return false; - } - - private static class MinKeyChange implements ChangeDetector { - private byte[] value; - private StatsManager stats; - private TableRef table; - - public MinKeyChange(StatsManager stats, TableRef table) { - this.value = stats.getMinKey(table); - this.stats = stats; - this.table = table; - } - @Override - public boolean isChanged() { - return value != stats.getMinKey(table); - } - } - - private static class MaxKeyChange implements ChangeDetector { - private byte[] value; - private StatsManager stats; - private TableRef table; - - public MaxKeyChange(StatsManager stats, TableRef table) { - this.value = stats.getMaxKey(table); - this.stats = stats; - this.table = table; - } - @Override - public boolean isChanged() { - return value != stats.getMaxKey(table); - } - } - - @Test - public void testStatsManager() throws Exception { - long ts = nextTimestamp(); - initTableValues(ts); - String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts; - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(url, props); - TableRef table = getTableRef(conn, ts); - - int updateFreq = 5; - int maxAge = 10; - int startTime = 100; - long waitTime = 5000; - - ManualTimeKeeper timeKeeper = new ManualTimeKeeper(); - timeKeeper.setCurrentTime(startTime); - ConnectionQueryServices services = driver.getConnectionQueryServices(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)); - StatsManager stats = new StatsManagerImpl(services, updateFreq, maxAge, timeKeeper); - MinKeyChange minKeyChange = new MinKeyChange(stats, table); - MaxKeyChange maxKeyChange = new MaxKeyChange(stats, table); - - byte[] minKey = minKeyChange.value; - assertTrue(minKey == null); - assertTrue(waitForAsyncChange(minKeyChange,waitTime)); - assertArrayEquals(KMIN, stats.getMinKey(table)); - assertArrayEquals(KMAX, stats.getMaxKey(table)); - minKeyChange = new MinKeyChange(stats, table); - - url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts+2; - props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - conn = DriverManager.getConnection(url, props); - PreparedStatement delStmt = conn.prepareStatement("delete from " + STABLE_NAME + " where id=?"); - delStmt.setString(1, new String(KMIN)); - delStmt.execute(); - PreparedStatement upsertStmt = conn.prepareStatement("upsert into " + STABLE_NAME + " VALUES (?, ?)"); - upsertStmt.setString(1, new String(KMIN2)); - upsertStmt.setInt(2, 1); - upsertStmt.execute(); - conn.commit(); - - assertFalse(waitForAsyncChange(minKeyChange,waitTime)); // Stats won't change until they're attempted to be retrieved again - timeKeeper.setCurrentTime(timeKeeper.getCurrentTime() + updateFreq); - minKeyChange = new MinKeyChange(stats, table); // Will kick off change, but will upate asynchronously - assertArrayEquals(KMIN, minKeyChange.value); - assertTrue(waitForAsyncChange(minKeyChange,waitTime)); - assertArrayEquals(KMIN2, stats.getMinKey(table)); - assertArrayEquals(KMAX, stats.getMaxKey(table)); - minKeyChange = new MinKeyChange(stats, table); - - timeKeeper.setCurrentTime(timeKeeper.getCurrentTime() + maxAge); - minKeyChange = new MinKeyChange(stats, table); // Will kick off change, but will upate asynchronously - assertTrue(null == minKeyChange.value); - assertTrue(waitForAsyncChange(minKeyChange,waitTime)); - assertArrayEquals(KMIN2, stats.getMinKey(table)); - assertArrayEquals(KMAX, stats.getMaxKey(table)); - maxKeyChange = new MaxKeyChange(stats, table); - - delStmt.setString(1, new String(KMAX)); - delStmt.execute(); - upsertStmt.setString(1, new String(KMAX2)); - upsertStmt.setInt(2, 1); - upsertStmt.execute(); - conn.commit(); - conn.close(); - - assertFalse(waitForAsyncChange(maxKeyChange,waitTime)); // Stats won't change until they're attempted to be retrieved again - timeKeeper.setCurrentTime(timeKeeper.getCurrentTime() + updateFreq); - maxKeyChange = new MaxKeyChange(stats, table); // Will kick off change, but will upate asynchronously - assertArrayEquals(KMAX, maxKeyChange.value); - assertTrue(waitForAsyncChange(maxKeyChange,waitTime)); - assertArrayEquals(KMAX2, stats.getMaxKey(table)); - assertArrayEquals(KMIN2, stats.getMinKey(table)); - maxKeyChange = new MaxKeyChange(stats, table); - - timeKeeper.setCurrentTime(timeKeeper.getCurrentTime() + maxAge); - maxKeyChange = new MaxKeyChange(stats, table); // Will kick off change, but will upate asynchronously - assertTrue(null == maxKeyChange.value); - assertTrue(waitForAsyncChange(maxKeyChange,waitTime)); - assertArrayEquals(KMIN2, stats.getMinKey(table)); - assertArrayEquals(KMAX2, stats.getMaxKey(table)); - } -}