Repository: drill Updated Branches: refs/heads/master 7bfcb40a0 -> 0a2518d7c
DRILL-4363: Row count based pruning for parquet table used in Limit n query. Modify two existing unit test cases: 1) TestPartitionFilter.testMainQueryFalseCondition(): rowCount pruning applied after false condition is transformed into LIMIT 0 2) TestLimitWithExchanges.testPushLimitPastUnionExchange(): modify the testcase to use Json source, so that it does not mix with PushLimitIntoScanRule. Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0a2518d7 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0a2518d7 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0a2518d7 Branch: refs/heads/master Commit: 0a2518d7cf01a92a27a82e29edac5424bedf31d5 Parents: 7bfcb40 Author: Jinfeng Ni <j...@apache.org> Authored: Tue Feb 2 15:31:47 2016 -0800 Committer: Jinfeng Ni <j...@apache.org> Committed: Thu Feb 11 15:01:15 2016 -0800 ---------------------------------------------------------------------- .../exec/physical/base/AbstractGroupScan.java | 21 ++++ .../drill/exec/physical/base/GroupScan.java | 12 +++ .../logical/DrillPushLimitToScanRule.java | 108 +++++++++++++++++++ .../exec/planner/logical/DrillRuleSets.java | 4 +- .../exec/store/parquet/ParquetGroupScan.java | 55 +++++++++- .../org/apache/drill/TestPartitionFilter.java | 3 +- .../impl/limit/TestLimitWithExchanges.java | 43 ++++++-- .../exec/store/TestAffinityCalculator.java | 4 +- 8 files changed, 236 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/0a2518d7/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java index 1277ec4..b6b1a1e 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java @@ -128,4 +128,25 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca public List<SchemaPath> getPartitionColumns() { return Lists.newArrayList(); } + + /** + * Default is not to support limit pushdown. + * @return + */ + @Override + @JsonIgnore + public boolean supportsLimitPushdown() { + return false; + } + + /** + * By default, return null to indicate rowcount based prune is not supported. + * Each groupscan subclass should override, if it supports rowcount based prune. + */ + @Override + @JsonIgnore + public GroupScan applyLimit(long maxRecords) { + return null; + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/0a2518d7/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java index 946c7e8..041f10a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java @@ -98,4 +98,16 @@ public interface GroupScan extends Scan, HasAffinity{ @JsonIgnore public List<SchemaPath> getPartitionColumns(); + /** + * Whether or not this GroupScan supports limit pushdown + */ + public boolean supportsLimitPushdown(); + + /** + * Apply rowcount based prune for "LIMIT n" query. + * @param maxRecords : the number of rows requested from group scan. + * @return a new instance of group scan if the prune is successful. + * null when either if row-based prune is not supported, or if prune is not successful. 
+ */ + public GroupScan applyLimit(long maxRecords); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/0a2518d7/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java new file mode 100644 index 0000000..9f762f0 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.drill.exec.planner.logical; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.util.Pair; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.planner.logical.partition.PruneScanRule; +import org.apache.drill.exec.store.parquet.ParquetGroupScan; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +public abstract class DrillPushLimitToScanRule extends RelOptRule { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillPushLimitToScanRule.class); + + private DrillPushLimitToScanRule(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + public static DrillPushLimitToScanRule LIMIT_ON_SCAN = new DrillPushLimitToScanRule( + RelOptHelper.some(DrillLimitRel.class, RelOptHelper.any(DrillScanRel.class)), "DrillPushLimitToScanRule_LimitOnScan") { + @Override + public boolean matches(RelOptRuleCall call) { + DrillScanRel scanRel = call.rel(1); + return scanRel.getGroupScan().supportsLimitPushdown(); // For now only applies to Parquet. + } + + @Override + public void onMatch(RelOptRuleCall call) { + DrillLimitRel limitRel = call.rel(0); + DrillScanRel scanRel = call.rel(1); + doOnMatch(call, limitRel, scanRel, null); + } + }; + + public static DrillPushLimitToScanRule LIMIT_ON_PROJECT = new DrillPushLimitToScanRule( + RelOptHelper.some(DrillLimitRel.class, RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))), "DrillPushLimitToScanRule_LimitOnProject") { + @Override + public boolean matches(RelOptRuleCall call) { + DrillScanRel scanRel = call.rel(2); + return scanRel.getGroupScan().supportsLimitPushdown(); // For now only applies to Parquet. 
+ } + + @Override + public void onMatch(RelOptRuleCall call) { + DrillLimitRel limitRel = call.rel(0); + DrillProjectRel projectRel = call.rel(1); + DrillScanRel scanRel = call.rel(2); + doOnMatch(call, limitRel, scanRel, projectRel); + } + }; + + + protected void doOnMatch(RelOptRuleCall call, DrillLimitRel limitRel, DrillScanRel scanRel, DrillProjectRel projectRel){ + try { + final int rowCountRequested = (int) limitRel.getRows(); + + final GroupScan newGroupScan = scanRel.getGroupScan().applyLimit(rowCountRequested); + + if (newGroupScan == null ) { + return; + } + + DrillScanRel newScanRel = new DrillScanRel(scanRel.getCluster(), + scanRel.getTraitSet(), + scanRel.getTable(), + newGroupScan, + scanRel.getRowType(), + scanRel.getColumns(), + scanRel.partitionFilterPushdown()); + + final RelNode newLimit; + if (projectRel != null) { + final RelNode newProject = projectRel.copy(projectRel.getTraitSet(), ImmutableList.of((RelNode)newScanRel)); + newLimit = limitRel.copy(limitRel.getTraitSet(), ImmutableList.of((RelNode)newProject)); + } else { + newLimit = limitRel.copy(limitRel.getTraitSet(), ImmutableList.of((RelNode)newScanRel)); + } + + call.transformTo(newLimit); + logger.debug("Converted to a new DrillScanRel" + newScanRel.getGroupScan()); + } catch (Exception e) { + logger.warn("Exception while using the pruned partitions.", e); + } + + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/0a2518d7/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java index d9609d2..230cee2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java @@ -207,7 
+207,9 @@ public class DrillRuleSets { PruneScanRule.getDirFilterOnProject(optimizerRulesContext), PruneScanRule.getDirFilterOnScan(optimizerRulesContext), ParquetPruneScanRule.getFilterOnProjectParquet(optimizerRulesContext), - ParquetPruneScanRule.getFilterOnScanParquet(optimizerRulesContext) + ParquetPruneScanRule.getFilterOnScanParquet(optimizerRulesContext), + DrillPushLimitToScanRule.LIMIT_ON_SCAN, + DrillPushLimitToScanRule.LIMIT_ON_PROJECT ) .build(); http://git-wip-us.apache.org/repos/asf/drill/blob/0a2518d7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java index dfe9084..30e0846 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java @@ -30,6 +30,7 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.calcite.util.Pair; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.FormatPluginConfig; @@ -485,12 +486,14 @@ public class ParquetGroupScan extends AbstractFileGroupScan { private EndpointByteMap byteMap; private int rowGroupIndex; private String root; + private long rowCount; // rowCount = -1 indicates to include all rows. 
@JsonCreator public RowGroupInfo(@JsonProperty("path") String path, @JsonProperty("start") long start, - @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex) { + @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex, long rowCount) { super(path, start, length); this.rowGroupIndex = rowGroupIndex; + this.rowCount = rowCount; } public RowGroupReadEntry getRowGroupReadEntry() { @@ -519,6 +522,11 @@ public class ParquetGroupScan extends AbstractFileGroupScan { public void setEndpointByteMap(EndpointByteMap byteMap) { this.byteMap = byteMap; } + + public long getRowCount() { + return rowCount; + } + } private void init() throws IOException { @@ -582,7 +590,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan { int rgIndex = 0; for (RowGroupMetadata rg : file.getRowGroups()) { RowGroupInfo rowGroupInfo = - new RowGroupInfo(file.getPath(), rg.getStart(), rg.getLength(), rgIndex); + new RowGroupInfo(file.getPath(), rg.getStart(), rg.getLength(), rgIndex, rg.getRowCount()); EndpointByteMap endpointByteMap = new EndpointByteMapImpl(); for (String host : rg.getHostAffinity().keySet()) { if (hostEndpointMap.containsKey(host)) { @@ -791,6 +799,49 @@ public class ParquetGroupScan extends AbstractFileGroupScan { } @Override + public boolean supportsLimitPushdown() { + return true; + } + + @Override + public GroupScan applyLimit(long maxRecords) { + Preconditions.checkArgument(rowGroupInfos.size() >= 0); + + maxRecords = Math.max(maxRecords, 1); // Make sure it request at least 1 row -> 1 rowGroup. + // further optimization : minimize # of files chosen, or the affinity of files chosen. + long count = 0; + int index = 0; + for (RowGroupInfo rowGroupInfo : rowGroupInfos) { + if (count < maxRecords) { + count += rowGroupInfo.getRowCount(); + index ++; + } else { + break; + } + } + + Set<String> fileNames = Sets.newHashSet(); // HashSet keeps a fileName unique. 
+ for (RowGroupInfo rowGroupInfo : rowGroupInfos.subList(0, index)) { + fileNames.add(rowGroupInfo.getPath()); + } + + if (fileNames.size() == fileSet.size() ) { + // There is no reduction of rowGroups. Return the original groupScan. + logger.debug("applyLimit() does not apply!"); + return null; + } + + try { + FileSelection newSelection = new FileSelection(null, Lists.newArrayList(fileNames), getSelectionRoot()); + logger.debug("applyLimit() reduce parquet file # from {} to {}", fileSet.size(), fileNames.size()); + return this.clone(newSelection); + } catch (IOException e) { + logger.warn("Could not apply rowcount based prune due to Exception : {}", e); + return null; + } + } + + @Override @JsonIgnore public boolean canPushdownProjects(List<SchemaPath> columns) { return true; http://git-wip-us.apache.org/repos/asf/drill/blob/0a2518d7/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java index e5d6603..a2d101e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java @@ -257,7 +257,8 @@ public class TestPartitionFilter extends PlanTestBase { public void testMainQueryFalseCondition() throws Exception { String root = FileUtils.getResourceAsFile("/multilevel/parquet").toURI().toString(); String query = String.format("select * from (select dir0, o_custkey from dfs_test.`%s` where dir0='1994') t where 1 = 0", root); - testExcludeFilter(query, 4, "Filter", 0); + // the 1 = 0 becomes limit 0, which will require to read only one parquet file, in stead of 4 for year '1994'. 
+ testExcludeFilter(query, 1, "Filter", 0); } @Test // see DRILL-2712 http://git-wip-us.apache.org/repos/asf/drill/blob/0a2518d7/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java index 789a536..18f181b 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java @@ -27,6 +27,8 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; public class TestLimitWithExchanges extends BaseTestQuery { + final String WORKING_PATH = TestTools.getWorkingPath(); + final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources"; @Test public void testLimitWithExchanges() throws Exception{ @@ -36,23 +38,20 @@ public class TestLimitWithExchanges extends BaseTestQuery { @Test public void testPushLimitPastUnionExchange() throws Exception { // Push limit past through UnionExchange. - final String WORKING_PATH = TestTools.getWorkingPath(); - final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources"; - try { test("alter session set `planner.slice_target` = 1"); final String[] excludedPlan = {}; // case 1. single table query. 
- final String sql = String.format("select * from dfs_test.`%s/tpchmulti/region` limit 1 offset 2", TEST_RES_PATH); + final String sql = String.format("select * from dfs_test.`%s/multilevel/json` limit 1 offset 2", TEST_RES_PATH); final String[] expectedPlan ={"(?s)Limit\\(offset=\\[2\\], fetch=\\[1\\].*UnionExchange.*Limit\\(fetch=\\[3\\]\\).*Scan"}; testLimitHelper(sql, expectedPlan, excludedPlan, 1); - final String sql2 = String.format("select * from dfs_test.`%s/tpchmulti/region` limit 1 offset 0", TEST_RES_PATH); + final String sql2 = String.format("select * from dfs_test.`%s/multilevel/json` limit 1 offset 0", TEST_RES_PATH); final String[] expectedPlan2 = {"(?s)Limit\\(offset=\\[0\\], fetch=\\[1\\].*UnionExchange.*Limit\\(fetch=\\[1\\]\\).*Scan"}; testLimitHelper(sql2, expectedPlan2, excludedPlan, 1); - final String sql3 = String.format("select * from dfs_test.`%s/tpchmulti/region` limit 1", TEST_RES_PATH); + final String sql3 = String.format("select * from dfs_test.`%s/multilevel/json` limit 1", TEST_RES_PATH); final String[] expectedPlan3 = {"(?s)Limit\\(fetch=\\[1\\].*UnionExchange.*Limit\\(fetch=\\[1\\]\\).*Scan"}; testLimitHelper(sql3, expectedPlan3, excludedPlan, 1); @@ -79,9 +78,6 @@ public class TestLimitWithExchanges extends BaseTestQuery { @Test public void testNegPushLimitPastUnionExchange() throws Exception { // Negative case: should not push limit past through UnionExchange. - final String WORKING_PATH = TestTools.getWorkingPath(); - final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources"; - try { test("alter session set `planner.slice_target` = 1"); final String[] expectedPlan ={}; @@ -100,6 +96,35 @@ public class TestLimitWithExchanges extends BaseTestQuery { } } + @Test + public void testLimitImpactExchange() throws Exception { + try { + test("alter session set `planner.slice_target` = 5" ); + + // nation has 3 files, total 25 rows. 
+ // Given slice_target = 5, if # of rows to fetch is < 5 : do NOT insert Exchange, and the query should run in single fragment. + // if # of row to fetch is >= 5: do insert exchange, and query should run in multiple fragments. + final String sql = String.format("select * from dfs_test.`%s/tpchmulti/nation` limit 2", TEST_RES_PATH); // Test Limit_On_Scan rule. + final String sql2 = String.format("select n_nationkey + 1000 from dfs_test.`%s/tpchmulti/nation` limit 2", TEST_RES_PATH); // Test Limit_On_Project rule. + final String [] expectedPlan = {}; + final String [] excludedPlan = {"UnionExchange"}; + + testLimitHelper(sql, expectedPlan, excludedPlan, 2); + testLimitHelper(sql2, expectedPlan, excludedPlan, 2); + + final String sql3 = String.format("select * from dfs_test.`%s/tpchmulti/nation` limit 10", TEST_RES_PATH); // Test Limit_On_Scan rule. + final String sql4 = String.format("select n_nationkey + 1000 from dfs_test.`%s/tpchmulti/nation` limit 10", TEST_RES_PATH); // Test Limit_On_Project rule. 
+ + final String [] expectedPlan2 = {"UnionExchange"}; + final String [] excludedPlan2 = {}; + + testLimitHelper(sql3, expectedPlan2, excludedPlan2, 10); + testLimitHelper(sql4, expectedPlan2, excludedPlan2, 10); + } finally { + test("alter session set `planner.slice_target` = " + ExecConstants.SLICE_TARGET_OPTION.getDefault().getValue()); + } + } + private void testLimitHelper(final String sql, final String[] expectedPlan, final String[] excludedPattern, int expectedRecordCount) throws Exception { // Validate the plan PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPattern); http://git-wip-us.apache.org/repos/asf/drill/blob/0a2518d7/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestAffinityCalculator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestAffinityCalculator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestAffinityCalculator.java index 0999218..dadb850 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestAffinityCalculator.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestAffinityCalculator.java @@ -72,7 +72,9 @@ public class TestAffinityCalculator extends ExecTest { rowGroups.clear(); for (int i = 0; i < numberOfRowGroups; i++) { - rowGroups.add(new ParquetGroupScan.RowGroupInfo(path, (long)i*rowGroupSize, (long)rowGroupSize, i)); + // buildRowGroups method seems not be used at all. Pass -1 as rowCount. + // Maybe remove this method completely ? + rowGroups.add(new ParquetGroupScan.RowGroupInfo(path, (long)i*rowGroupSize, (long)rowGroupSize, i, -1)); } }