DRILL-3503: Make PruneScanRule pluggable. Extend PartitionDescriptor to provide functionality needed by PruneScanRule. Remove redundant logic in PruneScanRule.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/95d576da Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/95d576da Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/95d576da Branch: refs/heads/master Commit: 95d576da572d1dcb96c42f662504d7340cc718cd Parents: a65c0a6 Author: Mehant Baid <meha...@gmail.com> Authored: Mon Jul 20 15:44:33 2015 -0700 Committer: Mehant Baid <meha...@gmail.com> Committed: Thu Jul 30 20:21:14 2015 -0700 ---------------------------------------------------------------------- .../planner/sql/HivePartitionDescriptor.java | 35 +++ .../exec/planner/DFSPartitionLocation.java | 65 ++++ .../planner/FileSystemPartitionDescriptor.java | 76 ++++- .../planner/ParquetPartitionDescriptor.java | 72 ++++- .../drill/exec/planner/PartitionDescriptor.java | 37 +++ .../drill/exec/planner/PartitionLocation.java | 35 +++ .../DrillPushPartitionFilterIntoScan.java | 2 +- .../exec/planner/logical/DrillRuleSets.java | 5 +- .../logical/partition/ParquetPruneScanRule.java | 91 ++++++ .../logical/partition/PruneScanRule.java | 294 ++----------------- 10 files changed, 438 insertions(+), 274 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/95d576da/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java index 8307dff..93bfadd 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java @@ -17,9 +17,16 @@ */ 
package org.apache.drill.exec.planner.sql; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.planner.PartitionDescriptor; +import org.apache.drill.exec.planner.PartitionLocation; +import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.store.hive.HiveTable; +import org.apache.drill.exec.vector.ValueVector; +import java.util.BitSet; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -59,4 +66,32 @@ public class HivePartitionDescriptor implements PartitionDescriptor { return partitionMap.get(name); } + /* + * Following method stubs are just added to satisfy the interface implementation. + * Actual implementation will be added when hive partition pruning is plugged in + * as part of DRILL-3121 + */ + private String getBaseTableLocation() { + return null; + } + + @Override + public GroupScan createNewGroupScan(List<String> newFiles) throws Exception { + return null; + } + + @Override + public List<PartitionLocation> getPartitions() { + return null; + } + + @Override + public void populatePartitionVectors(ValueVector[] vectors, List<PartitionLocation> partitions, BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap) { + + } + + @Override + public TypeProtos.MajorType getVectorType(SchemaPath column, PlannerSettings plannerSettings) { + return null; + } } http://git-wip-us.apache.org/repos/asf/drill/blob/95d576da/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSPartitionLocation.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSPartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSPartitionLocation.java new file mode 100644 index 0000000..f409321 --- /dev/null +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSPartitionLocation.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner; + +/** + * Class defines a single partition in a DFS table. 
+ */ +public class DFSPartitionLocation implements PartitionLocation { + private final String[] dirs; + private final String file; + + public DFSPartitionLocation(int max, String selectionRoot, String file) { + this.file = file; + this.dirs = new String[max]; + int start = file.indexOf(selectionRoot) + selectionRoot.length(); + String postPath = file.substring(start); + if (postPath.length() == 0) { + return; + } + if(postPath.charAt(0) == '/'){ + postPath = postPath.substring(1); + } + String[] mostDirs = postPath.split("/"); + int maxLoop = Math.min(max, mostDirs.length - 1); + for(int i =0; i < maxLoop; i++){ + this.dirs[i] = mostDirs[i]; + } + } + + /** + * Returns the value for a given partition key + * @param index - Index of the partition key whose value is to be returned + * @return + */ + @Override + public String getPartitionValue(int index) { + assert index < dirs.length; + return dirs[index]; + } + + /** + * Return the full location of this partition + * @return + */ + @Override + public String getEntirePartitionLocation() { + return file; + } +} + http://git-wip-us.apache.org/repos/asf/drill/blob/95d576da/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java index 9ad14b1..9816f14 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java @@ -17,9 +17,26 @@ */ package org.apache.drill.exec.planner; +import java.io.IOException; +import java.util.BitSet; +import java.util.LinkedList; +import java.util.List; import java.util.Map; +import com.google.common.base.Charsets; +import
com.google.common.collect.Maps; +import org.apache.calcite.util.BitSets; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.physical.base.FileGroupScan; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.planner.logical.DrillScanRel; +import org.apache.drill.exec.planner.physical.PlannerSettings; +import org.apache.drill.exec.store.dfs.FileSelection; +import org.apache.drill.exec.store.dfs.FormatSelection; +import org.apache.drill.exec.vector.NullableVarCharVector; +import org.apache.drill.exec.vector.ValueVector; // partition descriptor for file system based tables @@ -30,10 +47,12 @@ public class FileSystemPartitionDescriptor implements PartitionDescriptor { private final String partitionLabel; private final int partitionLabelLength; private final Map<String, Integer> partitions = Maps.newHashMap(); + private final DrillScanRel scanRel; - public FileSystemPartitionDescriptor(String partitionLabel) { - this.partitionLabel = partitionLabel; + public FileSystemPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) { + this.partitionLabel = settings.getFsPartitionColumnLabel(); this.partitionLabelLength = partitionLabel.length(); + this.scanRel = scanRel; for(int i =0; i < 10; i++){ partitions.put(partitionLabel + i, i); } @@ -60,9 +79,58 @@ public class FileSystemPartitionDescriptor implements PartitionDescriptor { return MAX_NESTED_SUBDIRS; } - public String getName(int index){ - return partitionLabel + index; + @Override + public GroupScan createNewGroupScan(List<String> newFiles) throws IOException { + final FileSelection newFileSelection = new FileSelection(newFiles, getBaseTableLocation(), true); + final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newFileSelection); + return newScan; + } + + @Override + public List<PartitionLocation> getPartitions() { + 
List<String> fileLocations = ((FormatSelection) scanRel.getDrillTable().getSelection()).getAsFiles(); + List<PartitionLocation> partitions = new LinkedList<>(); + for (String file: fileLocations) { + partitions.add(new DFSPartitionLocation(MAX_NESTED_SUBDIRS, getBaseTableLocation(), file)); + } + return partitions; + } + + @Override + public void populatePartitionVectors(ValueVector[] vectors, List<PartitionLocation> partitions, + BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap) { + int record = 0; + for (PartitionLocation partitionLocation: partitions) { + for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) { + if (partitionLocation.getPartitionValue(partitionColumnIndex) == null) { + ((NullableVarCharVector) vectors[partitionColumnIndex]).getMutator().setNull(record); + } else { + byte[] bytes = (partitionLocation.getPartitionValue(partitionColumnIndex)).getBytes(Charsets.UTF_8); + ((NullableVarCharVector) vectors[partitionColumnIndex]).getMutator().setSafe(record, bytes, 0, bytes.length); + } + } + record++; + } + + for (ValueVector v : vectors) { + if (v == null) { + continue; + } + v.getMutator().setValueCount(partitions.size()); + } } + @Override + public TypeProtos.MajorType getVectorType(SchemaPath column, PlannerSettings plannerSettings) { + return Types.optional(TypeProtos.MinorType.VARCHAR); + } + public String getName(int index) { + return partitionLabel + index; + } + + private String getBaseTableLocation() { + final FormatSelection origSelection = (FormatSelection) scanRel.getDrillTable().getSelection(); + return origSelection.getSelection().selectionRoot; + } } http://git-wip-us.apache.org/repos/asf/drill/blob/95d576da/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java index 127e70a..f8af300 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java @@ -17,11 +17,24 @@ */ package org.apache.drill.exec.planner; -import com.google.common.collect.Maps; +import org.apache.calcite.util.BitSets; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.physical.base.FileGroupScan; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.planner.logical.DrillScanRel; +import org.apache.drill.exec.planner.physical.PlannerSettings; +import org.apache.drill.exec.store.dfs.FileSelection; +import org.apache.drill.exec.store.dfs.FormatSelection; +import org.apache.drill.exec.store.parquet.ParquetGroupScan; +import org.apache.drill.exec.vector.ValueVector; +import java.io.IOException; +import java.util.BitSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; /** @@ -30,9 +43,13 @@ import java.util.Map; public class ParquetPartitionDescriptor implements PartitionDescriptor { private final List<SchemaPath> partitionColumns; + private final DrillScanRel scanRel; + static final int MAX_NESTED_SUBDIRS = 10; - public ParquetPartitionDescriptor(List<SchemaPath> partitionColumns) { - this.partitionColumns = partitionColumns; + public ParquetPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) { + ParquetGroupScan scan = (ParquetGroupScan) scanRel.getGroupScan(); + this.partitionColumns = scan.getPartitionColumns(); + this.scanRel = scanRel; } @Override @@ -59,4 +76,53 @@ public class ParquetPartitionDescriptor implements PartitionDescriptor { public int getMaxHierarchyLevel() { return partitionColumns.size(); } + + @Override + public GroupScan 
createNewGroupScan(List<String> newFiles) throws IOException { + final FileSelection newFileSelection = new FileSelection(newFiles, getBaseTableLocation(), true); + final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newFileSelection); + return newScan; + } + + @Override + public List<PartitionLocation> getPartitions() { + Set<String> fileLocations = ((ParquetGroupScan) scanRel.getGroupScan()).getFileSet(); + List<PartitionLocation> partitions = new LinkedList<>(); + for (String file: fileLocations) { + partitions.add(new DFSPartitionLocation(MAX_NESTED_SUBDIRS, getBaseTableLocation(), file)); + } + return partitions; + } + + @Override + public void populatePartitionVectors(ValueVector[] vectors, List<PartitionLocation> partitions, + BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap) { + int record = 0; + for (PartitionLocation partitionLocation: partitions) { + for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) { + SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex)); + ((ParquetGroupScan) scanRel.getGroupScan()).populatePruningVector(vectors[partitionColumnIndex], record, column, + partitionLocation.getEntirePartitionLocation()); + } + record++; + } + + for (ValueVector v : vectors) { + if (v == null) { + continue; + } + v.getMutator().setValueCount(partitions.size()); + } + + } + + @Override + public TypeProtos.MajorType getVectorType(SchemaPath column, PlannerSettings plannerSettings) { + return ((ParquetGroupScan) scanRel.getGroupScan()).getTypeForColumn(column); + } + + private String getBaseTableLocation() { + final FormatSelection origSelection = (FormatSelection) scanRel.getDrillTable().getSelection(); + return origSelection.getSelection().selectionRoot; + } } http://git-wip-us.apache.org/repos/asf/drill/blob/95d576da/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java index 35fdae9..6a4ee9e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java @@ -17,6 +17,19 @@ */ package org.apache.drill.exec.planner; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.planner.logical.DrillScanRel; +import org.apache.drill.exec.planner.physical.PlannerSettings; +import org.apache.drill.exec.vector.ValueVector; + +import java.io.IOException; +import java.util.BitSet; +import java.util.List; +import java.util.Map; + // Interface used to describe partitions. Currently used by file system based partitions and hive partitions public interface PartitionDescriptor { @@ -40,4 +53,28 @@ public interface PartitionDescriptor { // Maximum level of partition nesting/ hierarchy supported public int getMaxHierarchyLevel(); + + public GroupScan createNewGroupScan(List<String> newFiles) throws Exception; + + public List<PartitionLocation> getPartitions(); + + /** + * Method creates an in memory representation of all the partitions. 
For each level of partitioning we + * will create a value vector which this method will populate for all the partitions with the values of the + * partitioning key + * @param vectors - Array of vectors in the container that need to be populated + * @param partitions - List of all the partitions that exist in the table + * @param partitionColumnBitSet - Partition columns selected in the query + * @param fieldNameMap - Maps field ordinal to the field name + */ + void populatePartitionVectors(ValueVector[] vectors, List<PartitionLocation> partitions, + BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap); + + /** + * Method returns the Major type associated with the given column + * @param column - column whose type should be determined + * @param plannerSettings + * @return + */ + TypeProtos.MajorType getVectorType(SchemaPath column, PlannerSettings plannerSettings); } http://git-wip-us.apache.org/repos/asf/drill/blob/95d576da/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java new file mode 100644 index 0000000..656e3a9 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner; + +/* + * Interface to define a single partition. It contains the + * location of the entire partition and also stores the + * value of the individual partition keys for this partition. + */ +public interface PartitionLocation { + /* + * Returns the value of the 'index' partition column + */ + public String getPartitionValue(int index); + + /* + * Returns the string representation of this partition + */ + public String getEntirePartitionLocation(); +} http://git-wip-us.apache.org/repos/asf/drill/blob/95d576da/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java index b83cedd..811eef1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java @@ -120,7 +120,7 @@ public abstract class DrillPushPartitionFilterIntoScan extends RelOptRule { DrillRel inputRel = projectRel != null ? 
projectRel : scanRel; PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner()); - DirPathBuilder builder = new DirPathBuilder(filterRel, inputRel, filterRel.getCluster().getRexBuilder(), new FileSystemPartitionDescriptor(settings.getFsPartitionColumnLabel())); + DirPathBuilder builder = new DirPathBuilder(filterRel, inputRel, filterRel.getCluster().getRexBuilder(), new FileSystemPartitionDescriptor(settings, scanRel)); FormatSelection origSelection = (FormatSelection)scanRel.getDrillTable().getSelection(); FormatSelection newSelection = splitFilter(origSelection, builder); http://git-wip-us.apache.org/repos/asf/drill/blob/95d576da/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java index a821a62..3f49498 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java @@ -33,6 +33,7 @@ import org.apache.calcite.tools.RuleSet; import org.apache.calcite.rel.rules.FilterMergeRule; import org.apache.drill.exec.ops.OptimizerRulesContext; +import org.apache.drill.exec.planner.logical.partition.ParquetPruneScanRule; import org.apache.drill.exec.planner.logical.partition.PruneScanRule; import org.apache.drill.exec.planner.physical.ConvertCountToDirectScan; import org.apache.drill.exec.planner.physical.FilterPrule; @@ -156,8 +157,8 @@ public class DrillRuleSets { PruneScanRule.getFilterOnProject(optimizerRulesContext), PruneScanRule.getFilterOnScan(optimizerRulesContext), - PruneScanRule.getFilterOnProjectParquet(optimizerRulesContext), - PruneScanRule.getFilterOnScanParquet(optimizerRulesContext), + 
ParquetPruneScanRule.getFilterOnProjectParquet(optimizerRulesContext), + ParquetPruneScanRule.getFilterOnScanParquet(optimizerRulesContext), /* Convert from Calcite Logical to Drill Logical Rules. http://git-wip-us.apache.org/repos/asf/drill/blob/95d576da/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java new file mode 100644 index 0000000..a951854 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.planner.logical.partition; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.drill.exec.ops.OptimizerRulesContext; +import org.apache.drill.exec.physical.base.FileGroupScan; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.planner.ParquetPartitionDescriptor; +import org.apache.drill.exec.planner.PartitionDescriptor; +import org.apache.drill.exec.planner.logical.DrillFilterRel; +import org.apache.drill.exec.planner.logical.DrillProjectRel; +import org.apache.drill.exec.planner.logical.DrillScanRel; +import org.apache.drill.exec.planner.logical.RelOptHelper; +import org.apache.drill.exec.planner.physical.PlannerSettings; +import org.apache.drill.exec.store.parquet.ParquetGroupScan; + +public class ParquetPruneScanRule { + + public static final RelOptRule getFilterOnProjectParquet(OptimizerRulesContext optimizerRulesContext) { + return new PruneScanRule( + RelOptHelper.some(DrillFilterRel.class, RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))), + "PruneScanRule:Filter_On_Project_Parquet", + optimizerRulesContext) { + + @Override + public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) { + return new ParquetPartitionDescriptor(settings, scanRel); + } + + @Override + public boolean matches(RelOptRuleCall call) { + final DrillScanRel scan = (DrillScanRel) call.rel(2); + GroupScan groupScan = scan.getGroupScan(); + // this rule is applicable only for parquet based partition pruning + return groupScan instanceof ParquetGroupScan && groupScan.supportsPartitionFilterPushdown(); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0); + final DrillProjectRel projectRel = (DrillProjectRel) call.rel(1); + final DrillScanRel scanRel = (DrillScanRel) call.rel(2); + doOnMatch(call, filterRel, projectRel, 
scanRel); + } + }; + } + + public static final RelOptRule getFilterOnScanParquet(OptimizerRulesContext optimizerRulesContext) { + return new PruneScanRule( + RelOptHelper.some(DrillFilterRel.class, RelOptHelper.any(DrillScanRel.class)), + "PruneScanRule:Filter_On_Scan_Parquet", optimizerRulesContext) { + + @Override + public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) { + return new ParquetPartitionDescriptor(settings, scanRel); + } + + @Override + public boolean matches(RelOptRuleCall call) { + final DrillScanRel scan = (DrillScanRel) call.rel(1); + GroupScan groupScan = scan.getGroupScan(); + // this rule is applicable only for parquet based partition pruning + return groupScan instanceof ParquetGroupScan && groupScan.supportsPartitionFilterPushdown(); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0); + final DrillScanRel scanRel = (DrillScanRel) call.rel(1); + doOnMatch(call, filterRel, null, scanRel); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/95d576da/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java index 27ca3da..af67282 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java @@ -17,10 +17,8 @@ */ package org.apache.drill.exec.planner.logical.partition; -import java.util.ArrayList; import java.util.BitSet; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Map; @@ -41,8 +39,8 @@ import 
org.apache.drill.exec.ops.OptimizerRulesContext; import org.apache.drill.exec.physical.base.FileGroupScan; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.planner.FileSystemPartitionDescriptor; -import org.apache.drill.exec.planner.ParquetPartitionDescriptor; import org.apache.drill.exec.planner.PartitionDescriptor; +import org.apache.drill.exec.planner.PartitionLocation; import org.apache.drill.exec.planner.logical.DrillFilterRel; import org.apache.drill.exec.planner.logical.DrillOptiq; import org.apache.drill.exec.planner.logical.DrillParseContext; @@ -54,11 +52,8 @@ import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.planner.physical.PrelUtil; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.exec.store.dfs.FileSelection; -import org.apache.drill.exec.store.dfs.FormatSelection; -import org.apache.drill.exec.store.parquet.ParquetGroupScan; +import org.apache.drill.exec.store.StoragePluginOptimizerRule; import org.apache.drill.exec.vector.NullableBitVector; -import org.apache.drill.exec.vector.NullableVarCharVector; import org.apache.calcite.rel.RelNode; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; @@ -66,14 +61,20 @@ import org.apache.calcite.plan.RelOptRuleOperand; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rex.RexNode; -import com.google.common.base.Charsets; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.drill.exec.vector.ValueVector; -public abstract class PruneScanRule extends RelOptRule { +public abstract class PruneScanRule extends StoragePluginOptimizerRule { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PruneScanRule.class); + final OptimizerRulesContext optimizerContext; + + public PruneScanRule(RelOptRuleOperand operand, String id, 
OptimizerRulesContext optimizerContext) { + super(operand, id); + this.optimizerContext = optimizerContext; + } + public static final RelOptRule getFilterOnProject(OptimizerRulesContext optimizerRulesContext) { return new PruneScanRule( RelOptHelper.some(DrillFilterRel.class, RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))), @@ -81,6 +82,11 @@ public abstract class PruneScanRule extends RelOptRule { optimizerRulesContext) { @Override + public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) { + return new FileSystemPartitionDescriptor(settings, scanRel); + } + + @Override public boolean matches(RelOptRuleCall call) { final DrillScanRel scan = (DrillScanRel) call.rel(2); GroupScan groupScan = scan.getGroupScan(); @@ -95,44 +101,6 @@ public abstract class PruneScanRule extends RelOptRule { final DrillScanRel scanRel = (DrillScanRel) call.rel(2); doOnMatch(call, filterRel, projectRel, scanRel); } - - @Override - protected PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) { - return new FileSystemPartitionDescriptor(settings.getFsPartitionColumnLabel()); - } - - @Override - protected void populatePartitionVectors(ValueVector[] vectors, List<PathPartition> partitions, BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap, GroupScan groupScan) { - int record = 0; - for (Iterator<PathPartition> iter = partitions.iterator(); iter.hasNext(); record++) { - final PathPartition partition = iter.next(); - for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) { - if (partition.dirs[partitionColumnIndex] == null) { - ((NullableVarCharVector) vectors[partitionColumnIndex]).getMutator().setNull(record); - } else { - byte[] bytes = partition.dirs[partitionColumnIndex].getBytes(Charsets.UTF_8); - ((NullableVarCharVector) vectors[partitionColumnIndex]).getMutator().setSafe(record, bytes, 0, bytes.length); - } - } - } - - for (ValueVector v : 
vectors) { - if (v == null) { - continue; - } - v.getMutator().setValueCount(partitions.size()); - } - } - - @Override - protected MajorType getVectorType(GroupScan groupScan, SchemaPath column) { - return Types.optional(MinorType.VARCHAR); - } - - @Override - protected List<String> getFiles(DrillScanRel scanRel) { - return ((FormatSelection) scanRel.getDrillTable().getSelection()).getAsFiles(); - } }; } @@ -142,128 +110,11 @@ public abstract class PruneScanRule extends RelOptRule { "PruneScanRule:Filter_On_Scan", optimizerRulesContext) { @Override - public boolean matches(RelOptRuleCall call) { - final DrillScanRel scan = (DrillScanRel) call.rel(1); - GroupScan groupScan = scan.getGroupScan(); - // this rule is applicable only for dfs based partition pruning - return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown(); - } - - @Override - public void onMatch(RelOptRuleCall call) { - final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0); - final DrillScanRel scanRel = (DrillScanRel) call.rel(1); - doOnMatch(call, filterRel, null, scanRel); - } - - @Override - protected PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) { - return new FileSystemPartitionDescriptor(settings.getFsPartitionColumnLabel()); - } - - @Override - protected void populatePartitionVectors(ValueVector[] vectors, List<PathPartition> partitions, BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap, GroupScan groupScan) { - int record = 0; - for (Iterator<PathPartition> iter = partitions.iterator(); iter.hasNext(); record++) { - final PathPartition partition = iter.next(); - for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) { - if (partition.dirs[partitionColumnIndex] == null) { - ((NullableVarCharVector) vectors[partitionColumnIndex]).getMutator().setNull(record); - } else { - byte[] bytes = partition.dirs[partitionColumnIndex].getBytes(Charsets.UTF_8); - ((NullableVarCharVector) 
vectors[partitionColumnIndex]).getMutator().setSafe(record, bytes, 0, bytes.length); - } - } - } - - for (ValueVector v : vectors) { - if (v == null) { - continue; - } - v.getMutator().setValueCount(partitions.size()); - } - } - - @Override - protected MajorType getVectorType(GroupScan groupScan, SchemaPath column) { - return Types.optional(MinorType.VARCHAR); - } - - @Override - protected List<String> getFiles(DrillScanRel scanRel) { - return ((FormatSelection) scanRel.getDrillTable().getSelection()).getAsFiles(); - } - }; - } - - public static final RelOptRule getFilterOnProjectParquet(OptimizerRulesContext optimizerRulesContext) { - return new PruneScanRule( - RelOptHelper.some(DrillFilterRel.class, RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))), - "PruneScanRule:Filter_On_Project_Parquet", - optimizerRulesContext) { - - @Override - public boolean matches(RelOptRuleCall call) { - final DrillScanRel scan = (DrillScanRel) call.rel(2); - GroupScan groupScan = scan.getGroupScan(); - // this rule is applicable only for dfs based partition pruning - return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown(); - } - - @Override - public void onMatch(RelOptRuleCall call) { - final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0); - final DrillProjectRel projectRel = (DrillProjectRel) call.rel(1); - final DrillScanRel scanRel = (DrillScanRel) call.rel(2); - doOnMatch(call, filterRel, projectRel, scanRel); + public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) { + return new FileSystemPartitionDescriptor(settings, scanRel); } @Override - protected PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) { - return new ParquetPartitionDescriptor(scanRel.getGroupScan().getPartitionColumns()); - } - - @Override - protected void populatePartitionVectors(ValueVector[] vectors, List<PathPartition> partitions, BitSet 
partitionColumnBitSet, Map<Integer, String> fieldNameMap, GroupScan groupScan) { - int record = 0; - for (Iterator<PathPartition> iter = partitions.iterator(); iter.hasNext(); record++) { - final PathPartition partition = iter.next(); - for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) { - SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex)); - ((ParquetGroupScan) groupScan).populatePruningVector(vectors[partitionColumnIndex], record, column, partition.file); - } - } - - for (ValueVector v : vectors) { - if (v == null) { - continue; - } - v.getMutator().setValueCount(partitions.size()); - } - } - - @Override - protected MajorType getVectorType(GroupScan groupScan, SchemaPath column) { - return ((ParquetGroupScan) groupScan).getTypeForColumn(column); - } - - @Override - protected List<String> getFiles(DrillScanRel scanRel) { - ParquetGroupScan groupScan = (ParquetGroupScan) scanRel.getGroupScan(); - return new ArrayList(groupScan.getFileSet()); - } - }; - } - - // Using separate rules for Parquet column based partition pruning. 
In the future, we may want to see if we can combine these into - // a single rule which handles both types of pruning - - public static final RelOptRule getFilterOnScanParquet(OptimizerRulesContext optimizerRulesContext) { - return new PruneScanRule( - RelOptHelper.some(DrillFilterRel.class, RelOptHelper.any(DrillScanRel.class)), - "PruneScanRule:Filter_On_Scan_Parquet", optimizerRulesContext) { - - @Override public boolean matches(RelOptRuleCall call) { final DrillScanRel scan = (DrillScanRel) call.rel(1); GroupScan groupScan = scan.getGroupScan(); @@ -277,53 +128,9 @@ public abstract class PruneScanRule extends RelOptRule { final DrillScanRel scanRel = (DrillScanRel) call.rel(1); doOnMatch(call, filterRel, null, scanRel); } - - @Override - protected PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) { - return new ParquetPartitionDescriptor(scanRel.getGroupScan().getPartitionColumns()); - } - - @Override - protected void populatePartitionVectors(ValueVector[] vectors, List<PathPartition> partitions, BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap, GroupScan groupScan) { - int record = 0; - for (Iterator<PathPartition> iter = partitions.iterator(); iter.hasNext(); record++) { - final PathPartition partition = iter.next(); - for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) { - SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex)); - ((ParquetGroupScan) groupScan).populatePruningVector(vectors[partitionColumnIndex], record, column, partition.file); - } - } - - for (ValueVector v : vectors) { - if (v == null) { - continue; - } - v.getMutator().setValueCount(partitions.size()); - } - } - - @Override - protected MajorType getVectorType(GroupScan groupScan, SchemaPath column) { - return ((ParquetGroupScan) groupScan).getTypeForColumn(column); - } - - @Override - protected List<String> getFiles(DrillScanRel scanRel) { - ParquetGroupScan groupScan = 
(ParquetGroupScan) scanRel.getGroupScan(); - return new ArrayList(groupScan.getFileSet()); - } }; } - final OptimizerRulesContext optimizerContext; - - private PruneScanRule(RelOptRuleOperand operand, String id, OptimizerRulesContext optimizerContext) { - super(operand, id); - this.optimizerContext = optimizerContext; - } - - protected abstract PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel); - protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillProjectRel projectRel, DrillScanRel scanRel) { final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner()); PartitionDescriptor descriptor = getPartitionDescriptor(settings, scanRel); @@ -374,20 +181,12 @@ public abstract class PruneScanRule extends RelOptRule { // set up the partitions final GroupScan groupScan = scanRel.getGroupScan(); - final FormatSelection origSelection = (FormatSelection) scanRel.getDrillTable().getSelection(); - final List<String> files = getFiles(scanRel); - final String selectionRoot = origSelection.getSelection().selectionRoot; - List<PathPartition> partitions = Lists.newLinkedList(); + List<PartitionLocation> partitions = descriptor.getPartitions(); - // let's only deal with one batch of files for now. 
- if (files.size() > Character.MAX_VALUE) { + if (partitions.size() > Character.MAX_VALUE) { return; } - for (String f : files) { - partitions.add(new PathPartition(descriptor.getMaxHierarchyLevel(), selectionRoot, f)); - } - final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator); final VectorContainer container = new VectorContainer(); @@ -395,7 +194,7 @@ public abstract class PruneScanRule extends RelOptRule { final ValueVector[] vectors = new ValueVector[descriptor.getMaxHierarchyLevel()]; for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) { SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex)); - MajorType type = getVectorType(groupScan, column); + MajorType type = descriptor.getVectorType(column, settings); MaterializedField field = MaterializedField.create(column, type); ValueVector v = TypeHelper.getNewVector(field, allocator); v.allocateNew(); @@ -404,8 +203,7 @@ public abstract class PruneScanRule extends RelOptRule { } // populate partition vectors. 
- - populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap, groupScan); + descriptor.populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap); // materialize the expression logger.debug("Attempting to prune {}", pruneCondition); @@ -421,25 +219,25 @@ public abstract class PruneScanRule extends RelOptRule { int record = 0; List<String> newFiles = Lists.newArrayList(); - for (Iterator<PathPartition> iter = partitions.iterator(); iter.hasNext(); record++) { - PathPartition part = iter.next(); - if (!output.getAccessor().isNull(record) && output.getAccessor().get(record) == 1) { - newFiles.add(part.file); + for(PartitionLocation part: partitions){ + if(!output.getAccessor().isNull(record) && output.getAccessor().get(record) == 1){ + newFiles.add(part.getEntirePartitionLocation()); } + record++; } boolean canDropFilter = true; if (newFiles.isEmpty()) { - newFiles.add(files.get(0)); + newFiles.add(partitions.get(0).getEntirePartitionLocation()); canDropFilter = false; } - if (newFiles.size() == files.size()) { + if (newFiles.size() == partitions.size()) { return; } - logger.debug("Pruned {} => {}", files, newFiles); + logger.debug("Pruned {} => {}", partitions.size(), newFiles.size()); List<RexNode> conjuncts = RelOptUtil.conjunctions(condition); @@ -452,13 +250,11 @@ public abstract class PruneScanRule extends RelOptRule { condition = condition.accept(reverseVisitor); pruneCondition = pruneCondition.accept(reverseVisitor); - final FileSelection newFileSelection = new FileSelection(newFiles, selectionRoot, true); - final FileGroupScan newScan = ((FileGroupScan) scanRel.getGroupScan()).clone(newFileSelection); final DrillScanRel newScanRel = new DrillScanRel(scanRel.getCluster(), scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL), scanRel.getTable(), - newScan, + descriptor.createNewGroupScan(newFiles), scanRel.getRowType(), scanRel.getColumns()); @@ -485,35 +281,5 @@ public abstract class PruneScanRule extends 
RelOptRule { } } - protected abstract void populatePartitionVectors(ValueVector[] vectors, List<PathPartition> partitions, BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap, GroupScan groupScan); - - protected abstract MajorType getVectorType(GroupScan groupScan, SchemaPath column); - - protected abstract List<String> getFiles(DrillScanRel scanRel); - - private static class PathPartition { - final String[] dirs; - final String file; - - public PathPartition(int max, String selectionRoot, String file) { - this.file = file; - this.dirs = new String[max]; - int start = file.indexOf(selectionRoot) + selectionRoot.length(); - String postPath = file.substring(start); - if (postPath.length() == 0) { - return; - } - if (postPath.charAt(0) == '/') { - postPath = postPath.substring(1); - } - String[] mostDirs = postPath.split("/"); - int maxLoop = Math.min(max, mostDirs.length - 1); - for (int i = 0; i < maxLoop; i++) { - this.dirs[i] = mostDirs[i]; - } - } - - - } - + public abstract PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel); }