http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/sql/hive/TestViewSupportOnHiveTables.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/sql/hive/TestViewSupportOnHiveTables.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/sql/hive/TestViewSupportOnHiveTables.java
index 3706ff2..64680c0 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/sql/hive/TestViewSupportOnHiveTables.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/sql/hive/TestViewSupportOnHiveTables.java
@@ -35,7 +35,7 @@ public class TestViewSupportOnHiveTables extends TestBaseViewSupport {
 
   @BeforeClass
   public static void generateHive() throws Exception{
-    hiveTest = HiveTestDataGenerator.getInstance(dirTestWatcher.getRootDir());
+    hiveTest = HiveTestDataGenerator.getInstance(dirTestWatcher);
     hiveTest.addHiveTestPlugin(getDrillbitContext().getStorage());
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
index 78e5b39..f206999 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.nio.file.attribute.PosixFilePermission;
 import java.sql.Date;
 import java.sql.Timestamp;
@@ -31,6 +32,7 @@ import com.google.common.collect.Sets;
 import com.google.common.io.Resources;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.common.exceptions.DrillException;
 import org.apache.drill.exec.store.StoragePluginRegistry;
@@ -52,9 +54,11 @@ public class HiveTestDataGenerator {
 
   private final String dbDir;
   private final String whDir;
+  private final BaseDirTestWatcher dirTestWatcher;
   private final Map<String, String> config;
 
-  public static synchronized HiveTestDataGenerator getInstance(File baseDir) throws Exception {
+  public static synchronized HiveTestDataGenerator getInstance(BaseDirTestWatcher dirTestWatcher) throws Exception {
+    File baseDir = dirTestWatcher.getRootDir();
     if (instance == null || !HiveTestDataGenerator.baseDir.equals(baseDir)) {
       HiveTestDataGenerator.baseDir = baseDir;
 
@@ -64,19 +68,20 @@ public class HiveTestDataGenerator {
       final String dbDir = dbDirFile.getAbsolutePath();
       final String whDir = whDirFile.getAbsolutePath();
 
-      instance = new HiveTestDataGenerator(dbDir, whDir);
+      instance = new HiveTestDataGenerator(dbDir, whDir, dirTestWatcher);
       instance.generateTestData();
     }
 
     return instance;
   }
 
-  private HiveTestDataGenerator(final String dbDir, final String whDir) {
+  private HiveTestDataGenerator(final String dbDir, final String whDir, final BaseDirTestWatcher dirTestWatcher) {
     this.dbDir = dbDir;
     this.whDir = whDir;
+    this.dirTestWatcher = dirTestWatcher;
 
     config = Maps.newHashMap();
-    config.put("hive.metastore.uris", "");
+    config.put(ConfVars.METASTOREURIS.toString(), "");
     config.put("javax.jdo.option.ConnectionURL", String.format("jdbc:derby:;databaseName=%s;create=true", dbDir));
     config.put("hive.metastore.warehouse.dir", whDir);
     config.put(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
@@ -128,7 +133,7 @@ public class HiveTestDataGenerator {
     try {
       Files.setPosixFilePermissions(dir.toPath(), perms);
     } catch (IOException e) {
-      new RuntimeException(e);
+      throw new RuntimeException(e);
     }
 
     return dir;
@@ -494,22 +499,6 @@ public class HiveTestDataGenerator {
     executeQuery(hiveDriver, "INSERT INTO TABLE kv_parquet PARTITION(part1) SELECT key, value, key FROM default.kv");
     executeQuery(hiveDriver, "ALTER TABLE kv_parquet ADD COLUMNS (newcol string)");
 
-    executeQuery(hiveDriver,
-        "CREATE TABLE countStar_Parquet (int_field INT) STORED AS parquet");
-
-    final int numOfRows = 200;
-    final StringBuffer sb = new StringBuffer();
-    sb.append("VALUES ");
-    for(int i = 0; i < numOfRows; ++i) {
-      if(i != 0) {
-        sb.append(",");
-      }
-      sb.append("(").append(i).append(")");
-    }
-
-    executeQuery(hiveDriver, "INSERT INTO TABLE countStar_Parquet \n" +
-        sb.toString());
-
     // Create a StorageHandler based table (DRILL-3739)
     executeQuery(hiveDriver, "CREATE TABLE kv_sh(key INT, value STRING) STORED BY " +
         "'org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler'");
@@ -551,9 +540,60 @@ public class HiveTestDataGenerator {
         Resources.getResource("simple.json") + "' into table default.simple_json";
     executeQuery(hiveDriver, loadData);
 
+    createTestDataForDrillNativeParquetReaderTests(hiveDriver);
+
     ss.close();
   }
 
+  private void createTestDataForDrillNativeParquetReaderTests(Driver hiveDriver) {
+    // Hive managed table that has data qualified for Drill native filter push down
+    executeQuery(hiveDriver, "create table kv_native(key int, sub_key int) stored as parquet");
+    // each insert is created in separate file
+    executeQuery(hiveDriver, "insert into table kv_native values (1, 1), (1, 2)");
+    executeQuery(hiveDriver, "insert into table kv_native values (1, 3), (1, 4)");
+    executeQuery(hiveDriver, "insert into table kv_native values (2, 5), (2, 6)");
+    executeQuery(hiveDriver, "insert into table kv_native values (null, 9), (null, 10)");
+
+    // Hive external table which has three partitions
+
+    // copy external table with data from test resources
+    dirTestWatcher.copyResourceToRoot(Paths.get("external"));
+
+    File external = new File (baseDir, "external");
+    String tableLocation = new File(external, "kv_native_ext").toURI().getPath();
+
+    executeQuery(hiveDriver, String.format("create external table kv_native_ext(key int) " +
+        "partitioned by (part_key int) " +
+        "stored as parquet location '%s'",
+        tableLocation));
+
+    /*
+      DATA:
+      key, part_key
+      1, 1
+      2, 1
+      3, 2
+      4, 2
+     */
+
+    // add partitions
+
+    // partition in the same location as table
+    String firstPartition = new File(tableLocation, "part_key=1").toURI().getPath();
+    executeQuery(hiveDriver, String.format("alter table kv_native_ext add partition (part_key = '1') " +
+      "location '%s'", firstPartition));
+
+    // partition in different location with table
+    String secondPartition = new File(external, "part_key=2").toURI().getPath();
+    executeQuery(hiveDriver, String.format("alter table kv_native_ext add partition (part_key = '2') " +
+      "location '%s'", secondPartition));
+
+    // add empty partition
+    String thirdPartition = new File(dirTestWatcher.makeSubDir(Paths.get("empty_part")), "part_key=3").toURI().getPath();
+    executeQuery(hiveDriver, String.format("alter table kv_native_ext add partition (part_key = '3') " +
+      "location '%s'", thirdPartition));
+  }
+
   private File getTempFile() throws Exception {
     return java.nio.file.Files.createTempFile("drill-hive-test", ".txt").toFile();
   }
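
The external table created above spreads its partitions over three locations: part_key=1 under the table location itself, part_key=2 directly under the copied "external" resource directory, and an empty part_key=3 directory created through makeSubDir. A minimal sketch of how those locations are derived, using only standard java.io APIs; the baseDir value is a placeholder for dirTestWatcher.getRootDir() and the class is illustrative, not part of the commit:

import java.io.File;

public class PartitionLayoutSketch {
  public static void main(String[] args) {
    File baseDir = new File("/tmp/drill-hive-test-root");  // stands in for dirTestWatcher.getRootDir()
    File external = new File(baseDir, "external");         // copied from test resources
    String tableLocation = new File(external, "kv_native_ext").toURI().getPath();

    // partition stored inside the table directory
    String firstPartition = new File(tableLocation, "part_key=1").toURI().getPath();
    // partition stored outside the table directory
    String secondPartition = new File(external, "part_key=2").toURI().getPath();
    // empty partition in a separate sub-directory
    String thirdPartition = new File(new File(baseDir, "empty_part"), "part_key=3").toURI().getPath();

    System.out.println(firstPartition);
    System.out.println(secondPartition);
    System.out.println(thirdPartition);
  }
}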

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/contrib/storage-hive/core/src/test/resources/external/kv_native_ext/part_key=1/kv_1.parquet
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/resources/external/kv_native_ext/part_key=1/kv_1.parquet b/contrib/storage-hive/core/src/test/resources/external/kv_native_ext/part_key=1/kv_1.parquet
new file mode 100755
index 0000000..f641402
Binary files /dev/null and b/contrib/storage-hive/core/src/test/resources/external/kv_native_ext/part_key=1/kv_1.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/contrib/storage-hive/core/src/test/resources/external/part_key=2/kv_2.parquet
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/resources/external/part_key=2/kv_2.parquet b/contrib/storage-hive/core/src/test/resources/external/part_key=2/kv_2.parquet
new file mode 100755
index 0000000..c34c267
Binary files /dev/null and b/contrib/storage-hive/core/src/test/resources/external/part_key=2/kv_2.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java
index 211bd65..dfa41e8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java
@@ -18,8 +18,11 @@
 package org.apache.drill.exec.ops;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 
+import com.google.common.base.Preconditions;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -28,8 +31,6 @@ import org.apache.drill.exec.testing.ControlsInjector;
 import org.apache.drill.exec.testing.ExecutionControls;
 import org.apache.hadoop.conf.Configuration;
 
-import com.google.common.base.Preconditions;
-
 import io.netty.buffer.DrillBuf;
 
 /**
@@ -46,8 +47,9 @@ public abstract class BaseOperatorContext implements OperatorContext {
   protected final BufferAllocator allocator;
   protected final PhysicalOperator popConfig;
   protected final BufferManager manager;
-  private DrillFileSystem fs;
+  private List<DrillFileSystem> fileSystems;
   private ControlsInjector injector;
+  private boolean allowCreatingFileSystem = true;
 
   public BaseOperatorContext(FragmentContext context, BufferAllocator allocator,
                PhysicalOperator popConfig) {
@@ -55,6 +57,7 @@ public abstract class BaseOperatorContext implements OperatorContext {
     this.allocator = allocator;
     this.popConfig = popConfig;
     this.manager = new BufferManagerImpl(allocator);
+    this.fileSystems = new ArrayList<>();
   }
 
   @Override
@@ -158,35 +161,46 @@ public abstract class BaseOperatorContext implements OperatorContext {
     } catch (RuntimeException e) {
       ex = ex == null ? e : ex;
     }
-    try {
-      if (fs != null) {
+
+    for (DrillFileSystem fs : fileSystems) {
+      try {
         fs.close();
-        fs = null;
-      }
-    } catch (IOException e) {
+      } catch (IOException e) {
       throw UserException.resourceError(e)
-        .addContext("Failed to close the Drill file system for " + getName())
-        .build(logger);
+          .addContext("Failed to close the Drill file system for " + getName())
+          .build(logger);
+      }
     }
+
     if (ex != null) {
       throw ex;
     }
   }
 
+  /**
+   * Creates DrillFileSystem that automatically tracks operator stats.
+   * Only one tracking and no non-tracking file system per operator context.
+   */
   @Override
   public DrillFileSystem newFileSystem(Configuration conf) throws IOException {
-    Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext");
-    fs = new DrillFileSystem(conf, getStats());
+    Preconditions.checkState(allowCreatingFileSystem, "Only one tracking file system is allowed per Operator Context and it is already created.");
+    Preconditions.checkState(fileSystems.isEmpty(), "Non-tracking file system(-s) is(are) already created.");
+    DrillFileSystem fs = new DrillFileSystem(conf, getStats());
+    fileSystems.add(fs);
+    allowCreatingFileSystem = false;
     return fs;
   }
 
   /**
    * Creates a DrillFileSystem that does not automatically track operator stats.
+   * Multiple non-tracking file system are allowed.
    */
   @Override
   public DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException {
-    Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext");
-    fs = new DrillFileSystem(conf, null);
+    Preconditions.checkState(allowCreatingFileSystem, "Only one tracking file system is allowed per Operator Context and it is already created.");
+    DrillFileSystem fs = new DrillFileSystem(conf, null);
+    fileSystems.add(fs);
     return fs;
   }
+
 }
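
The reworked contract allows either exactly one stats-tracking file system or any number of non-tracking ones per operator context, enforced via Preconditions.checkState (which throws IllegalStateException); all created file systems are closed by the context in close(). A minimal sketch of the intended usage, assuming a freshly created OperatorContext is available; method and variable names are illustrative only:

import java.io.IOException;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.hadoop.conf.Configuration;

class FileSystemContractSketch {
  // 'context' is assumed to be a fresh operator context.
  static void trackingVariant(OperatorContext context, Configuration conf) throws IOException {
    DrillFileSystem fs = context.newFileSystem(conf);   // ok: the one tracking file system
    // context.newFileSystem(conf);                     // would throw IllegalStateException
    // context.newNonTrackingFileSystem(conf);          // would also throw once a tracking FS exists
  }

  static void nonTrackingVariant(OperatorContext context, Configuration conf) throws IOException {
    DrillFileSystem fs1 = context.newNonTrackingFileSystem(conf);  // ok
    DrillFileSystem fs2 = context.newNonTrackingFileSystem(conf);  // ok: multiple non-tracking allowed
    // context.newFileSystem(conf);                                // would throw: non-tracking FS already created
  }
}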

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index b2ddf68..e89a1f9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -24,12 +24,16 @@ import java.util.List;
 
 import com.google.common.collect.Lists;
 
+import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.ops.UdfUtilities;
 import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.planner.fragment.DistributionAffinity;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.drill.exec.server.options.OptionManager;
 
 public abstract class AbstractGroupScan extends AbstractBase implements GroupScan {
 
@@ -164,4 +168,14 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca
   public DistributionAffinity getDistributionAffinity() {
     return DistributionAffinity.SOFT;
   }
+
+  @Override
+  public LogicalExpression getFilter() {
+    return null;
+  }
+
+  @Override
+  public GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities, FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index fc63c77..33ab13d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -21,13 +21,17 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.ops.UdfUtilities;
 import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.google.common.collect.ImmutableList;
+import org.apache.drill.exec.server.options.OptionManager;
 
 /**
  * A GroupScan operator represents all data which will be scanned by a given physical
@@ -134,4 +138,10 @@ public interface GroupScan extends Scan, HasAffinity{
    */
   Collection<String> getFiles();
 
+  @JsonIgnore
+  LogicalExpression getFilter();
+
+  GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities,
+                        FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager);
+
 }
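
getFilter() and applyFilter(...) extend the GroupScan contract for filter push-down, and the AbstractGroupScan defaults above return null, which reads as "no push-down happened". A minimal hedged sketch of how calling code might honor that convention; the helper name is made up for illustration and is not part of the commit:

import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.ops.UdfUtilities;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.server.options.OptionManager;

class FilterPushDownSketch {
  // Hypothetical helper: keep the original scan when applyFilter() declines the push-down.
  static GroupScan pushFilterIfPossible(GroupScan scan,
                                        LogicalExpression filterExpr,
                                        UdfUtilities udfUtilities,
                                        FunctionImplementationRegistry registry,
                                        OptionManager options) {
    GroupScan filtered = scan.applyFilter(filterExpr, udfUtilities, registry, options);
    return filtered == null ? scan : filtered;
  }
}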

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java
index 4404a98..f2974e1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.physical.base;
 
 public class ScanStats {
-//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanStats.class);
 
   public static final ScanStats TRIVIAL_TABLE = new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, 20, 1, 1);
 
@@ -30,7 +29,6 @@ public class ScanStats {
   private final GroupScanProperty property;
 
   public ScanStats(GroupScanProperty property, long recordCount, float cpuCost, float diskCost) {
-    super();
     this.recordCount = recordCount;
     this.cpuCost = cpuCost;
     this.diskCost = diskCost;
@@ -49,6 +47,11 @@ public class ScanStats {
     return diskCost;
   }
 
+  @Override
+  public String toString() {
+    return "ScanStats{" + "recordCount=" + recordCount + ", cpuCost=" + cpuCost + ", diskCost=" + diskCost + ", property=" + property + '}';
+  }
+
   /**
    * Return if GroupScan knows the exact row count in the result of getSize() call.
    * By default, groupscan does not know the exact row count, before it scans every rows.
@@ -60,7 +63,7 @@ public class ScanStats {
 
 
 
-  public static enum GroupScanProperty {
+  public enum GroupScanProperty {
     NO_EXACT_ROW_COUNT(false, false),
     EXACT_ROW_COUNT(true, true);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
index a183e51..cbc530b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
@@ -21,7 +21,6 @@ import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.util.BitSets;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.exec.physical.base.FileGroupScan;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillScanRel;
@@ -29,31 +28,57 @@ import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FormatSelection;
 import org.apache.drill.exec.store.dfs.MetadataContext;
-import org.apache.drill.exec.store.parquet.ParquetGroupScan;
+import org.apache.drill.exec.store.parquet.AbstractParquetGroupScan;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableBitVector;
+import org.apache.drill.exec.vector.NullableDateVector;
+import org.apache.drill.exec.vector.NullableDecimal18Vector;
+import org.apache.drill.exec.vector.NullableFloat4Vector;
+import org.apache.drill.exec.vector.NullableFloat8Vector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.NullableIntervalVector;
+import org.apache.drill.exec.vector.NullableSmallIntVector;
+import org.apache.drill.exec.vector.NullableTimeStampVector;
+import org.apache.drill.exec.vector.NullableTimeVector;
+import org.apache.drill.exec.vector.NullableTinyIntVector;
+import org.apache.drill.exec.vector.NullableUInt1Vector;
+import org.apache.drill.exec.vector.NullableUInt2Vector;
+import org.apache.drill.exec.vector.NullableUInt4Vector;
+import org.apache.drill.exec.vector.NullableVarBinaryVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.io.api.Binary;
+import org.joda.time.DateTimeConstants;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-
 /**
  * PartitionDescriptor that describes partitions based on column names instead of directory structure
  */
 public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
 
-  private final List<SchemaPath> partitionColumns;
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetPartitionDescriptor.class);
+
   private final DrillScanRel scanRel;
+  private final AbstractParquetGroupScan groupScan;
+  private final List<SchemaPath> partitionColumns;
 
   public ParquetPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
-    ParquetGroupScan scan = (ParquetGroupScan) scanRel.getGroupScan();
-    this.partitionColumns = scan.getPartitionColumns();
     this.scanRel = scanRel;
+    assert scanRel.getGroupScan() instanceof AbstractParquetGroupScan;
+    this.groupScan = (AbstractParquetGroupScan) scanRel.getGroupScan();
+    this.partitionColumns = groupScan.getPartitionColumns();
   }
 
   @Override
@@ -81,15 +106,6 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
     return partitionColumns.size();
   }
 
-  private GroupScan createNewGroupScan(List<String> newFiles, String cacheFileRoot,
-      boolean wasAllPartitionsPruned, MetadataContext metaContext) throws IOException {
-    final FileSelection newSelection = FileSelection.create(null, newFiles, getBaseTableLocation(),
-        cacheFileRoot, wasAllPartitionsPruned);
-    newSelection.setMetaContext(metaContext);
-    final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newSelection);
-    return newScan;
-  }
-
   @Override
   public void populatePartitionVectors(ValueVector[] vectors, List<PartitionLocation> partitions,
                                        BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap) {
@@ -97,8 +113,7 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
     for (PartitionLocation partitionLocation: partitions) {
       for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) {
         SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
-        ((ParquetGroupScan) scanRel.getGroupScan()).populatePruningVector(vectors[partitionColumnIndex], record, column,
-            partitionLocation.getEntirePartitionLocation());
+        populatePruningVector(vectors[partitionColumnIndex], record, column, partitionLocation.getEntirePartitionLocation());
       }
       record++;
     }
@@ -114,7 +129,7 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
 
   @Override
   public TypeProtos.MajorType getVectorType(SchemaPath column, PlannerSettings plannerSettings) {
-    return ((ParquetGroupScan) scanRel.getGroupScan()).getTypeForColumn(column);
+    return groupScan.getTypeForColumn(column);
   }
 
   @Override
@@ -124,26 +139,22 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
   }
 
   @Override
-  protected void createPartitionSublists() {
-    Set<String> fileLocations = ((ParquetGroupScan) scanRel.getGroupScan()).getFileSet();
-    List<PartitionLocation> locations = new LinkedList<>();
-    for (String file: fileLocations) {
-      locations.add(new ParquetPartitionLocation(file));
-    }
-    locationSuperList = Lists.partition(locations, PartitionDescriptor.PARTITION_BATCH_SIZE);
-    sublistsCreated = true;
-  }
-
-  @Override
-  public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, String cacheFileRoot,
-      boolean wasAllPartitionsPruned, MetadataContext metaContext) throws Exception {
-    List<String> newFiles = Lists.newArrayList();
+  public TableScan createTableScan(List<PartitionLocation> newPartitionLocation,
+                                   String cacheFileRoot,
+                                   boolean wasAllPartitionsPruned,
+                                   MetadataContext metaContext) throws Exception {
+    List<String> newFiles = new ArrayList<>();
     for (final PartitionLocation location : newPartitionLocation) {
       newFiles.add(location.getEntirePartitionLocation());
     }
 
     final GroupScan newGroupScan = createNewGroupScan(newFiles, cacheFileRoot, wasAllPartitionsPruned, metaContext);
 
+    if (newGroupScan == null) {
+      logger.warn("Unable to create new group scan, returning original table scan.");
+      return scanRel;
+    }
+
     return new DrillScanRel(scanRel.getCluster(),
         scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
         scanRel.getTable(),
@@ -154,9 +165,261 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
   }
 
   @Override
-  public TableScan createTableScan(List<PartitionLocation> newPartitionLocation,
-      boolean wasAllPartitionsPruned) throws Exception {
+  public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, boolean wasAllPartitionsPruned) throws Exception {
     return createTableScan(newPartitionLocation, null, wasAllPartitionsPruned, null);
   }
 
+  @Override
+  protected void createPartitionSublists() {
+    Set<String> fileLocations = groupScan.getFileSet();
+    List<PartitionLocation> locations = new LinkedList<>();
+    for (String file : fileLocations) {
+      locations.add(new ParquetPartitionLocation(file));
+    }
+    locationSuperList = Lists.partition(locations, PartitionDescriptor.PARTITION_BATCH_SIZE);
+    sublistsCreated = true;
+  }
+
+  private GroupScan createNewGroupScan(List<String> newFiles,
+                                       String cacheFileRoot,
+                                       boolean wasAllPartitionsPruned,
+                                       MetadataContext metaContext) throws IOException {
+
+    FileSelection newSelection = FileSelection.create(null, newFiles, getBaseTableLocation(), cacheFileRoot, wasAllPartitionsPruned);
+    if (newSelection == null) {
+      return null;
+    }
+    newSelection.setMetaContext(metaContext);
+    return groupScan.clone(newSelection);
+  }
+
+  private void populatePruningVector(ValueVector v, int index, SchemaPath column, String file) {
+    String path = Path.getPathWithoutSchemeAndAuthority(new Path(file)).toString();
+    TypeProtos.MajorType majorType = getVectorType(column, null);
+    TypeProtos.MinorType type = majorType.getMinorType();
+    switch (type) {
+      case BIT: {
+        NullableBitVector bitVector = (NullableBitVector) v;
+        Boolean value = groupScan.getPartitionValue(path, column, Boolean.class);
+        if (value == null) {
+          bitVector.getMutator().setNull(index);
+        } else {
+          bitVector.getMutator().setSafe(index, value ? 1 : 0);
+        }
+        return;
+      }
+      case INT: {
+        NullableIntVector intVector = (NullableIntVector) v;
+        Integer value = groupScan.getPartitionValue(path, column, Integer.class);
+        if (value == null) {
+          intVector.getMutator().setNull(index);
+        } else {
+          intVector.getMutator().setSafe(index, value);
+        }
+        return;
+      }
+      case SMALLINT: {
+        NullableSmallIntVector smallIntVector = (NullableSmallIntVector) v;
+        Integer value = groupScan.getPartitionValue(path, column, Integer.class);
+        if (value == null) {
+          smallIntVector.getMutator().setNull(index);
+        } else {
+          smallIntVector.getMutator().setSafe(index, value.shortValue());
+        }
+        return;
+      }
+      case TINYINT: {
+        NullableTinyIntVector tinyIntVector = (NullableTinyIntVector) v;
+        Integer value = groupScan.getPartitionValue(path, column, Integer.class);
+        if (value == null) {
+          tinyIntVector.getMutator().setNull(index);
+        } else {
+          tinyIntVector.getMutator().setSafe(index, value.byteValue());
+        }
+        return;
+      }
+      case UINT1: {
+        NullableUInt1Vector intVector = (NullableUInt1Vector) v;
+        Integer value = groupScan.getPartitionValue(path, column, Integer.class);
+        if (value == null) {
+          intVector.getMutator().setNull(index);
+        } else {
+          intVector.getMutator().setSafe(index, value.byteValue());
+        }
+        return;
+      }
+      case UINT2: {
+        NullableUInt2Vector intVector = (NullableUInt2Vector) v;
+        Integer value = groupScan.getPartitionValue(path, column, Integer.class);
+        if (value == null) {
+          intVector.getMutator().setNull(index);
+        } else {
+          intVector.getMutator().setSafe(index, (char) value.shortValue());
+        }
+        return;
+      }
+      case UINT4: {
+        NullableUInt4Vector intVector = (NullableUInt4Vector) v;
+        Integer value = groupScan.getPartitionValue(path, column, Integer.class);
+        if (value == null) {
+          intVector.getMutator().setNull(index);
+        } else {
+          intVector.getMutator().setSafe(index, value);
+        }
+        return;
+      }
+      case BIGINT: {
+        NullableBigIntVector bigIntVector = (NullableBigIntVector) v;
+        Long value = groupScan.getPartitionValue(path, column, Long.class);
+        if (value == null) {
+          bigIntVector.getMutator().setNull(index);
+        } else {
+          bigIntVector.getMutator().setSafe(index, value);
+        }
+        return;
+      }
+      case FLOAT4: {
+        NullableFloat4Vector float4Vector = (NullableFloat4Vector) v;
+        Float value = groupScan.getPartitionValue(path, column, Float.class);
+        if (value == null) {
+          float4Vector.getMutator().setNull(index);
+        } else {
+          float4Vector.getMutator().setSafe(index, value);
+        }
+        return;
+      }
+      case FLOAT8: {
+        NullableFloat8Vector float8Vector = (NullableFloat8Vector) v;
+        Double value = groupScan.getPartitionValue(path, column, Double.class);
+        if (value == null) {
+          float8Vector.getMutator().setNull(index);
+        } else {
+          float8Vector.getMutator().setSafe(index, value);
+        }
+        return;
+      }
+      case VARBINARY: {
+        NullableVarBinaryVector varBinaryVector = (NullableVarBinaryVector) v;
+        Object s = groupScan.getPartitionValue(path, column, Object.class);
+        byte[] bytes;
+        if (s == null) {
+          varBinaryVector.getMutator().setNull(index);
+          return;
+        } else {
+          bytes = getBytes(type, s);
+        }
+        varBinaryVector.getMutator().setSafe(index, bytes, 0, bytes.length);
+        return;
+      }
+      case DECIMAL18: {
+        NullableDecimal18Vector decimalVector = (NullableDecimal18Vector) v;
+        Object s = groupScan.getPartitionValue(path, column, Object.class);
+        byte[] bytes;
+        if (s == null) {
+          decimalVector.getMutator().setNull(index);
+          return;
+        } else if (s instanceof Integer) {
+          long value = DecimalUtility.getBigDecimalFromPrimitiveTypes(
+              (Integer) s,
+              majorType.getScale(),
+              majorType.getPrecision()).longValue();
+          decimalVector.getMutator().setSafe(index, value);
+          return;
+        } else if (s instanceof Long) {
+          long value = DecimalUtility.getBigDecimalFromPrimitiveTypes(
+              (Long) s,
+              majorType.getScale(),
+              majorType.getPrecision()).longValue();
+          decimalVector.getMutator().setSafe(index, value);
+          return;
+        } else {
+          bytes = getBytes(type, s);
+        }
+        long value = DecimalUtility.getBigDecimalFromByteArray(bytes, 0, bytes.length, majorType.getScale()).longValue();
+        decimalVector.getMutator().setSafe(index, value);
+        return;
+      }
+      case DATE: {
+        NullableDateVector dateVector = (NullableDateVector) v;
+        Integer value = groupScan.getPartitionValue(path, column, Integer.class);
+        if (value == null) {
+          dateVector.getMutator().setNull(index);
+        } else {
+          dateVector.getMutator().setSafe(index, value * (long) DateTimeConstants.MILLIS_PER_DAY);
+        }
+        return;
+      }
+      case TIME: {
+        NullableTimeVector timeVector = (NullableTimeVector) v;
+        Integer value = groupScan.getPartitionValue(path, column, Integer.class);
+        if (value == null) {
+          timeVector.getMutator().setNull(index);
+        } else {
+          timeVector.getMutator().setSafe(index, value);
+        }
+        return;
+      }
+      case TIMESTAMP: {
+        NullableTimeStampVector timeStampVector = (NullableTimeStampVector) v;
+        Long value = groupScan.getPartitionValue(path, column, Long.class);
+        if (value == null) {
+          timeStampVector.getMutator().setNull(index);
+        } else {
+          timeStampVector.getMutator().setSafe(index, value);
+        }
+        return;
+      }
+      case VARCHAR: {
+        NullableVarCharVector varCharVector = (NullableVarCharVector) v;
+        Object s = groupScan.getPartitionValue(path, column, Object.class);
+        byte[] bytes;
+        if (s == null) {
+          varCharVector.getMutator().setNull(index);
+          return;
+        } else {
+          bytes = getBytes(type, s);
+        }
+        varCharVector.getMutator().setSafe(index, bytes, 0, bytes.length);
+        return;
+      }
+      case INTERVAL: {
+        NullableIntervalVector intervalVector = (NullableIntervalVector) v;
+        Object s = groupScan.getPartitionValue(path, column, Object.class);
+        byte[] bytes;
+        if (s == null) {
+          intervalVector.getMutator().setNull(index);
+          return;
+        } else {
+          bytes = getBytes(type, s);
+        }
+        intervalVector.getMutator().setSafe(index, 1,
+            ParquetReaderUtility.getIntFromLEBytes(bytes, 0),
+            ParquetReaderUtility.getIntFromLEBytes(bytes, 4),
+            ParquetReaderUtility.getIntFromLEBytes(bytes, 8));
+        return;
+      }
+      default:
+        throw new UnsupportedOperationException("Unsupported type: " + type);
+    }
+  }
+
+  /**
+   * Returns the sequence of bytes received from {@code Object source}.
+   *
+   * @param type the column type
+   * @param source the source of the bytes sequence
+   * @return bytes sequence obtained from {@code Object source}
+   */
+  private byte[] getBytes(TypeProtos.MinorType type, Object source) {
+    byte[] bytes;
+    if (source instanceof Binary) {
+      bytes = ((Binary) source).getBytes();
+    } else if (source instanceof byte[]) {
+      bytes = (byte[]) source;
+    } else {
+      throw new UnsupportedOperationException("Unable to create column data for type: " + type);
+    }
+    return bytes;
+  }
+
 }
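
populatePruningVector looks partition values up by the file path with scheme and authority stripped, then writes either setNull or setSafe into the corresponding nullable vector. A small sketch of the path-normalization step only, using the Hadoop Path API the class already imports; the file path value is illustrative:

import org.apache.hadoop.fs.Path;

class PathKeySketch {
  public static void main(String[] args) {
    // Partition values are keyed by the path without scheme/authority.
    String file = "hdfs://namenode:8020/warehouse/kv_native_ext/part_key=1/kv_1.parquet";
    String key = Path.getPathWithoutSchemeAndAuthority(new Path(file)).toString();
    System.out.println(key);  // prints /warehouse/kv_native_ext/part_key=1/kv_1.parquet
  }
}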

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index f71c281..96cfa8a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -365,6 +365,7 @@ public enum PlannerPhase {
             // Ideally this should be done in logical planning, before join order planning is done.
             // Before we can make such change, we have to figure out how to adjust the selectivity
             // estimation of filter operator, after filter is pushed down to scan.
+
             ParquetPushDownFilter.getFilterOnProject(optimizerRulesContext),
             ParquetPushDownFilter.getFilterOnScan(optimizerRulesContext)
         )
@@ -426,6 +427,9 @@ public enum PlannerPhase {
     ruleList.add(ValuesPrule.INSTANCE);
     ruleList.add(DirectScanPrule.INSTANCE);
 
+    ruleList.add(DrillPushLimitToScanRule.LIMIT_ON_PROJECT);
+    ruleList.add(DrillPushLimitToScanRule.LIMIT_ON_SCAN);
+
     if (ps.isHashAggEnabled()) {
       ruleList.add(HashAggPrule.INSTANCE);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterItemStarReWriterRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterItemStarReWriterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterItemStarReWriterRule.java
index 27f8c49..b7cdcfc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterItemStarReWriterRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterItemStarReWriterRule.java
@@ -32,7 +32,7 @@ import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.planner.types.RelDataTypeDrillImpl;
 import org.apache.drill.exec.planner.types.RelDataTypeHolder;
-import org.apache.drill.exec.store.parquet.ParquetGroupScan;
+import org.apache.drill.exec.store.parquet.AbstractParquetGroupScan;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -73,7 +73,7 @@ public class DrillFilterItemStarReWriterRule {
     @Override
     public boolean matches(RelOptRuleCall call) {
       DrillScanRel scan = call.rel(1);
-      return scan.getGroupScan() instanceof ParquetGroupScan && super.matches(call);
+      return scan.getGroupScan() instanceof AbstractParquetGroupScan && super.matches(call);
     }
 
     @Override
@@ -129,7 +129,7 @@ public class DrillFilterItemStarReWriterRule {
     @Override
     public boolean matches(RelOptRuleCall call) {
       DrillScanRel scan = call.rel(1);
-      return scan.getGroupScan() instanceof ParquetGroupScan && super.matches(call);
+      return scan.getGroupScan() instanceof AbstractParquetGroupScan && super.matches(call);
     }
 
     @Override
@@ -149,7 +149,7 @@ public class DrillFilterItemStarReWriterRule {
     @Override
     public boolean matches(RelOptRuleCall call) {
       DrillScanRel scan = call.rel(2);
-      return scan.getGroupScan() instanceof ParquetGroupScan && super.matches(call);
+      return scan.getGroupScan() instanceof AbstractParquetGroupScan && super.matches(call);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
index 3153b9d..6e44383 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
@@ -19,10 +19,8 @@ package org.apache.drill.exec.planner.logical.partition;
 
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
-import org.apache.drill.exec.physical.base.FileGroupScan;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.ParquetPartitionDescriptor;
 import org.apache.drill.exec.planner.PartitionDescriptor;
@@ -32,11 +30,11 @@ import org.apache.drill.exec.planner.logical.DrillScanRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.physical.PrelUtil;
-import org.apache.drill.exec.store.parquet.ParquetGroupScan;
+import org.apache.drill.exec.store.parquet.AbstractParquetGroupScan;
 
 public class ParquetPruneScanRule {
 
-  public static final RelOptRule getFilterOnProjectParquet(OptimizerRulesContext optimizerRulesContext) {
+  public static RelOptRule getFilterOnProjectParquet(OptimizerRulesContext optimizerRulesContext) {
     return new PruneScanRule(
         RelOptHelper.some(DrillFilterRel.class, RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))),
         "PruneScanRule:Filter_On_Project_Parquet",
@@ -53,9 +51,9 @@ public class ParquetPruneScanRule {
         GroupScan groupScan = scan.getGroupScan();
         // this rule is applicable only for parquet based partition pruning
         if (PrelUtil.getPlannerSettings(scan.getCluster().getPlanner()).isHepPartitionPruningEnabled()) {
-          return groupScan instanceof ParquetGroupScan && groupScan.supportsPartitionFilterPushdown() && !scan.partitionFilterPushdown();
+          return groupScan instanceof AbstractParquetGroupScan && groupScan.supportsPartitionFilterPushdown() && !scan.partitionFilterPushdown();
         } else {
-          return groupScan instanceof ParquetGroupScan && groupScan.supportsPartitionFilterPushdown();
+          return groupScan instanceof AbstractParquetGroupScan && groupScan.supportsPartitionFilterPushdown();
         }
       }
 
@@ -69,7 +67,7 @@ public class ParquetPruneScanRule {
     };
   }
 
-  public static final RelOptRule getFilterOnScanParquet(OptimizerRulesContext optimizerRulesContext) {
+  public static RelOptRule getFilterOnScanParquet(OptimizerRulesContext optimizerRulesContext) {
     return new PruneScanRule(
         RelOptHelper.some(DrillFilterRel.class, RelOptHelper.any(DrillScanRel.class)),
         "PruneScanRule:Filter_On_Scan_Parquet", optimizerRulesContext) {
@@ -85,9 +83,9 @@ public class ParquetPruneScanRule {
         GroupScan groupScan = scan.getGroupScan();
         // this rule is applicable only for parquet based partition pruning
         if (PrelUtil.getPlannerSettings(scan.getCluster().getPlanner()).isHepPartitionPruningEnabled()) {
-          return groupScan instanceof ParquetGroupScan && groupScan.supportsPartitionFilterPushdown() && !scan.partitionFilterPushdown();
+          return groupScan instanceof AbstractParquetGroupScan && groupScan.supportsPartitionFilterPushdown() && !scan.partitionFilterPushdown();
         } else {
-          return groupScan instanceof ParquetGroupScan && groupScan.supportsPartitionFilterPushdown();
+          return groupScan instanceof AbstractParquetGroupScan && groupScan.supportsPartitionFilterPushdown();
         }
       }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index 5f679a4..7fa1794 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -24,7 +24,6 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Stopwatch;
-
 import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
 import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Project;
@@ -147,8 +146,8 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
   protected void doOnMatch(RelOptRuleCall call, Filter filterRel, Project projectRel, TableScan scanRel) {
 
     final String pruningClassName = getClass().getName();
-    logger.info("Beginning partition pruning, pruning class: {}", pruningClassName);
-    Stopwatch totalPruningTime = Stopwatch.createStarted();
+    logger.debug("Beginning partition pruning, pruning class: {}", pruningClassName);
+    Stopwatch totalPruningTime = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
 
     final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
     PartitionDescriptor descriptor = getPartitionDescriptor(settings, scanRel);
@@ -191,30 +190,33 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
     }
 
     if (partitionColumnBitSet.isEmpty()) {
-      logger.info("No partition columns are projected from the scan..continue. " +
-          "Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+      if (totalPruningTime != null) {
+        logger.debug("No partition columns are projected from the scan..continue. Total pruning elapsed time: {} ms",
+            totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+      }
       setPruneStatus(metaContext, PruneStatus.NOT_PRUNED);
       return;
     }
 
     // stop watch to track how long we spend in different phases of pruning
-    Stopwatch miscTimer = Stopwatch.createUnstarted();
-
-    // track how long we spend building the filter tree
-    miscTimer.start();
+    // first track how long we spend building the filter tree
+    Stopwatch miscTimer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
 
     FindPartitionConditions c = new FindPartitionConditions(columnBitset, filterRel.getCluster().getRexBuilder());
     c.analyze(condition);
     RexNode pruneCondition = c.getFinalCondition();
     BitSet referencedDirsBitSet = c.getReferencedDirs();
 
-    logger.info("Total elapsed time to build and analyze filter tree: {} ms",
-        miscTimer.elapsed(TimeUnit.MILLISECONDS));
-    miscTimer.reset();
+    if (miscTimer != null) {
+      logger.debug("Total elapsed time to build and analyze filter tree: {} ms", miscTimer.elapsed(TimeUnit.MILLISECONDS));
+      miscTimer.reset();
+    }
 
     if (pruneCondition == null) {
-      logger.info("No conditions were found eligible for partition pruning." +
-          "Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+      if (totalPruningTime != null) {
+        logger.debug("No conditions were found eligible for partition pruning. Total pruning elapsed time: {} ms",
+            totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+      }
       setPruneStatus(metaContext, PruneStatus.NOT_PRUNED);
       return;
     }
@@ -251,15 +253,19 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
           container.add(v);
         }
 
-        // track how long we spend populating partition column vectors
-        miscTimer.start();
+        if (miscTimer != null) {
+          // track how long we spend populating partition column vectors
+          miscTimer.start();
+        }
 
         // populate partition vectors.
         descriptor.populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap);
 
-        logger.info("Elapsed time to populate partitioning column vectors: {} ms within batchIndex: {}",
-            miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex);
-        miscTimer.reset();
+        if (miscTimer != null) {
+          logger.debug("Elapsed time to populate partitioning column vectors: {} ms within batchIndex: {}",
+              miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex);
+          miscTimer.reset();
+        }
 
         // materialize the expression; only need to do this once
         if (batchIndex == 0) {
@@ -267,8 +273,9 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
           if (materializedExpr == null) {
             // continue without partition pruning; no need to log anything here since
             // materializePruneExpr logs it already
-            logger.info("Total pruning elapsed time: {} ms",
-                totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+            if (totalPruningTime != null) {
+              logger.debug("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+            }
             setPruneStatus(metaContext, PruneStatus.NOT_PRUNED);
             return;
           }
@@ -276,14 +283,18 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
 
         output.allocateNew(partitions.size());
 
-        // start the timer to evaluate how long we spend in the interpreter evaluation
-        miscTimer.start();
+        if (miscTimer != null) {
+          // start the timer to evaluate how long we spend in the interpreter 
evaluation
+          miscTimer.start();
+        }
 
         InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, container, output, materializedExpr);
 
-        logger.info("Elapsed time in interpreter evaluation: {} ms within batchIndex: {} with # of partitions : {}",
-            miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex, partitions.size());
-        miscTimer.reset();
+        if (miscTimer != null) {
+          logger.debug("Elapsed time in interpreter evaluation: {} ms within batchIndex: {} with # of partitions : {}",
+              miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex, partitions.size());
+          miscTimer.reset();
+        }
 
         int recordCount = 0;
         int qualifiedCount = 0;
@@ -338,7 +349,9 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
         batchIndex++;
       } catch (Exception e) {
         logger.warn("Exception while trying to prune partition.", e);
-        logger.info("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+        if (totalPruningTime != null) {
+          logger.debug("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+        }
 
         setPruneStatus(metaContext, PruneStatus.NOT_PRUNED);
         return; // continue without partition pruning
@@ -352,7 +365,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
 
     try {
       if (newPartitions.size() == numTotal) {
-        logger.info("No partitions were eligible for pruning");
+        logger.debug("No partitions were eligible for pruning");
         return;
       }
 
@@ -371,7 +384,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
         // directories first and the non-composite partition location will still return
         // directories, not files.  So, additional processing is done depending on this flag
         wasAllPartitionsPruned = true;
-        logger.info("All {} partitions were pruned; added back a single partition to allow creating a schema", numTotal);
+        logger.debug("All {} partitions were pruned; added back a single partition to allow creating a schema", numTotal);
 
         // set the cacheFileRoot appropriately
         if (firstLocation.isCompositePartition()) {
@@ -379,7 +392,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
         }
       }
 
-      logger.info("Pruned {} partitions down to {}", numTotal, newPartitions.size());
+      logger.debug("Pruned {} partitions down to {}", numTotal, newPartitions.size());
 
       List<RexNode> conjuncts = RelOptUtil.conjunctions(condition);
       List<RexNode> pruneConjuncts = RelOptUtil.conjunctions(pruneCondition);
@@ -439,7 +452,9 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
     } catch (Exception e) {
       logger.warn("Exception while using the pruned partitions.", e);
     } finally {
-      logger.info("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+      if (totalPruningTime != null) {
+        logger.debug("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+      }
     }
   }
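
The pruning timers are now created only when debug logging is enabled, so the Stopwatch allocation and elapsed-time bookkeeping are skipped entirely at the default log level. A minimal standalone sketch of this guarded-stopwatch pattern with SLF4J and Guava; the class and method names are illustrative, not taken from the commit:

import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class GuardedStopwatchSketch {
  private static final Logger logger = LoggerFactory.getLogger(GuardedStopwatchSketch.class);

  void doWork() {
    // Only pay for timing when the result would actually be logged.
    Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;

    // ... expensive pruning work ...

    if (timer != null) {
      logger.debug("Total pruning elapsed time: {} ms", timer.elapsed(TimeUnit.MILLISECONDS));
    }
  }
}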
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
index 6bfceb4..f463e6d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
@@ -37,7 +37,7 @@ import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.exec.store.dfs.FormatSelection;
 import org.apache.drill.exec.store.dfs.NamedFormatPluginConfig;
-import org.apache.drill.exec.store.parquet.Metadata;
+import org.apache.drill.exec.store.parquet.metadata.Metadata;
 import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
 import org.apache.hadoop.fs.Path;

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
index fb86783..5ab67f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
@@ -17,24 +17,25 @@
  */
 package org.apache.drill.exec.store;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.commons.lang3.ArrayUtils;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.easy.FileWork;
 import org.apache.drill.exec.util.Utilities;
 import org.apache.hadoop.fs.Path;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.io.Files;
 
 public class ColumnExplorer {
@@ -156,43 +157,74 @@ public class ColumnExplorer {
   }
 
   /**
-   * Compares selection root and actual file path to determine partition columns values.
-   * Adds implicit file columns according to columns list.
+   * Creates map with implicit columns where key is column name, value is 
columns actual value.
+   * This map contains partition and implicit file columns (if requested).
+   * Partition columns names are formed based in partition designator and 
value index.
    *
-   * @return map with columns names as keys and their values
+   * @param filePath file path, used to populate file implicit columns
+   * @param partitionValues list of partition values
+   * @param includeFileImplicitColumns if file implicit columns should be 
included into the result
+   * @return implicit columns map
    */
-  public Map<String, String> populateImplicitColumns(FileWork work, String selectionRoot) {
-    return populateImplicitColumns(work.getPath(), selectionRoot);
-  }
+  public Map<String, String> populateImplicitColumns(String filePath,
+                                                     List<String> partitionValues,
+                                                     boolean includeFileImplicitColumns) {
+    Map<String, String> implicitValues = new LinkedHashMap<>();
 
-  /**
-   * Compares selection root and actual file path to determine partition columns values.
-   * Adds implicit file columns according to columns list.
-   *
-   * @return map with columns names as keys and their values
-   */
-  public Map<String, String> populateImplicitColumns(String filePath, String selectionRoot) {
-    Map<String, String> implicitValues = Maps.newLinkedHashMap();
-    if (selectionRoot != null) {
-      String[] r = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)).toString().split("/");
-      Path path = Path.getPathWithoutSchemeAndAuthority(new Path(filePath));
-      String[] p = path.toString().split("/");
-      if (p.length > r.length) {
-        String[] q = ArrayUtils.subarray(p, r.length, p.length - 1);
-        for (int a = 0; a < q.length; a++) {
-          if (isStarQuery || selectedPartitionColumns.contains(a)) {
-            implicitValues.put(partitionDesignator + a, q[a]);
-          }
-        }
+    for (int i = 0; i < partitionValues.size(); i++) {
+      if (isStarQuery || selectedPartitionColumns.contains(i)) {
+        implicitValues.put(partitionDesignator + i, partitionValues.get(i));
       }
-      //add implicit file columns
+    }
+
+    if (includeFileImplicitColumns) {
+      Path path = Path.getPathWithoutSchemeAndAuthority(new Path(filePath));
       for (Map.Entry<String, ImplicitFileColumns> entry : 
selectedImplicitColumns.entrySet()) {
         implicitValues.put(entry.getKey(), entry.getValue().getValue(path));
       }
     }
+
     return implicitValues;
   }
 
+  /**
+   * Compares the root and the file path to determine the directories
+   * that are present in the file path but absent in the root.
+   * Example: root - a/b/c, filePath - a/b/c/d/e/0_0_0.parquet, result - d/e.
+   * The differing directory names are stored in the list in order, from root to leaf.
+   *
+   * @param filePath file path
+   * @param root root directory
+   * @return list of directory names
+   */
+  public static List<String> listPartitionValues(String filePath, String root) {
+    if (filePath == null || root == null) {
+      return Collections.emptyList();
+    }
+
+    int rootDepth = new Path(root).depth();
+    Path path = new Path(filePath);
+    int parentDepth = path.getParent().depth();
+
+    int diffCount = parentDepth - rootDepth;
+
+    if (diffCount < 1) {
+      return Collections.emptyList();
+    }
+
+    String[] diffDirectoryNames = new String[diffCount];
+
+    // fill the array from the end so that the names end up in root-to-leaf order
+    for (int i = rootDepth; parentDepth > i; i++) {
+      path = path.getParent();
+      // deeper directories go to higher indexes
+      diffDirectoryNames[parentDepth - i - 1] = path.getName();
+    }
+
+    return Arrays.asList(diffDirectoryNames);
+  }
+
   public boolean isStarQuery() {
     return isStarQuery;
   }

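For reference, here is a rough usage sketch of the new static helper above, using the paths from its javadoc example; the main() scaffolding is illustrative only and not part of this commit. The returned values are then handed to populateImplicitColumns(filePath, partitionValues, includeFileImplicitColumns), as the EasyFormatPlugin hunk further down shows.

import java.util.List;

import org.apache.drill.exec.store.ColumnExplorer;

public class ListPartitionValuesExample {
  public static void main(String[] args) {
    String root = "/tmp/drill/a/b/c";
    String filePath = "/tmp/drill/a/b/c/d/e/0_0_0.parquet";

    // Directories present in the file path but absent in the root, in root-to-leaf order.
    List<String> partitionValues = ColumnExplorer.listPartitionValues(filePath, root);
    System.out.println(partitionValues); // [d, e]

    // A null root yields an empty list, so populateImplicitColumns() adds no partition (dirN) columns.
    System.out.println(ColumnExplorer.listPartitionValues(filePath, null)); // []
  }
}
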
http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
index fe0cae1..7cce2ad 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
@@ -20,18 +20,15 @@ package org.apache.drill.exec.store;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
+import com.google.common.base.Stopwatch;
 import org.apache.drill.common.concurrent.ExtendedLatch;
 import org.apache.drill.common.exceptions.UserException;
 import org.slf4j.Logger;
 
-import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 
 /**
@@ -115,7 +112,7 @@ public abstract class TimedRunnable<V> implements Runnable {
    * @throws IOException All exceptions are coerced to IOException since this 
was build for storage system tasks initially.
    */
   public static <V> List<V> run(final String activity, final Logger logger, 
final List<TimedRunnable<V>> runnables, int parallelism) throws IOException {
-    Stopwatch watch = Stopwatch.createStarted();
+    Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
     long timedRunnableStart=System.nanoTime();
     if(runnables.size() == 1){
       parallelism = 1;
@@ -186,21 +183,22 @@ public abstract class TimedRunnable<V> implements 
Runnable {
       }
     }
 
-    if(logger.isInfoEnabled()){
+    if (watch != null) {
       double avg = (sum/1000.0/1000.0)/(count*1.0d);
       double avgStart = (totalStart/1000.0)/(count*1.0d);
 
-      logger.info(
+      logger.debug(
           String.format("%s: Executed %d out of %d using %d threads. "
               + "Time: %dms total, %fms avg, %dms max.",
               activity, count, runnables.size(), parallelism, 
watch.elapsed(TimeUnit.MILLISECONDS), avg, max/1000/1000));
-      logger.info(
+      logger.debug(
               String.format("%s: Executed %d out of %d using %d threads. "
                               + "Earliest start: %f \u03BCs, Latest start: %f 
\u03BCs, Average start: %f \u03BCs .",
                       activity, count, runnables.size(), parallelism, 
earliestStart/1000.0, latestStart/1000.0, avgStart));
+      watch.stop();
     }
 
-    if(excep != null) {
+    if (excep != null) {
       throw excep;
     }
 

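Because the per-activity timing summary in run() is now emitted at DEBUG (and the Stopwatch is skipped entirely otherwise), the messages only appear when the logger passed in by the caller has DEBUG enabled. One hypothetical way to turn them back on programmatically, assuming logback is the slf4j binding; the logger name below is a placeholder for whichever class's logger is handed to TimedRunnable.run():

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import org.slf4j.LoggerFactory;

public class EnableTimingLogs {
  public static void main(String[] args) {
    // Raise the caller's logger to DEBUG so TimedRunnable.run() prints its timing summary.
    Logger callerLogger = (Logger) LoggerFactory.getLogger("org.apache.drill.exec.store.SomeCallerClass");
    callerLogger.setLevel(Level.DEBUG);
  }
}
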
http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index 7edb327..f5bcced 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -95,7 +95,7 @@ public class FileSelection {
       final String cacheFileRoot, final boolean wasAllPartitionsPruned, final 
StatusType dirStatus) {
     this.statuses = statuses;
     this.files = files;
-    this.selectionRoot = Preconditions.checkNotNull(selectionRoot);
+    this.selectionRoot = selectionRoot;
     this.dirStatus = dirStatus;
     this.cacheFileRoot = cacheFileRoot;
     this.wasAllPartitionsPruned = wasAllPartitionsPruned;
@@ -121,7 +121,7 @@ public class FileSelection {
   }
 
   public List<FileStatus> getStatuses(final DrillFileSystem fs) throws 
IOException {
-    Stopwatch timer = Stopwatch.createStarted();
+    Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
 
     if (statuses == null)  {
       final List<FileStatus> newStatuses = Lists.newArrayList();
@@ -130,8 +130,11 @@ public class FileSelection {
       }
       statuses = newStatuses;
     }
-    logger.info("FileSelection.getStatuses() took {} ms, numFiles: {}",
-        timer.elapsed(TimeUnit.MILLISECONDS), statuses == null ? 0 : 
statuses.size());
+    if (timer != null) {
+      logger.debug("FileSelection.getStatuses() took {} ms, numFiles: {}",
+          timer.elapsed(TimeUnit.MILLISECONDS), statuses == null ? 0 : 
statuses.size());
+      timer.stop();
+    }
 
     return statuses;
   }
@@ -164,7 +167,7 @@ public class FileSelection {
     if (isExpandedFully()) {
       return this;
     }
-    Stopwatch timer = Stopwatch.createStarted();
+    Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
     List<FileStatus> statuses = getStatuses(fs);
 
     List<FileStatus> nonDirectories = Lists.newArrayList();
@@ -173,8 +176,10 @@ public class FileSelection {
     }
 
     final FileSelection fileSel = create(nonDirectories, null, selectionRoot);
-    logger.debug("FileSelection.minusDirectories() took {} ms, numFiles: {}",
-        timer.elapsed(TimeUnit.MILLISECONDS), statuses.size());
+    if (timer != null) {
+      logger.debug("FileSelection.minusDirectories() took {} ms, numFiles: 
{}", timer.elapsed(TimeUnit.MILLISECONDS), statuses.size());
+      timer.stop();
+    }
 
     // fileSel will be null if we query an empty folder
     if (fileSel != null) {
@@ -259,7 +264,7 @@ public class FileSelection {
 
   public static FileSelection create(final DrillFileSystem fs, final String 
parent, final String path,
       final boolean allowAccessOutsideWorkspace) throws IOException {
-    Stopwatch timer = Stopwatch.createStarted();
+    Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
     boolean hasWildcard = path.contains(WILD_CARD);
 
     final Path combined = new Path(parent, removeLeadingSlash(path));
@@ -271,7 +276,10 @@ public class FileSelection {
       return null;
     }
     final FileSelection fileSel = create(Lists.newArrayList(statuses), null, 
combined.toUri().getPath());
-    logger.debug("FileSelection.create() took {} ms ", 
timer.elapsed(TimeUnit.MILLISECONDS));
+    if (timer != null) {
+      logger.debug("FileSelection.create() took {} ms ", 
timer.elapsed(TimeUnit.MILLISECONDS));
+      timer.stop();
+    }
     if (fileSel == null) {
       return null;
     }
@@ -322,7 +330,7 @@ public class FileSelection {
 
   public static FileSelection createFromDirectories(final List<String> 
dirPaths, final FileSelection selection,
       final String cacheFileRoot) {
-    Stopwatch timer = Stopwatch.createStarted();
+    Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
     final String root = selection.getSelectionRoot();
     if (Strings.isNullOrEmpty(root)) {
       throw new DrillRuntimeException("Selection root is null or empty" + 
root);
@@ -338,9 +346,7 @@ public class FileSelection {
         dirs.add(status.getPath().toString());
       }
     } else {
-      for (String s : dirPaths) {
-        dirs.add(s);
-      }
+      dirs.addAll(dirPaths);
     }
 
     final Path rootPath = handleWildCard(root);
@@ -349,7 +355,10 @@ public class FileSelection {
     final Path path = new Path(uri.getScheme(), uri.getAuthority(), 
rootPath.toUri().getPath());
     FileSelection fileSel = new FileSelection(null, dirs, path.toString(), 
cacheFileRoot, false);
     fileSel.setHadWildcard(selection.hadWildcard());
-    logger.info("FileSelection.createFromDirectories() took {} ms ", 
timer.elapsed(TimeUnit.MILLISECONDS));
+    if (timer != null) {
+      logger.debug("FileSelection.createFromDirectories() took {} ms ", 
timer.elapsed(TimeUnit.MILLISECONDS));
+      timer.stop();
+    }
     return fileSel;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java
index e02d841..15107ac 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java
@@ -22,14 +22,14 @@ import org.apache.drill.exec.store.dfs.easy.FileWork;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class ReadEntryFromHDFS extends ReadEntryWithPath implements FileWork{
+public class ReadEntryFromHDFS extends ReadEntryWithPath implements FileWork {
 
   private long start;
   private long length;
 
   @JsonCreator
   public ReadEntryFromHDFS(@JsonProperty("path") String 
path,@JsonProperty("start") long start, @JsonProperty("length") long length) {
-    this.path = path;
+    super(path);
     this.start = start;
     this.length = length;
   }

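The constructor above now delegates to the superclass instead of assigning the inherited path field directly, which implies ReadEntryWithPath exposes a matching String constructor. A simplified sketch of that shape follows (Jackson annotations omitted; the base-class body is an assumption, not copied from this commit):

class ReadEntryWithPath {
  protected String path;

  public ReadEntryWithPath(String path) {
    this.path = path;
  }

  public String getPath() {
    return path;
  }
}

class ReadEntryFromHDFS extends ReadEntryWithPath {
  private long start;
  private long length;

  public ReadEntryFromHDFS(String path, long start, long length) {
    super(path); // initialize the shared path in one place instead of touching the field
    this.start = start;
    this.length = length;
  }
}
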
http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 678569f..5af1091 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -147,10 +147,12 @@ public abstract class EasyFormatPlugin<T extends 
FormatPluginConfig> implements
     List<RecordReader> readers = new LinkedList<>();
     List<Map<String, String>> implicitColumns = Lists.newArrayList();
     Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
-    for(FileWork work : scan.getWorkUnits()){
+    boolean supportsFileImplicitColumns = scan.getSelectionRoot() != null;
+    for (FileWork work : scan.getWorkUnits()){
       RecordReader recordReader = getRecordReader(context, dfs, work, 
scan.getColumns(), scan.getUserName());
       readers.add(recordReader);
-      Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(work, scan.getSelectionRoot());
+      List<String> partitionValues = ColumnExplorer.listPartitionValues(work.getPath(), scan.getSelectionRoot());
+      Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(work.getPath(), partitionValues, supportsFileImplicitColumns);
       implicitColumns.add(implicitValues);
       if (implicitValues.size() > mapWithMaxColumns.size()) {
         mapWithMaxColumns = implicitValues;

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
index f1da0f7..aa3f4ae 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
@@ -70,7 +70,7 @@ public class InfoSchemaGroupScan extends AbstractGroupScan{
   }
 
   @JsonProperty("filter")
-  public InfoSchemaFilter getFilter() {
+  public InfoSchemaFilter getSchemaFilter() {
     return filter;
   }
 

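The getter rename above keeps the @JsonProperty("filter") annotation, so the serialized plan still uses "filter" as the field name; only the Java accessor changes. A standalone illustration of that Jackson behavior (not Drill code):

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonPropertyRenameExample {

  public static class Scan {
    @JsonProperty("filter")
    public String getSchemaFilter() {
      return "TABLE_NAME = 'x'";
    }
  }

  public static void main(String[] args) throws Exception {
    // Prints {"filter":"TABLE_NAME = 'x'"} even though the getter is named getSchemaFilter().
    System.out.println(new ObjectMapper().writeValueAsString(new Scan()));
  }
}
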