http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/sql/hive/TestViewSupportOnHiveTables.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/sql/hive/TestViewSupportOnHiveTables.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/sql/hive/TestViewSupportOnHiveTables.java
index 3706ff2..64680c0 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/sql/hive/TestViewSupportOnHiveTables.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/sql/hive/TestViewSupportOnHiveTables.java
@@ -35,7 +35,7 @@ public class TestViewSupportOnHiveTables extends TestBaseViewSupport {
   @BeforeClass
   public static void generateHive() throws Exception{
-    hiveTest = HiveTestDataGenerator.getInstance(dirTestWatcher.getRootDir());
+    hiveTest = HiveTestDataGenerator.getInstance(dirTestWatcher);
     hiveTest.addHiveTestPlugin(getDrillbitContext().getStorage());
   }
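The hunk above now hands the whole BaseDirTestWatcher to the generator instead of just its root directory; as the HiveTestDataGenerator diff below shows, the generator is a synchronized singleton that is rebuilt whenever a different base directory shows up, and it keeps the watcher so it can copy resources and create sub-directories later. A minimal sketch of that singleton-per-base-directory pattern, with hypothetical names (TestDataFixture, generate()) standing in for the real generator:

import java.io.File;

// Sketch only: TestDataFixture and generate() are illustrative stand-ins,
// not Drill APIs. The real generator additionally keeps the BaseDirTestWatcher.
public class TestDataFixture {
  private static TestDataFixture instance;
  private static File baseDir;

  public static synchronized TestDataFixture getInstance(File newBaseDir) throws Exception {
    // Rebuild the fixture when a test class supplies a different root directory.
    if (instance == null || !newBaseDir.equals(baseDir)) {
      baseDir = newBaseDir;
      instance = new TestDataFixture();
      instance.generate();
    }
    return instance;
  }

  private void generate() {
    // populate test tables once per base directory
  }
}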
http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java index 78e5b39..f206999 100644 --- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; import java.io.PrintWriter; import java.nio.file.Files; +import java.nio.file.Paths; import java.nio.file.attribute.PosixFilePermission; import java.sql.Date; import java.sql.Timestamp; @@ -31,6 +32,7 @@ import com.google.common.collect.Sets; import com.google.common.io.Resources; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.drill.test.BaseDirTestWatcher; import org.apache.drill.test.BaseTestQuery; import org.apache.drill.common.exceptions.DrillException; import org.apache.drill.exec.store.StoragePluginRegistry; @@ -52,9 +54,11 @@ public class HiveTestDataGenerator { private final String dbDir; private final String whDir; + private final BaseDirTestWatcher dirTestWatcher; private final Map<String, String> config; - public static synchronized HiveTestDataGenerator getInstance(File baseDir) throws Exception { + public static synchronized HiveTestDataGenerator getInstance(BaseDirTestWatcher dirTestWatcher) throws Exception { + File baseDir = dirTestWatcher.getRootDir(); if (instance == null || !HiveTestDataGenerator.baseDir.equals(baseDir)) { HiveTestDataGenerator.baseDir = baseDir; @@ -64,19 +68,20 @@ public class HiveTestDataGenerator { final String dbDir = dbDirFile.getAbsolutePath(); final String whDir = whDirFile.getAbsolutePath(); - instance = new HiveTestDataGenerator(dbDir, whDir); + instance = new HiveTestDataGenerator(dbDir, whDir, dirTestWatcher); instance.generateTestData(); } return instance; } - private HiveTestDataGenerator(final String dbDir, final String whDir) { + private HiveTestDataGenerator(final String dbDir, final String whDir, final BaseDirTestWatcher dirTestWatcher) { this.dbDir = dbDir; this.whDir = whDir; + this.dirTestWatcher = dirTestWatcher; config = Maps.newHashMap(); - config.put("hive.metastore.uris", ""); + config.put(ConfVars.METASTOREURIS.toString(), ""); config.put("javax.jdo.option.ConnectionURL", String.format("jdbc:derby:;databaseName=%s;create=true", dbDir)); config.put("hive.metastore.warehouse.dir", whDir); config.put(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS); @@ -128,7 +133,7 @@ public class HiveTestDataGenerator { try { Files.setPosixFilePermissions(dir.toPath(), perms); } catch (IOException e) { - new RuntimeException(e); + throw new RuntimeException(e); } return dir; @@ -494,22 +499,6 @@ public class HiveTestDataGenerator { executeQuery(hiveDriver, "INSERT INTO TABLE kv_parquet PARTITION(part1) SELECT key, value, key FROM default.kv"); executeQuery(hiveDriver, "ALTER TABLE kv_parquet ADD COLUMNS (newcol string)"); - executeQuery(hiveDriver, - "CREATE TABLE countStar_Parquet (int_field INT) STORED AS parquet"); - - final int numOfRows = 200; - final StringBuffer sb = new StringBuffer(); - sb.append("VALUES "); - for(int i = 0; i < 
numOfRows; ++i) { - if(i != 0) { - sb.append(","); - } - sb.append("(").append(i).append(")"); - } - - executeQuery(hiveDriver, "INSERT INTO TABLE countStar_Parquet \n" + - sb.toString()); - // Create a StorageHandler based table (DRILL-3739) executeQuery(hiveDriver, "CREATE TABLE kv_sh(key INT, value STRING) STORED BY " + "'org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler'"); @@ -551,9 +540,60 @@ public class HiveTestDataGenerator { Resources.getResource("simple.json") + "' into table default.simple_json"; executeQuery(hiveDriver, loadData); + createTestDataForDrillNativeParquetReaderTests(hiveDriver); + ss.close(); } + private void createTestDataForDrillNativeParquetReaderTests(Driver hiveDriver) { + // Hive managed table that has data qualified for Drill native filter push down + executeQuery(hiveDriver, "create table kv_native(key int, sub_key int) stored as parquet"); + // each insert is created in separate file + executeQuery(hiveDriver, "insert into table kv_native values (1, 1), (1, 2)"); + executeQuery(hiveDriver, "insert into table kv_native values (1, 3), (1, 4)"); + executeQuery(hiveDriver, "insert into table kv_native values (2, 5), (2, 6)"); + executeQuery(hiveDriver, "insert into table kv_native values (null, 9), (null, 10)"); + + // Hive external table which has three partitions + + // copy external table with data from test resources + dirTestWatcher.copyResourceToRoot(Paths.get("external")); + + File external = new File (baseDir, "external"); + String tableLocation = new File(external, "kv_native_ext").toURI().getPath(); + + executeQuery(hiveDriver, String.format("create external table kv_native_ext(key int) " + + "partitioned by (part_key int) " + + "stored as parquet location '%s'", + tableLocation)); + + /* + DATA: + key, part_key + 1, 1 + 2, 1 + 3, 2 + 4, 2 + */ + + // add partitions + + // partition in the same location as table + String firstPartition = new File(tableLocation, "part_key=1").toURI().getPath(); + executeQuery(hiveDriver, String.format("alter table kv_native_ext add partition (part_key = '1') " + + "location '%s'", firstPartition)); + + // partition in different location with table + String secondPartition = new File(external, "part_key=2").toURI().getPath(); + executeQuery(hiveDriver, String.format("alter table kv_native_ext add partition (part_key = '2') " + + "location '%s'", secondPartition)); + + // add empty partition + String thirdPartition = new File(dirTestWatcher.makeSubDir(Paths.get("empty_part")), "part_key=3").toURI().getPath(); + executeQuery(hiveDriver, String.format("alter table kv_native_ext add partition (part_key = '3') " + + "location '%s'", thirdPartition)); + } + private File getTempFile() throws Exception { return java.nio.file.Files.createTempFile("drill-hive-test", ".txt").toFile(); } http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/contrib/storage-hive/core/src/test/resources/external/kv_native_ext/part_key=1/kv_1.parquet ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/test/resources/external/kv_native_ext/part_key=1/kv_1.parquet b/contrib/storage-hive/core/src/test/resources/external/kv_native_ext/part_key=1/kv_1.parquet new file mode 100755 index 0000000..f641402 Binary files /dev/null and b/contrib/storage-hive/core/src/test/resources/external/kv_native_ext/part_key=1/kv_1.parquet differ http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/contrib/storage-hive/core/src/test/resources/external/part_key=2/kv_2.parquet 
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/resources/external/part_key=2/kv_2.parquet b/contrib/storage-hive/core/src/test/resources/external/part_key=2/kv_2.parquet
new file mode 100755
index 0000000..c34c267
Binary files /dev/null and b/contrib/storage-hive/core/src/test/resources/external/part_key=2/kv_2.parquet differ
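These two parquet resources back the kv_native_ext external table created in HiveTestDataGenerator above: one partition directory sits under the table location, one sits outside it, and a third partition is registered over an empty directory, so the native reader is exercised against all three layouts. A condensed sketch of that registration sequence, assuming a hypothetical executeQuery(String) helper like the one the generator uses internally:

// Sketch of how kv_native_ext's partitions are registered; executeQuery(...)
// is a hypothetical stand-in for HiveTestDataGenerator's Hive driver helper.
abstract class ExternalPartitionSetupSketch {
  abstract void executeQuery(String sql);

  void registerExternalPartitions(String tableLocation, String externalLocation, String emptyLocation) {
    executeQuery(String.format("create external table kv_native_ext(key int) "
        + "partitioned by (part_key int) stored as parquet location '%s'", tableLocation));
    // partition in the same location as the table
    executeQuery(String.format("alter table kv_native_ext add partition (part_key = '1') "
        + "location '%s/part_key=1'", tableLocation));
    // partition in a different location than the table
    executeQuery(String.format("alter table kv_native_ext add partition (part_key = '2') "
        + "location '%s/part_key=2'", externalLocation));
    // empty partition: the directory exists but holds no data files
    executeQuery(String.format("alter table kv_native_ext add partition (part_key = '3') "
        + "location '%s/part_key=3'", emptyLocation));
  }
}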
Can only be called once per OperatorContext"); - fs = new DrillFileSystem(conf, getStats()); + Preconditions.checkState(allowCreatingFileSystem, "Only one tracking file system is allowed per Operator Context and it is already created."); + Preconditions.checkState(fileSystems.isEmpty(), "Non-tracking file system(-s) is(are) already created."); + DrillFileSystem fs = new DrillFileSystem(conf, getStats()); + fileSystems.add(fs); + allowCreatingFileSystem = false; return fs; } /** * Creates a DrillFileSystem that does not automatically track operator stats. + * Multiple non-tracking file system are allowed. */ @Override public DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException { - Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext"); - fs = new DrillFileSystem(conf, null); + Preconditions.checkState(allowCreatingFileSystem, "Only one tracking file system is allowed per Operator Context and it is already created."); + DrillFileSystem fs = new DrillFileSystem(conf, null); + fileSystems.add(fs); return fs; } + } http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java index b2ddf68..e89a1f9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java @@ -24,12 +24,16 @@ import java.util.List; import com.google.common.collect.Lists; +import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; +import org.apache.drill.exec.ops.UdfUtilities; import org.apache.drill.exec.physical.EndpointAffinity; import org.apache.drill.exec.planner.fragment.DistributionAffinity; import org.apache.drill.exec.planner.physical.PlannerSettings; import com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.drill.exec.server.options.OptionManager; public abstract class AbstractGroupScan extends AbstractBase implements GroupScan { @@ -164,4 +168,14 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca public DistributionAffinity getDistributionAffinity() { return DistributionAffinity.SOFT; } + + @Override + public LogicalExpression getFilter() { + return null; + } + + @Override + public GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities, FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) { + return null; + } } http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java index fc63c77..33ab13d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java @@ -21,13 +21,17 @@ import 
http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index fc63c77..33ab13d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -21,13 +21,17 @@
 import java.util.Collection;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.ops.UdfUtilities;
 import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.google.common.collect.ImmutableList;
+import org.apache.drill.exec.server.options.OptionManager;
 
 /**
  * A GroupScan operator represents all data which will be scanned by a given physical
@@ -134,4 +138,10 @@
   */
  Collection<String> getFiles();
 
+  @JsonIgnore
+  LogicalExpression getFilter();
+
+  GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities,
+      FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager);
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java
index 4404a98..f2974e1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.physical.base;
 
 public class ScanStats {
-//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanStats.class);
 
   public static final ScanStats TRIVIAL_TABLE = new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, 20, 1, 1);
 
@@ -30,7 +29,6 @@ public class ScanStats {
   private final GroupScanProperty property;
 
   public ScanStats(GroupScanProperty property, long recordCount, float cpuCost, float diskCost) {
-    super();
     this.recordCount = recordCount;
     this.cpuCost = cpuCost;
     this.diskCost = diskCost;
@@ -49,6 +47,11 @@ public class ScanStats {
     return diskCost;
   }
 
+  @Override
+  public String toString() {
+    return "ScanStats{" + "recordCount=" + recordCount + ", cpuCost=" + cpuCost + ", diskCost=" + diskCost + ", property=" + property + '}';
+  }
+
   /**
    * Return if GroupScan knows the exact row count in the result of getSize() call.
    * By default, groupscan does not know the exact row count, before it scans every row.
@@ -60,7 +63,7 @@ public class ScanStats { - public static enum GroupScanProperty { + public enum GroupScanProperty { NO_EXACT_ROW_COUNT(false, false), EXACT_ROW_COUNT(true, true); http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java index a183e51..cbc530b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java @@ -21,7 +21,6 @@ import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.util.BitSets; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; -import org.apache.drill.exec.physical.base.FileGroupScan; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.planner.logical.DrillRel; import org.apache.drill.exec.planner.logical.DrillScanRel; @@ -29,31 +28,57 @@ import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.store.dfs.FileSelection; import org.apache.drill.exec.store.dfs.FormatSelection; import org.apache.drill.exec.store.dfs.MetadataContext; -import org.apache.drill.exec.store.parquet.ParquetGroupScan; +import org.apache.drill.exec.store.parquet.AbstractParquetGroupScan; +import org.apache.drill.exec.store.parquet.ParquetReaderUtility; +import org.apache.drill.exec.util.DecimalUtility; +import org.apache.drill.exec.vector.NullableBigIntVector; +import org.apache.drill.exec.vector.NullableBitVector; +import org.apache.drill.exec.vector.NullableDateVector; +import org.apache.drill.exec.vector.NullableDecimal18Vector; +import org.apache.drill.exec.vector.NullableFloat4Vector; +import org.apache.drill.exec.vector.NullableFloat8Vector; +import org.apache.drill.exec.vector.NullableIntVector; +import org.apache.drill.exec.vector.NullableIntervalVector; +import org.apache.drill.exec.vector.NullableSmallIntVector; +import org.apache.drill.exec.vector.NullableTimeStampVector; +import org.apache.drill.exec.vector.NullableTimeVector; +import org.apache.drill.exec.vector.NullableTinyIntVector; +import org.apache.drill.exec.vector.NullableUInt1Vector; +import org.apache.drill.exec.vector.NullableUInt2Vector; +import org.apache.drill.exec.vector.NullableUInt4Vector; +import org.apache.drill.exec.vector.NullableVarBinaryVector; +import org.apache.drill.exec.vector.NullableVarCharVector; import org.apache.drill.exec.vector.ValueVector; import com.google.common.collect.Lists; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.io.api.Binary; +import org.joda.time.DateTimeConstants; import java.io.IOException; +import java.util.ArrayList; import java.util.BitSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; - /** * PartitionDescriptor that describes partitions based on column names instead of directory structure */ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor { - private final List<SchemaPath> partitionColumns; + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetPartitionDescriptor.class); + private final DrillScanRel scanRel; + private 
final AbstractParquetGroupScan groupScan; + private final List<SchemaPath> partitionColumns; public ParquetPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) { - ParquetGroupScan scan = (ParquetGroupScan) scanRel.getGroupScan(); - this.partitionColumns = scan.getPartitionColumns(); this.scanRel = scanRel; + assert scanRel.getGroupScan() instanceof AbstractParquetGroupScan; + this.groupScan = (AbstractParquetGroupScan) scanRel.getGroupScan(); + this.partitionColumns = groupScan.getPartitionColumns(); } @Override @@ -81,15 +106,6 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor { return partitionColumns.size(); } - private GroupScan createNewGroupScan(List<String> newFiles, String cacheFileRoot, - boolean wasAllPartitionsPruned, MetadataContext metaContext) throws IOException { - final FileSelection newSelection = FileSelection.create(null, newFiles, getBaseTableLocation(), - cacheFileRoot, wasAllPartitionsPruned); - newSelection.setMetaContext(metaContext); - final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newSelection); - return newScan; - } - @Override public void populatePartitionVectors(ValueVector[] vectors, List<PartitionLocation> partitions, BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap) { @@ -97,8 +113,7 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor { for (PartitionLocation partitionLocation: partitions) { for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) { SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex)); - ((ParquetGroupScan) scanRel.getGroupScan()).populatePruningVector(vectors[partitionColumnIndex], record, column, - partitionLocation.getEntirePartitionLocation()); + populatePruningVector(vectors[partitionColumnIndex], record, column, partitionLocation.getEntirePartitionLocation()); } record++; } @@ -114,7 +129,7 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor { @Override public TypeProtos.MajorType getVectorType(SchemaPath column, PlannerSettings plannerSettings) { - return ((ParquetGroupScan) scanRel.getGroupScan()).getTypeForColumn(column); + return groupScan.getTypeForColumn(column); } @Override @@ -124,26 +139,22 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor { } @Override - protected void createPartitionSublists() { - Set<String> fileLocations = ((ParquetGroupScan) scanRel.getGroupScan()).getFileSet(); - List<PartitionLocation> locations = new LinkedList<>(); - for (String file: fileLocations) { - locations.add(new ParquetPartitionLocation(file)); - } - locationSuperList = Lists.partition(locations, PartitionDescriptor.PARTITION_BATCH_SIZE); - sublistsCreated = true; - } - - @Override - public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, String cacheFileRoot, - boolean wasAllPartitionsPruned, MetadataContext metaContext) throws Exception { - List<String> newFiles = Lists.newArrayList(); + public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, + String cacheFileRoot, + boolean wasAllPartitionsPruned, + MetadataContext metaContext) throws Exception { + List<String> newFiles = new ArrayList<>(); for (final PartitionLocation location : newPartitionLocation) { newFiles.add(location.getEntirePartitionLocation()); } final GroupScan newGroupScan = createNewGroupScan(newFiles, cacheFileRoot, wasAllPartitionsPruned, metaContext); + if (newGroupScan == null) { + logger.warn("Unable to create 
new group scan, returning original table scan."); + return scanRel; + } + return new DrillScanRel(scanRel.getCluster(), scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL), scanRel.getTable(), @@ -154,9 +165,261 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor { } @Override - public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, - boolean wasAllPartitionsPruned) throws Exception { + public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, boolean wasAllPartitionsPruned) throws Exception { return createTableScan(newPartitionLocation, null, wasAllPartitionsPruned, null); } + @Override + protected void createPartitionSublists() { + Set<String> fileLocations = groupScan.getFileSet(); + List<PartitionLocation> locations = new LinkedList<>(); + for (String file : fileLocations) { + locations.add(new ParquetPartitionLocation(file)); + } + locationSuperList = Lists.partition(locations, PartitionDescriptor.PARTITION_BATCH_SIZE); + sublistsCreated = true; + } + + private GroupScan createNewGroupScan(List<String> newFiles, + String cacheFileRoot, + boolean wasAllPartitionsPruned, + MetadataContext metaContext) throws IOException { + + FileSelection newSelection = FileSelection.create(null, newFiles, getBaseTableLocation(), cacheFileRoot, wasAllPartitionsPruned); + if (newSelection == null) { + return null; + } + newSelection.setMetaContext(metaContext); + return groupScan.clone(newSelection); + } + + private void populatePruningVector(ValueVector v, int index, SchemaPath column, String file) { + String path = Path.getPathWithoutSchemeAndAuthority(new Path(file)).toString(); + TypeProtos.MajorType majorType = getVectorType(column, null); + TypeProtos.MinorType type = majorType.getMinorType(); + switch (type) { + case BIT: { + NullableBitVector bitVector = (NullableBitVector) v; + Boolean value = groupScan.getPartitionValue(path, column, Boolean.class); + if (value == null) { + bitVector.getMutator().setNull(index); + } else { + bitVector.getMutator().setSafe(index, value ? 
1 : 0); + } + return; + } + case INT: { + NullableIntVector intVector = (NullableIntVector) v; + Integer value = groupScan.getPartitionValue(path, column, Integer.class); + if (value == null) { + intVector.getMutator().setNull(index); + } else { + intVector.getMutator().setSafe(index, value); + } + return; + } + case SMALLINT: { + NullableSmallIntVector smallIntVector = (NullableSmallIntVector) v; + Integer value = groupScan.getPartitionValue(path, column, Integer.class); + if (value == null) { + smallIntVector.getMutator().setNull(index); + } else { + smallIntVector.getMutator().setSafe(index, value.shortValue()); + } + return; + } + case TINYINT: { + NullableTinyIntVector tinyIntVector = (NullableTinyIntVector) v; + Integer value = groupScan.getPartitionValue(path, column, Integer.class); + if (value == null) { + tinyIntVector.getMutator().setNull(index); + } else { + tinyIntVector.getMutator().setSafe(index, value.byteValue()); + } + return; + } + case UINT1: { + NullableUInt1Vector intVector = (NullableUInt1Vector) v; + Integer value = groupScan.getPartitionValue(path, column, Integer.class); + if (value == null) { + intVector.getMutator().setNull(index); + } else { + intVector.getMutator().setSafe(index, value.byteValue()); + } + return; + } + case UINT2: { + NullableUInt2Vector intVector = (NullableUInt2Vector) v; + Integer value = groupScan.getPartitionValue(path, column, Integer.class); + if (value == null) { + intVector.getMutator().setNull(index); + } else { + intVector.getMutator().setSafe(index, (char) value.shortValue()); + } + return; + } + case UINT4: { + NullableUInt4Vector intVector = (NullableUInt4Vector) v; + Integer value = groupScan.getPartitionValue(path, column, Integer.class); + if (value == null) { + intVector.getMutator().setNull(index); + } else { + intVector.getMutator().setSafe(index, value); + } + return; + } + case BIGINT: { + NullableBigIntVector bigIntVector = (NullableBigIntVector) v; + Long value = groupScan.getPartitionValue(path, column, Long.class); + if (value == null) { + bigIntVector.getMutator().setNull(index); + } else { + bigIntVector.getMutator().setSafe(index, value); + } + return; + } + case FLOAT4: { + NullableFloat4Vector float4Vector = (NullableFloat4Vector) v; + Float value = groupScan.getPartitionValue(path, column, Float.class); + if (value == null) { + float4Vector.getMutator().setNull(index); + } else { + float4Vector.getMutator().setSafe(index, value); + } + return; + } + case FLOAT8: { + NullableFloat8Vector float8Vector = (NullableFloat8Vector) v; + Double value = groupScan.getPartitionValue(path, column, Double.class); + if (value == null) { + float8Vector.getMutator().setNull(index); + } else { + float8Vector.getMutator().setSafe(index, value); + } + return; + } + case VARBINARY: { + NullableVarBinaryVector varBinaryVector = (NullableVarBinaryVector) v; + Object s = groupScan.getPartitionValue(path, column, Object.class); + byte[] bytes; + if (s == null) { + varBinaryVector.getMutator().setNull(index); + return; + } else { + bytes = getBytes(type, s); + } + varBinaryVector.getMutator().setSafe(index, bytes, 0, bytes.length); + return; + } + case DECIMAL18: { + NullableDecimal18Vector decimalVector = (NullableDecimal18Vector) v; + Object s = groupScan.getPartitionValue(path, column, Object.class); + byte[] bytes; + if (s == null) { + decimalVector.getMutator().setNull(index); + return; + } else if (s instanceof Integer) { + long value = DecimalUtility.getBigDecimalFromPrimitiveTypes( + (Integer) s, + majorType.getScale(), + 
majorType.getPrecision()).longValue(); + decimalVector.getMutator().setSafe(index, value); + return; + } else if (s instanceof Long) { + long value = DecimalUtility.getBigDecimalFromPrimitiveTypes( + (Long) s, + majorType.getScale(), + majorType.getPrecision()).longValue(); + decimalVector.getMutator().setSafe(index, value); + return; + } else { + bytes = getBytes(type, s); + } + long value = DecimalUtility.getBigDecimalFromByteArray(bytes, 0, bytes.length, majorType.getScale()).longValue(); + decimalVector.getMutator().setSafe(index, value); + return; + } + case DATE: { + NullableDateVector dateVector = (NullableDateVector) v; + Integer value = groupScan.getPartitionValue(path, column, Integer.class); + if (value == null) { + dateVector.getMutator().setNull(index); + } else { + dateVector.getMutator().setSafe(index, value * (long) DateTimeConstants.MILLIS_PER_DAY); + } + return; + } + case TIME: { + NullableTimeVector timeVector = (NullableTimeVector) v; + Integer value = groupScan.getPartitionValue(path, column, Integer.class); + if (value == null) { + timeVector.getMutator().setNull(index); + } else { + timeVector.getMutator().setSafe(index, value); + } + return; + } + case TIMESTAMP: { + NullableTimeStampVector timeStampVector = (NullableTimeStampVector) v; + Long value = groupScan.getPartitionValue(path, column, Long.class); + if (value == null) { + timeStampVector.getMutator().setNull(index); + } else { + timeStampVector.getMutator().setSafe(index, value); + } + return; + } + case VARCHAR: { + NullableVarCharVector varCharVector = (NullableVarCharVector) v; + Object s = groupScan.getPartitionValue(path, column, Object.class); + byte[] bytes; + if (s == null) { + varCharVector.getMutator().setNull(index); + return; + } else { + bytes = getBytes(type, s); + } + varCharVector.getMutator().setSafe(index, bytes, 0, bytes.length); + return; + } + case INTERVAL: { + NullableIntervalVector intervalVector = (NullableIntervalVector) v; + Object s = groupScan.getPartitionValue(path, column, Object.class); + byte[] bytes; + if (s == null) { + intervalVector.getMutator().setNull(index); + return; + } else { + bytes = getBytes(type, s); + } + intervalVector.getMutator().setSafe(index, 1, + ParquetReaderUtility.getIntFromLEBytes(bytes, 0), + ParquetReaderUtility.getIntFromLEBytes(bytes, 4), + ParquetReaderUtility.getIntFromLEBytes(bytes, 8)); + return; + } + default: + throw new UnsupportedOperationException("Unsupported type: " + type); + } + } + + /** + * Returns the sequence of bytes received from {@code Object source}. 
+ * + * @param type the column type + * @param source the source of the bytes sequence + * @return bytes sequence obtained from {@code Object source} + */ + private byte[] getBytes(TypeProtos.MinorType type, Object source) { + byte[] bytes; + if (source instanceof Binary) { + bytes = ((Binary) source).getBytes(); + } else if (source instanceof byte[]) { + bytes = (byte[]) source; + } else { + throw new UnsupportedOperationException("Unable to create column data for type: " + type); + } + return bytes; + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java index f71c281..96cfa8a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java @@ -365,6 +365,7 @@ public enum PlannerPhase { // Ideally this should be done in logical planning, before join order planning is done. // Before we can make such change, we have to figure out how to adjust the selectivity // estimation of filter operator, after filter is pushed down to scan. + ParquetPushDownFilter.getFilterOnProject(optimizerRulesContext), ParquetPushDownFilter.getFilterOnScan(optimizerRulesContext) ) @@ -426,6 +427,9 @@ public enum PlannerPhase { ruleList.add(ValuesPrule.INSTANCE); ruleList.add(DirectScanPrule.INSTANCE); + ruleList.add(DrillPushLimitToScanRule.LIMIT_ON_PROJECT); + ruleList.add(DrillPushLimitToScanRule.LIMIT_ON_SCAN); + if (ps.isHashAggEnabled()) { ruleList.add(HashAggPrule.INSTANCE); } http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterItemStarReWriterRule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterItemStarReWriterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterItemStarReWriterRule.java index 27f8c49..b7cdcfc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterItemStarReWriterRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterItemStarReWriterRule.java @@ -32,7 +32,7 @@ import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.planner.types.RelDataTypeDrillImpl; import org.apache.drill.exec.planner.types.RelDataTypeHolder; -import org.apache.drill.exec.store.parquet.ParquetGroupScan; +import org.apache.drill.exec.store.parquet.AbstractParquetGroupScan; import java.util.ArrayList; import java.util.Collection; @@ -73,7 +73,7 @@ public class DrillFilterItemStarReWriterRule { @Override public boolean matches(RelOptRuleCall call) { DrillScanRel scan = call.rel(1); - return scan.getGroupScan() instanceof ParquetGroupScan && super.matches(call); + return scan.getGroupScan() instanceof AbstractParquetGroupScan && super.matches(call); } @Override @@ -129,7 +129,7 @@ public class DrillFilterItemStarReWriterRule { @Override public boolean matches(RelOptRuleCall call) { DrillScanRel scan = call.rel(1); - return scan.getGroupScan() instanceof ParquetGroupScan && super.matches(call); + return 
scan.getGroupScan() instanceof AbstractParquetGroupScan && super.matches(call); } @Override @@ -149,7 +149,7 @@ public class DrillFilterItemStarReWriterRule { @Override public boolean matches(RelOptRuleCall call) { DrillScanRel scan = call.rel(2); - return scan.getGroupScan() instanceof ParquetGroupScan && super.matches(call); + return scan.getGroupScan() instanceof AbstractParquetGroupScan && super.matches(call); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java index 3153b9d..6e44383 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java @@ -19,10 +19,8 @@ package org.apache.drill.exec.planner.logical.partition; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.TableScan; import org.apache.drill.exec.ops.OptimizerRulesContext; -import org.apache.drill.exec.physical.base.FileGroupScan; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.planner.ParquetPartitionDescriptor; import org.apache.drill.exec.planner.PartitionDescriptor; @@ -32,11 +30,11 @@ import org.apache.drill.exec.planner.logical.DrillScanRel; import org.apache.drill.exec.planner.logical.RelOptHelper; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.planner.physical.PrelUtil; -import org.apache.drill.exec.store.parquet.ParquetGroupScan; +import org.apache.drill.exec.store.parquet.AbstractParquetGroupScan; public class ParquetPruneScanRule { - public static final RelOptRule getFilterOnProjectParquet(OptimizerRulesContext optimizerRulesContext) { + public static RelOptRule getFilterOnProjectParquet(OptimizerRulesContext optimizerRulesContext) { return new PruneScanRule( RelOptHelper.some(DrillFilterRel.class, RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))), "PruneScanRule:Filter_On_Project_Parquet", @@ -53,9 +51,9 @@ public class ParquetPruneScanRule { GroupScan groupScan = scan.getGroupScan(); // this rule is applicable only for parquet based partition pruning if (PrelUtil.getPlannerSettings(scan.getCluster().getPlanner()).isHepPartitionPruningEnabled()) { - return groupScan instanceof ParquetGroupScan && groupScan.supportsPartitionFilterPushdown() && !scan.partitionFilterPushdown(); + return groupScan instanceof AbstractParquetGroupScan && groupScan.supportsPartitionFilterPushdown() && !scan.partitionFilterPushdown(); } else { - return groupScan instanceof ParquetGroupScan && groupScan.supportsPartitionFilterPushdown(); + return groupScan instanceof AbstractParquetGroupScan && groupScan.supportsPartitionFilterPushdown(); } } @@ -69,7 +67,7 @@ public class ParquetPruneScanRule { }; } - public static final RelOptRule getFilterOnScanParquet(OptimizerRulesContext optimizerRulesContext) { + public static RelOptRule getFilterOnScanParquet(OptimizerRulesContext optimizerRulesContext) { return new PruneScanRule( 
        RelOptHelper.some(DrillFilterRel.class, RelOptHelper.any(DrillScanRel.class)),
        "PruneScanRule:Filter_On_Scan_Parquet", optimizerRulesContext) {

@@ -85,9 +83,9 @@
         GroupScan groupScan = scan.getGroupScan();
         // this rule is applicable only for parquet based partition pruning
         if (PrelUtil.getPlannerSettings(scan.getCluster().getPlanner()).isHepPartitionPruningEnabled()) {
-          return groupScan instanceof ParquetGroupScan && groupScan.supportsPartitionFilterPushdown() && !scan.partitionFilterPushdown();
+          return groupScan instanceof AbstractParquetGroupScan && groupScan.supportsPartitionFilterPushdown() && !scan.partitionFilterPushdown();
         } else {
-          return groupScan instanceof ParquetGroupScan && groupScan.supportsPartitionFilterPushdown();
+          return groupScan instanceof AbstractParquetGroupScan && groupScan.supportsPartitionFilterPushdown();
         }
       }
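Both pruning rules above, and PruneScanRule plus TimedRunnable and FileSelection further down, repeat one refactoring: stopwatches and log calls that used to run unconditionally at info level are now created only when debug logging is enabled, with a null stopwatch meaning "not measuring". A condensed, self-contained sketch of that guard, assuming a Guava Stopwatch and an SLF4J logger (TimedPhaseExample is an illustrative name):

import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class TimedPhaseExample {
  private static final Logger logger = LoggerFactory.getLogger(TimedPhaseExample.class);

  void runPhase(Runnable phase) {
    // Allocate and start the stopwatch only when the result can actually be logged.
    Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
    phase.run();
    if (watch != null) {
      logger.debug("Phase took {} ms", watch.elapsed(TimeUnit.MILLISECONDS));
      watch.stop();
    }
  }
}

This avoids both the Stopwatch allocation and the timing system calls on the hot path when debug logging is off, at the cost of a null check around every measurement.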
http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index 5f679a4..7fa1794 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -24,7 +24,6 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Stopwatch;
-
 import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
 import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Project;
@@ -147,8 +146,8 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
   protected void doOnMatch(RelOptRuleCall call, Filter filterRel, Project projectRel, TableScan scanRel) {
     final String pruningClassName = getClass().getName();
-    logger.info("Beginning partition pruning, pruning class: {}", pruningClassName);
-    Stopwatch totalPruningTime = Stopwatch.createStarted();
+    logger.debug("Beginning partition pruning, pruning class: {}", pruningClassName);
+    Stopwatch totalPruningTime = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
 
     final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
     PartitionDescriptor descriptor = getPartitionDescriptor(settings, scanRel);
@@ -191,30 +190,33 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
     }
 
     if (partitionColumnBitSet.isEmpty()) {
-      logger.info("No partition columns are projected from the scan..continue. " +
-          "Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+      if (totalPruningTime != null) {
+        logger.debug("No partition columns are projected from the scan..continue. Total pruning elapsed time: {} ms",
+            totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+      }
       setPruneStatus(metaContext, PruneStatus.NOT_PRUNED);
       return;
     }
 
     // stop watch to track how long we spend in different phases of pruning
-    Stopwatch miscTimer = Stopwatch.createUnstarted();
-
-    // track how long we spend building the filter tree
-    miscTimer.start();
+    // first track how long we spend building the filter tree
+    Stopwatch miscTimer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
 
     FindPartitionConditions c = new FindPartitionConditions(columnBitset, filterRel.getCluster().getRexBuilder());
     c.analyze(condition);
     RexNode pruneCondition = c.getFinalCondition();
     BitSet referencedDirsBitSet = c.getReferencedDirs();
 
-    logger.info("Total elapsed time to build and analyze filter tree: {} ms",
-        miscTimer.elapsed(TimeUnit.MILLISECONDS));
-    miscTimer.reset();
+    if (miscTimer != null) {
+      logger.debug("Total elapsed time to build and analyze filter tree: {} ms", miscTimer.elapsed(TimeUnit.MILLISECONDS));
+      miscTimer.reset();
+    }
 
     if (pruneCondition == null) {
-      logger.info("No conditions were found eligible for partition pruning." +
-          "Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+      if (totalPruningTime != null) {
+        logger.debug("No conditions were found eligible for partition pruning. Total pruning elapsed time: {} ms",
+            totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+      }
       setPruneStatus(metaContext, PruneStatus.NOT_PRUNED);
       return;
     }
@@ -251,15 +253,19 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
         container.add(v);
       }
 
-      // track how long we spend populating partition column vectors
-      miscTimer.start();
+      if (miscTimer != null) {
+        // track how long we spend populating partition column vectors
+        miscTimer.start();
+      }
 
       // populate partition vectors.
       descriptor.populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap);
 
-      logger.info("Elapsed time to populate partitioning column vectors: {} ms within batchIndex: {}",
-          miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex);
-      miscTimer.reset();
+      if (miscTimer != null) {
+        logger.debug("Elapsed time to populate partitioning column vectors: {} ms within batchIndex: {}",
+            miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex);
+        miscTimer.reset();
+      }
 
      // materialize the expression; only need to do this once
      if (batchIndex == 0) {
@@ -267,8 +273,9 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
        if (materializedExpr == null) {
          // continue without partition pruning; no need to log anything here since
          // materializePruneExpr logs it already
-          logger.info("Total pruning elapsed time: {} ms",
-              totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+          if (totalPruningTime != null) {
+            logger.debug("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+          }
          setPruneStatus(metaContext, PruneStatus.NOT_PRUNED);
          return;
        }
@@ -276,14 +283,18 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
 
      output.allocateNew(partitions.size());
 
-      // start the timer to evaluate how long we spend in the interpreter evaluation
-      miscTimer.start();
+      if (miscTimer != null) {
+        // start the timer to evaluate how long we spend in the interpreter evaluation
+        miscTimer.start();
+      }
 
      InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, container, output, materializedExpr);
 
-      logger.info("Elapsed time in interpreter evaluation: {} ms within batchIndex: {} with # of partitions : {}",
-          miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex, partitions.size());
-      miscTimer.reset();
+      if (miscTimer != null) {
+        logger.debug("Elapsed time in interpreter evaluation: {} ms within batchIndex: {} with # of partitions : {}",
+            miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex, partitions.size());
+        miscTimer.reset();
+      }
 
      int recordCount = 0;
      int qualifiedCount = 0;
@@ -338,7 +349,9 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
batchIndex++; } catch (Exception e) { logger.warn("Exception while trying to prune partition.", e); - logger.info("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS)); + if (totalPruningTime != null) { + logger.debug("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS)); + } setPruneStatus(metaContext, PruneStatus.NOT_PRUNED); return; // continue without partition pruning @@ -352,7 +365,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { try { if (newPartitions.size() == numTotal) { - logger.info("No partitions were eligible for pruning"); + logger.debug("No partitions were eligible for pruning"); return; } @@ -371,7 +384,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { // directories first and the non-composite partition location will still return // directories, not files. So, additional processing is done depending on this flag wasAllPartitionsPruned = true; - logger.info("All {} partitions were pruned; added back a single partition to allow creating a schema", numTotal); + logger.debug("All {} partitions were pruned; added back a single partition to allow creating a schema", numTotal); // set the cacheFileRoot appropriately if (firstLocation.isCompositePartition()) { @@ -379,7 +392,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { } } - logger.info("Pruned {} partitions down to {}", numTotal, newPartitions.size()); + logger.debug("Pruned {} partitions down to {}", numTotal, newPartitions.size()); List<RexNode> conjuncts = RelOptUtil.conjunctions(condition); List<RexNode> pruneConjuncts = RelOptUtil.conjunctions(pruneCondition); @@ -439,7 +452,9 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { } catch (Exception e) { logger.warn("Exception while using the pruned partitions.", e); } finally { - logger.info("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS)); + if (totalPruningTime != null) { + logger.debug("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS)); + } } } http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java index 6bfceb4..f463e6d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java @@ -37,7 +37,7 @@ import org.apache.drill.exec.store.dfs.FileSelection; import org.apache.drill.exec.store.dfs.FileSystemPlugin; import org.apache.drill.exec.store.dfs.FormatSelection; import org.apache.drill.exec.store.dfs.NamedFormatPluginConfig; -import org.apache.drill.exec.store.parquet.Metadata; +import org.apache.drill.exec.store.parquet.metadata.Metadata; import org.apache.drill.exec.store.parquet.ParquetFormatConfig; import org.apache.drill.exec.work.foreman.ForemanSetupException; import org.apache.hadoop.fs.Path; http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java 
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
index fb86783..5ab67f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
@@ -17,24 +17,25 @@
  */
 package org.apache.drill.exec.store;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.commons.lang3.ArrayUtils;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.easy.FileWork;
 import org.apache.drill.exec.util.Utilities;
 import org.apache.hadoop.fs.Path;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.io.Files;
 
 public class ColumnExplorer {
@@ -156,43 +157,74 @@ public class ColumnExplorer {
   }
 
   /**
-   * Compares selection root and actual file path to determine partition columns values.
-   * Adds implicit file columns according to columns list.
+   * Creates a map of implicit columns where the key is the column name and the value is the column's actual value.
+   * The map contains partition and implicit file columns (if requested).
+   * Partition column names are formed from the partition designator and the value index.
    *
-   * @return map with columns names as keys and their values
+   * @param filePath file path, used to populate file implicit columns
+   * @param partitionValues list of partition values
+   * @param includeFileImplicitColumns whether file implicit columns should be included in the result
+   * @return implicit columns map
    */
-  public Map<String, String> populateImplicitColumns(FileWork work, String selectionRoot) {
-    return populateImplicitColumns(work.getPath(), selectionRoot);
-  }
+  public Map<String, String> populateImplicitColumns(String filePath,
+                                                     List<String> partitionValues,
+                                                     boolean includeFileImplicitColumns) {
+    Map<String, String> implicitValues = new LinkedHashMap<>();
 
-  /**
-   * Compares selection root and actual file path to determine partition columns values.
-   * Adds implicit file columns according to columns list.
-   *
-   * @return map with columns names as keys and their values
-   */
-  public Map<String, String> populateImplicitColumns(String filePath, String selectionRoot) {
-    Map<String, String> implicitValues = Maps.newLinkedHashMap();
-    if (selectionRoot != null) {
-      String[] r = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)).toString().split("/");
-      Path path = Path.getPathWithoutSchemeAndAuthority(new Path(filePath));
-      String[] p = path.toString().split("/");
-      if (p.length > r.length) {
-        String[] q = ArrayUtils.subarray(p, r.length, p.length - 1);
-        for (int a = 0; a < q.length; a++) {
-          if (isStarQuery || selectedPartitionColumns.contains(a)) {
-            implicitValues.put(partitionDesignator + a, q[a]);
-          }
-        }
+    for (int i = 0; i < partitionValues.size(); i++) {
+      if (isStarQuery || selectedPartitionColumns.contains(i)) {
+        implicitValues.put(partitionDesignator + i, partitionValues.get(i));
       }
-      //add implicit file columns
+    }
+
+    if (includeFileImplicitColumns) {
+      Path path = Path.getPathWithoutSchemeAndAuthority(new Path(filePath));
       for (Map.Entry<String, ImplicitFileColumns> entry : selectedImplicitColumns.entrySet()) {
         implicitValues.put(entry.getKey(), entry.getValue().getValue(path));
       }
     }
+
     return implicitValues;
   }
 
+  /**
+   * Compares root and file path to determine directories
+   * that are present in the file path but absent in root.
+   * Example: root - a/b/c, filePath - a/b/c/d/e/0_0_0.parquet, result - d/e.
+   * The differing directory names are stored in the list in order.
+   *
+   * @param filePath file path
+   * @param root root directory
+   * @return list of directory names
+   */
+  public static List<String> listPartitionValues(String filePath, String root) {
+    if (filePath == null || root == null) {
+      return Collections.emptyList();
+    }
+
+    int rootDepth = new Path(root).depth();
+    Path path = new Path(filePath);
+    int parentDepth = path.getParent().depth();
+
+    int diffCount = parentDepth - rootDepth;
+
+    if (diffCount < 1) {
+      return Collections.emptyList();
+    }
+
+    String[] diffDirectoryNames = new String[diffCount];
+
+    // start filling in the array from the end
+    for (int i = rootDepth; parentDepth > i; i++) {
+      path = path.getParent();
+      // place at the end of the array
+      diffDirectoryNames[parentDepth - i - 1] = path.getName();
+    }
+
+    return Arrays.asList(diffDirectoryNames);
+  }
+
  public boolean isStarQuery() {
    return isStarQuery;
  }
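The new listPartitionValues above replaces the old string-splitting approach: it walks from the file's parent directory up toward the selection root and records each directory name, deepest last. A small standalone sketch of the same walk under the javadoc's own example (simplified names, same semantics):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.Path;

class PartitionValuesSketch {
  // Collect the directory names lying between the root and the file's parent,
  // e.g. partitionValues("a/b/c/d/e/0_0_0.parquet", "a/b/c") returns [d, e].
  static List<String> partitionValues(String filePath, String root) {
    if (filePath == null || root == null) {
      return Collections.emptyList();
    }
    Path path = new Path(filePath);
    int diffCount = path.getParent().depth() - new Path(root).depth();
    if (diffCount < 1) {
      return Collections.emptyList();
    }
    String[] names = new String[diffCount];
    for (int i = diffCount - 1; i >= 0; i--) {
      path = path.getParent();    // step from the file up toward the root
      names[i] = path.getName();  // deepest directory lands at the end
    }
    return Arrays.asList(names);
  }
}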
http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
index fe0cae1..7cce2ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
@@ -20,18 +20,15 @@ package org.apache.drill.exec.store;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
+import com.google.common.base.Stopwatch;
 import org.apache.drill.common.concurrent.ExtendedLatch;
 import org.apache.drill.common.exceptions.UserException;
 import org.slf4j.Logger;
 
-import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 
 /**
@@ -115,7 +112,7 @@ public abstract class TimedRunnable<V> implements Runnable {
   * @throws IOException All exceptions are coerced to IOException since this was built for storage system tasks initially.
   */
  public static <V> List<V> run(final String activity, final Logger logger, final List<TimedRunnable<V>> runnables, int parallelism) throws IOException {
-    Stopwatch watch = Stopwatch.createStarted();
+    Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
    long timedRunnableStart=System.nanoTime();
    if(runnables.size() == 1){
      parallelism = 1;
@@ -186,21 +183,22 @@ public abstract class TimedRunnable<V> implements Runnable {
      }
    }
 
-    if(logger.isInfoEnabled()){
+    if (watch != null) {
      double avg = (sum/1000.0/1000.0)/(count*1.0d);
      double avgStart = (totalStart/1000.0)/(count*1.0d);
-      logger.info(
+      logger.debug(
          String.format("%s: Executed %d out of %d using %d threads. " +
              "Time: %dms total, %fms avg, %dms max.",
              activity, count, runnables.size(), parallelism, watch.elapsed(TimeUnit.MILLISECONDS), avg, max/1000/1000));
-      logger.info(
+      logger.debug(
          String.format("%s: Executed %d out of %d using %d threads. " +
              "Earliest start: %f \u03BCs, Latest start: %f \u03BCs, Average start: %f \u03BCs .",
              activity, count, runnables.size(), parallelism, earliestStart/1000.0, latestStart/1000.0, avgStart));
+      watch.stop();
    }
 
-    if(excep != null) {
+    if (excep != null) {
      throw excep;
    }

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index 7edb327..f5bcced 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -95,7 +95,7 @@ public class FileSelection {
      final String cacheFileRoot, final boolean wasAllPartitionsPruned, final StatusType dirStatus) {
    this.statuses = statuses;
    this.files = files;
-    this.selectionRoot = Preconditions.checkNotNull(selectionRoot);
+    this.selectionRoot = selectionRoot;
    this.dirStatus = dirStatus;
    this.cacheFileRoot = cacheFileRoot;
    this.wasAllPartitionsPruned = wasAllPartitionsPruned;
@@ -121,7 +121,7 @@ public class FileSelection {
  }
 
  public List<FileStatus> getStatuses(final DrillFileSystem fs) throws IOException {
-    Stopwatch timer = Stopwatch.createStarted();
+    Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
 
    if (statuses == null) {
      final List<FileStatus> newStatuses = Lists.newArrayList();
@@ -130,8 +130,11 @@ public class FileSelection {
      }
      statuses = newStatuses;
    }
-    logger.info("FileSelection.getStatuses() took {} ms, numFiles: {}",
-        timer.elapsed(TimeUnit.MILLISECONDS), statuses == null ? 0 : statuses.size());
+    if (timer != null) {
+      logger.debug("FileSelection.getStatuses() took {} ms, numFiles: {}",
+          timer.elapsed(TimeUnit.MILLISECONDS), statuses == null ? 0 : statuses.size());
+      timer.stop();
+    }
 
    return statuses;
  }
@@ -164,7 +167,7 @@ public class FileSelection {
    if (isExpandedFully()) {
      return this;
    }
-    Stopwatch timer = Stopwatch.createStarted();
+    Stopwatch timer = logger.isDebugEnabled() ?
http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index 7edb327..f5bcced 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -95,7 +95,7 @@
       final String cacheFileRoot, final boolean wasAllPartitionsPruned, final StatusType dirStatus) {
     this.statuses = statuses;
     this.files = files;
-    this.selectionRoot = Preconditions.checkNotNull(selectionRoot);
+    this.selectionRoot = selectionRoot;
     this.dirStatus = dirStatus;
     this.cacheFileRoot = cacheFileRoot;
     this.wasAllPartitionsPruned = wasAllPartitionsPruned;
@@ -121,7 +121,7 @@
   }
 
   public List<FileStatus> getStatuses(final DrillFileSystem fs) throws IOException {
-    Stopwatch timer = Stopwatch.createStarted();
+    Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
 
     if (statuses == null) {
       final List<FileStatus> newStatuses = Lists.newArrayList();
@@ -130,8 +130,11 @@
       }
       statuses = newStatuses;
     }
-    logger.info("FileSelection.getStatuses() took {} ms, numFiles: {}",
-        timer.elapsed(TimeUnit.MILLISECONDS), statuses == null ? 0 : statuses.size());
+    if (timer != null) {
+      logger.debug("FileSelection.getStatuses() took {} ms, numFiles: {}",
+          timer.elapsed(TimeUnit.MILLISECONDS), statuses == null ? 0 : statuses.size());
+      timer.stop();
+    }
 
     return statuses;
   }
@@ -164,7 +167,7 @@
     if (isExpandedFully()) {
       return this;
     }
-    Stopwatch timer = Stopwatch.createStarted();
+    Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
     List<FileStatus> statuses = getStatuses(fs);
 
     List<FileStatus> nonDirectories = Lists.newArrayList();
@@ -173,8 +176,10 @@
     }
 
     final FileSelection fileSel = create(nonDirectories, null, selectionRoot);
-    logger.debug("FileSelection.minusDirectories() took {} ms, numFiles: {}",
-        timer.elapsed(TimeUnit.MILLISECONDS), statuses.size());
+    if (timer != null) {
+      logger.debug("FileSelection.minusDirectories() took {} ms, numFiles: {}", timer.elapsed(TimeUnit.MILLISECONDS), statuses.size());
+      timer.stop();
+    }
 
     // fileSel will be null if we query an empty folder
     if (fileSel != null) {
@@ -259,7 +264,7 @@
 
   public static FileSelection create(final DrillFileSystem fs, final String parent, final String path,
       final boolean allowAccessOutsideWorkspace) throws IOException {
-    Stopwatch timer = Stopwatch.createStarted();
+    Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
     boolean hasWildcard = path.contains(WILD_CARD);
 
     final Path combined = new Path(parent, removeLeadingSlash(path));
@@ -271,7 +276,10 @@
       return null;
     }
     final FileSelection fileSel = create(Lists.newArrayList(statuses), null, combined.toUri().getPath());
-    logger.debug("FileSelection.create() took {} ms ", timer.elapsed(TimeUnit.MILLISECONDS));
+    if (timer != null) {
+      logger.debug("FileSelection.create() took {} ms ", timer.elapsed(TimeUnit.MILLISECONDS));
+      timer.stop();
+    }
     if (fileSel == null) {
       return null;
     }
@@ -322,7 +330,7 @@
 
   public static FileSelection createFromDirectories(final List<String> dirPaths, final FileSelection selection,
       final String cacheFileRoot) {
-    Stopwatch timer = Stopwatch.createStarted();
+    Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
     final String root = selection.getSelectionRoot();
     if (Strings.isNullOrEmpty(root)) {
       throw new DrillRuntimeException("Selection root is null or empty" + root);
@@ -338,9 +346,7 @@
         dirs.add(status.getPath().toString());
       }
     } else {
-      for (String s : dirPaths) {
-        dirs.add(s);
-      }
+      dirs.addAll(dirPaths);
     }
 
     final Path rootPath = handleWildCard(root);
@@ -349,7 +355,10 @@
     final Path path = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath());
     FileSelection fileSel = new FileSelection(null, dirs, path.toString(), cacheFileRoot, false);
     fileSel.setHadWildcard(selection.hadWildcard());
-    logger.info("FileSelection.createFromDirectories() took {} ms ", timer.elapsed(TimeUnit.MILLISECONDS));
+    if (timer != null) {
+      logger.debug("FileSelection.createFromDirectories() took {} ms ", timer.elapsed(TimeUnit.MILLISECONDS));
+      timer.stop();
+    }
     return fileSel;
   }
http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java
index e02d841..15107ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java
@@ -22,14 +22,14 @@ import org.apache.drill.exec.store.dfs.easy.FileWork;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class ReadEntryFromHDFS extends ReadEntryWithPath implements FileWork{
+public class ReadEntryFromHDFS extends ReadEntryWithPath implements FileWork {
 
   private long start;
   private long length;
 
   @JsonCreator
   public ReadEntryFromHDFS(@JsonProperty("path") String path,@JsonProperty("start") long start,
       @JsonProperty("length") long length) {
-    this.path = path;
+    super(path);
     this.start = start;
     this.length = length;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 678569f..5af1091 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -147,10 +147,12 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
     List<RecordReader> readers = new LinkedList<>();
     List<Map<String, String>> implicitColumns = Lists.newArrayList();
     Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
-    for(FileWork work : scan.getWorkUnits()){
+    boolean supportsFileImplicitColumns = scan.getSelectionRoot() != null;
+    for (FileWork work : scan.getWorkUnits()){
       RecordReader recordReader = getRecordReader(context, dfs, work, scan.getColumns(), scan.getUserName());
       readers.add(recordReader);
-      Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(work, scan.getSelectionRoot());
+      List<String> partitionValues = ColumnExplorer.listPartitionValues(work.getPath(), scan.getSelectionRoot());
+      Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(work.getPath(), partitionValues, supportsFileImplicitColumns);
       implicitColumns.add(implicitValues);
       if (implicitValues.size() > mapWithMaxColumns.size()) {
         mapWithMaxColumns = implicitValues;
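In EasyFormatPlugin, the caller now derives the partition values itself and tells ColumnExplorer whether file implicit columns can be resolved at all (they cannot when the scan has no selection root). A simplified, self-contained sketch of that division of labor (class name, the "dir" designator, and the "filename" column are illustrative stand-ins, not the commit's exact code):

    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class ImplicitColumnsSketch {

      // Stand-in for the reworked ColumnExplorer.populateImplicitColumns():
      // partition values arrive precomputed, and a flag controls file columns.
      static Map<String, String> populateImplicitColumns(
          String filePath, List<String> partitionValues, boolean includeFileImplicitColumns) {
        Map<String, String> implicitValues = new LinkedHashMap<>();
        // one entry per partition directory; "dir" stands in for the partition designator
        for (int i = 0; i < partitionValues.size(); i++) {
          implicitValues.put("dir" + i, partitionValues.get(i));
        }
        // file columns such as the file name only apply when a selection root exists
        if (includeFileImplicitColumns) {
          implicitValues.put("filename", filePath.substring(filePath.lastIndexOf('/') + 1));
        }
        return implicitValues;
      }

      public static void main(String[] args) {
        String selectionRoot = "/a/b/c";                        // null for rootless scans
        String filePath = "/a/b/c/d/e/0_0_0.parquet";
        List<String> partitionValues = Arrays.asList("d", "e"); // from listPartitionValues(...)
        System.out.println(populateImplicitColumns(filePath, partitionValues, selectionRoot != null));
        // prints {dir0=d, dir1=e, filename=0_0_0.parquet}
      }
    }

Splitting the work this way lets the plugin compute partition values once per work unit and keeps ColumnExplorer free of any dependency on how the selection root was obtained.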
http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
index f1da0f7..aa3f4ae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
@@ -70,7 +70,7 @@
   }
 
   @JsonProperty("filter")
-  public InfoSchemaFilter getFilter() {
+  public InfoSchemaFilter getSchemaFilter() {
     return filter;
   }
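A note on the InfoSchemaGroupScan change: because @JsonProperty("filter") names the serialized field explicitly, renaming the getter does not change the JSON produced for serialized plans. A minimal Jackson illustration (hypothetical class, not Drill's):

    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class JsonNameDemo {
      static class Scan {
        private final String filter = "TABLE_NAME = 'x'";

        @JsonProperty("filter")
        public String getSchemaFilter() { // renamed getter, same JSON name
          return filter;
        }
      }

      public static void main(String[] args) throws Exception {
        // prints {"filter":"TABLE_NAME = 'x'"} -- not {"schemaFilter":...}
        System.out.println(new ObjectMapper().writeValueAsString(new Scan()));
      }
    }

Without the annotation, Jackson would infer the property name from the getter, and the rename would silently change the wire format.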