DRILL-6331: Revisit Hive Drill native parquet implementation to be exposed to 
Drill optimizations (filter / limit push down, count to direct scan)

1. Factored out common logic for Drill parquet reader and Hive Drill native 
parquet readers: AbstractParquetGroupScan, AbstractParquetRowGroupScan, 
AbstractParquetScanBatchCreator.
2. Rules that previously worked only with ParquetGroupScan can now be applied 
to any class that extends AbstractParquetGroupScan: 
DrillFilterItemStarReWriterRule, ParquetPruneScanRule, PruneScanRule.
3. Hive populates partition values based on information returned from Hive 
metastore. Drill populates partition values based on path difference between 
selection root and actual file path.
   Previously, ColumnExplorer populated partition values based on the Drill approach. 
Since ColumnExplorer now also populates values for parquet files from Hive tables,
   the `populateImplicitColumns` method logic was changed to populate partition 
columns based only on the given partition values.
4. Refactored ParquetPartitionDescriptor to be responsible for populating 
partition values rather than storing this logic in parquet group scan class.
5. Metadata class was moved to separate metadata package 
(org.apache.drill.exec.store.parquet.metadata). Factored out several inner 
classes to improve code readability.
6. Collected all Drill native parquet reader unit tests into one class 
TestHiveDrillNativeParquetReader, also added new tests to cover new 
functionality.
7. Reduced excessive logging when parquet files metadata is read

closes #1214


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c6549e58
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c6549e58
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c6549e58

Branch: refs/heads/master
Commit: c6549e58859397c88cb1de61b4f6eee52a07ed0c
Parents: 84cd834
Author: Arina Ielchiieva <arina.yelchiy...@gmail.com>
Authored: Tue Mar 20 18:29:45 2018 +0000
Committer: Arina Ielchiieva <arina.yelchiy...@gmail.com>
Committed: Fri Apr 27 11:41:22 2018 +0300

----------------------------------------------------------------------
 .../client/src/protobuf/UserBitShared.pb.cc     |   14 +-
 .../client/src/protobuf/UserBitShared.pb.h      |    5 +-
 ...onvertHiveParquetScanToDrillParquetScan.java |   57 +-
 .../HiveDrillNativeParquetRowGroupScan.java     |  131 ++
 .../store/hive/HiveDrillNativeParquetScan.java  |  204 +-
 .../HiveDrillNativeParquetScanBatchCreator.java |   79 +
 .../hive/HiveDrillNativeParquetSubScan.java     |   55 -
 .../hive/HiveDrillNativeScanBatchCreator.java   |  208 --
 .../exec/store/hive/HiveMetadataProvider.java   |    5 +-
 .../exec/store/hive/HivePartitionHolder.java    |   95 +
 .../exec/TestHiveDrillNativeParquetReader.java  |  248 +++
 .../drill/exec/TestHivePartitionPruning.java    |   19 -
 .../drill/exec/TestHiveProjectPushDown.java     |   13 -
 .../apache/drill/exec/hive/HiveTestBase.java    |    2 +-
 .../apache/drill/exec/hive/TestHiveStorage.java |  215 +-
 .../exec/hive/TestInfoSchemaOnHiveStorage.java  |    6 +-
 .../sql/hive/TestViewSupportOnHiveTables.java   |    2 +-
 .../exec/store/hive/HiveTestDataGenerator.java  |   82 +-
 .../kv_native_ext/part_key=1/kv_1.parquet       |  Bin 0 -> 220 bytes
 .../resources/external/part_key=2/kv_2.parquet  |  Bin 0 -> 220 bytes
 .../drill/exec/ops/BaseOperatorContext.java     |   42 +-
 .../exec/physical/base/AbstractGroupScan.java   |   14 +
 .../drill/exec/physical/base/GroupScan.java     |   10 +
 .../drill/exec/physical/base/ScanStats.java     |    9 +-
 .../planner/ParquetPartitionDescriptor.java     |  331 ++-
 .../apache/drill/exec/planner/PlannerPhase.java |    4 +
 .../DrillFilterItemStarReWriterRule.java        |    8 +-
 .../logical/partition/ParquetPruneScanRule.java |   16 +-
 .../logical/partition/PruneScanRule.java        |   77 +-
 .../sql/handlers/RefreshMetadataHandler.java    |    2 +-
 .../apache/drill/exec/store/ColumnExplorer.java |   90 +-
 .../apache/drill/exec/store/TimedRunnable.java  |   16 +-
 .../drill/exec/store/dfs/FileSelection.java     |   37 +-
 .../drill/exec/store/dfs/ReadEntryFromHDFS.java |    4 +-
 .../exec/store/dfs/easy/EasyFormatPlugin.java   |    6 +-
 .../exec/store/ischema/InfoSchemaGroupScan.java |    2 +-
 .../store/parquet/AbstractParquetGroupScan.java |  463 ++++
 .../parquet/AbstractParquetRowGroupScan.java    |   90 +
 .../AbstractParquetScanBatchCreator.java        |  186 ++
 .../drill/exec/store/parquet/Metadata.java      | 2007 ------------------
 .../exec/store/parquet/MetadataVersion.java     |  170 --
 .../exec/store/parquet/ParquetFormatPlugin.java |    3 +-
 .../exec/store/parquet/ParquetGroupScan.java    | 1236 ++---------
 .../parquet/ParquetGroupScanStatistics.java     |  215 ++
 .../store/parquet/ParquetPushDownFilter.java    |   23 +-
 .../store/parquet/ParquetReaderUtility.java     |  116 +-
 .../exec/store/parquet/ParquetRowGroupScan.java |  111 +-
 .../store/parquet/ParquetScanBatchCreator.java  |  160 +-
 .../drill/exec/store/parquet/RowGroupInfo.java  |   98 +
 .../exec/store/parquet/metadata/Metadata.java   |  694 ++++++
 .../store/parquet/metadata/MetadataBase.java    |  142 ++
 .../parquet/metadata/MetadataPathUtils.java     |  122 ++
 .../store/parquet/metadata/MetadataVersion.java |  169 ++
 .../store/parquet/metadata/Metadata_V1.java     |  329 +++
 .../store/parquet/metadata/Metadata_V2.java     |  418 ++++
 .../store/parquet/metadata/Metadata_V3.java     |  470 ++++
 .../metadata/ParquetTableMetadataDirs.java      |   49 +
 .../parquet/stat/ParquetMetaStatCollector.java  |   35 +-
 .../TestCorruptParquetDateCorrection.java       |    2 +-
 .../drill/exec/store/FormatPluginSerDeTest.java |   36 +-
 .../store/parquet/TestParquetMetadataCache.java |    2 +
 .../parquet/TestParquetMetadataVersion.java     |    1 +
 .../apache/drill/exec/proto/UserBitShared.java  |   22 +-
 .../exec/proto/beans/CoreOperatorType.java      |    4 +-
 protocol/src/main/protobuf/UserBitShared.proto  |    1 +
 65 files changed, 5266 insertions(+), 4216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/contrib/native/client/src/protobuf/UserBitShared.pb.cc
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.cc 
b/contrib/native/client/src/protobuf/UserBitShared.pb.cc
index c5b2527..afab5af 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.cc
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.cc
@@ -749,7 +749,7 @@ void protobuf_AddDesc_UserBitShared_2eproto() {
     "agmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_ALL"
     
"OCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\t"
     "CANCELLED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCELLATION_"
-    "REQUESTED\020\006*\302\006\n\020CoreOperatorType\022\021\n\rSING"
+    "REQUESTED\020\006*\360\006\n\020CoreOperatorType\022\021\n\rSING"
     "LE_SENDER\020\000\022\024\n\020BROADCAST_SENDER\020\001\022\n\n\006FIL"
     
"TER\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHASH_JOIN\020\004"
     "\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITION_SENDE"
@@ -770,11 +770,12 @@ void protobuf_AddDesc_UserBitShared_2eproto() {
     "P_JOIN\020#\022\021\n\rAVRO_SUB_SCAN\020$\022\021\n\rPCAP_SUB_"
     "SCAN\020%\022\022\n\016KAFKA_SUB_SCAN\020&\022\021\n\rKUDU_SUB_S"
     
"CAN\020\'\022\013\n\007FLATTEN\020(\022\020\n\014LATERAL_JOIN\020)\022\n\n\006"
-    "UNNEST\020**g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN\020\000"
-    
"\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020"
-    "\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org"
-    ".apache.drill.exec.protoB\rUserBitSharedH"
-    "\001", 5121);
+    "UNNEST\020*\022,\n(HIVE_DRILL_NATIVE_PARQUET_RO"
+    "W_GROUP_SCAN\020+*g\n\nSaslStatus\022\020\n\014SASL_UNK"
+    "NOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRE"
+    
"SS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B"
+    ".\n\033org.apache.drill.exec.protoB\rUserBitS"
+    "haredH\001", 5167);
   ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile(
     "UserBitShared.proto", &protobuf_RegisterTypes);
   UserCredentials::default_instance_ = new UserCredentials();
@@ -938,6 +939,7 @@ bool CoreOperatorType_IsValid(int value) {
     case 40:
     case 41:
     case 42:
+    case 43:
       return true;
     default:
       return false;

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/contrib/native/client/src/protobuf/UserBitShared.pb.h
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.h 
b/contrib/native/client/src/protobuf/UserBitShared.pb.h
index 1a69906..66c4cf7 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.h
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.h
@@ -246,11 +246,12 @@ enum CoreOperatorType {
   KUDU_SUB_SCAN = 39,
   FLATTEN = 40,
   LATERAL_JOIN = 41,
-  UNNEST = 42
+  UNNEST = 42,
+  HIVE_DRILL_NATIVE_PARQUET_ROW_GROUP_SCAN = 43
 };
 bool CoreOperatorType_IsValid(int value);
 const CoreOperatorType CoreOperatorType_MIN = SINGLE_SENDER;
-const CoreOperatorType CoreOperatorType_MAX = UNNEST;
+const CoreOperatorType CoreOperatorType_MAX = 
HIVE_DRILL_NATIVE_PARQUET_ROW_GROUP_SCAN;
 const int CoreOperatorType_ARRAYSIZE = CoreOperatorType_MAX + 1;
 
 const ::google::protobuf::EnumDescriptor* CoreOperatorType_descriptor();

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
index a732245..3484ab3 100644
--- 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
+++ 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.planner.sql.DrillSqlOperator;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.hive.HiveDrillNativeParquetScan;
+import org.apache.drill.exec.store.hive.HiveMetadataProvider;
 import org.apache.drill.exec.store.hive.HiveReadEntry;
 import org.apache.drill.exec.store.hive.HiveScan;
 import org.apache.drill.exec.store.hive.HiveTableWithColumnCache;
@@ -84,12 +85,12 @@ public class ConvertHiveParquetScanToDrillParquetScan 
extends StoragePluginOptim
    *    {@link MapredParquetInputFormat}
    * 4) No error occurred while checking for the above conditions. An error is 
logged as warning.
    *
-   * @param call
+   * @param call rule call
    * @return True if the rule can be applied. False otherwise
    */
   @Override
   public boolean matches(RelOptRuleCall call) {
-    final DrillScanRel scanRel = (DrillScanRel) call.rel(0);
+    final DrillScanRel scanRel = call.rel(0);
 
     if (!(scanRel.getGroupScan() instanceof HiveScan) || ((HiveScan) 
scanRel.getGroupScan()).isNativeReader()) {
       return false;
@@ -99,6 +100,10 @@ public class ConvertHiveParquetScanToDrillParquetScan 
extends StoragePluginOptim
     final HiveConf hiveConf = hiveScan.getHiveConf();
     final HiveTableWithColumnCache hiveTable = 
hiveScan.getHiveReadEntry().getTable();
 
+    if (containsUnsupportedDataTypes(hiveTable)) {
+      return false;
+    }
+
     final Class<? extends InputFormat<?,?>> tableInputFormat =
         getInputFormatFromSD(HiveUtilities.getTableMetadata(hiveTable), 
hiveScan.getHiveReadEntry(), hiveTable.getSd(),
             hiveConf);
@@ -139,9 +144,9 @@ public class ConvertHiveParquetScanToDrillParquetScan 
extends StoragePluginOptim
 
   /**
    * Get the input format from given {@link StorageDescriptor}
-   * @param properties
-   * @param hiveReadEntry
-   * @param sd
+   * @param properties table properties
+   * @param hiveReadEntry hive read entry
+   * @param sd storage descriptor
    * @return {@link InputFormat} class or null if a failure has occurred. 
Failure is logged as warning.
    */
   private Class<? extends InputFormat<?, ?>> getInputFormatFromSD(final 
Properties properties,
@@ -166,25 +171,41 @@ public class ConvertHiveParquetScanToDrillParquetScan 
extends StoragePluginOptim
   @Override
   public void onMatch(RelOptRuleCall call) {
     try {
-      final DrillScanRel hiveScanRel = (DrillScanRel) call.rel(0);
+      final DrillScanRel hiveScanRel = call.rel(0);
       final HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan();
 
       final PlannerSettings settings = 
PrelUtil.getPlannerSettings(call.getPlanner());
       final String partitionColumnLabel = settings.getFsPartitionColumnLabel();
 
       final Table hiveTable = hiveScan.getHiveReadEntry().getTable();
-      checkForUnsupportedDataTypes(hiveTable);
+      final HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry();
 
-      final Map<String, String> partitionColMapping =
-          getPartitionColMapping(hiveTable, partitionColumnLabel);
+      final HiveMetadataProvider hiveMetadataProvider = new 
HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry, 
hiveScan.getStoragePlugin().getHiveConf());
+      final List<HiveMetadataProvider.LogicalInputSplit> logicalInputSplits = 
hiveMetadataProvider.getInputSplits(hiveReadEntry);
+
+      if (logicalInputSplits.isEmpty()) {
+        // table is empty, use original scan
+        return;
+      }
 
-      final DrillScanRel nativeScanRel = 
createNativeScanRel(partitionColMapping, hiveScanRel);
+      final Map<String, String> partitionColMapping = 
getPartitionColMapping(hiveTable, partitionColumnLabel);
+      final DrillScanRel nativeScanRel = 
createNativeScanRel(partitionColMapping, hiveScanRel, logicalInputSplits);
       if (hiveScanRel.getRowType().getFieldCount() == 0) {
         call.transformTo(nativeScanRel);
       } else {
         final DrillProjectRel projectRel = createProjectRel(hiveScanRel, 
partitionColMapping, nativeScanRel);
         call.transformTo(projectRel);
       }
+
+
+      /*
+        Drill native scan should take precedence over Hive since it's more 
efficient and faster.
+        Hive does not always give correct costing (i.e. for external tables 
Hive does not have number of rows
+        and we calculate them approximately). On the contrary, Drill 
calculates number of rows exactly
+        and thus Hive Scan can be chosen instead of Drill native scan because 
costings allegedly lower for Hive.
+        To ensure Drill native scan we'll be chosen, reduce Hive scan 
importance to 0.
+       */
+      call.getPlanner().setImportance(hiveScanRel, 0.0);
     } catch (final Exception e) {
       logger.warn("Failed to convert HiveScan to HiveDrillNativeParquetScan", 
e);
     }
@@ -208,7 +229,8 @@ public class ConvertHiveParquetScanToDrillParquetScan 
extends StoragePluginOptim
    * Helper method which creates a DrillScalRel with native HiveScan.
    */
   private DrillScanRel createNativeScanRel(final Map<String, String> 
partitionColMapping,
-      final DrillScanRel hiveScanRel) throws Exception{
+                                           final DrillScanRel hiveScanRel,
+                                           final 
List<HiveMetadataProvider.LogicalInputSplit> logicalInputSplits) throws 
Exception {
 
     final RelDataTypeFactory typeFactory = 
hiveScanRel.getCluster().getTypeFactory();
     final RelDataType varCharType = 
typeFactory.createSqlType(SqlTypeName.VARCHAR);
@@ -245,10 +267,9 @@ public class ConvertHiveParquetScanToDrillParquetScan 
extends StoragePluginOptim
     final HiveDrillNativeParquetScan nativeHiveScan =
         new HiveDrillNativeParquetScan(
             hiveScan.getUserName(),
-            hiveScan.getHiveReadEntry(),
-            hiveScan.getStoragePlugin(),
             nativeScanCols,
-            null);
+            hiveScan.getStoragePlugin(),
+            logicalInputSplits);
 
     return new DrillScanRel(
         hiveScanRel.getCluster(),
@@ -321,15 +342,17 @@ public class ConvertHiveParquetScanToDrillParquetScan 
extends StoragePluginOptim
     return rb.makeCast(outputType, inputRef);
   }
 
-  private void checkForUnsupportedDataTypes(final Table hiveTable) {
-    for(FieldSchema hiveField : hiveTable.getSd().getCols()) {
+  private boolean containsUnsupportedDataTypes(final Table hiveTable) {
+    for (FieldSchema hiveField : hiveTable.getSd().getCols()) {
       final Category category = 
TypeInfoUtils.getTypeInfoFromTypeString(hiveField.getType()).getCategory();
       if (category == Category.MAP ||
           category == Category.STRUCT ||
           category == Category.UNION ||
           category == Category.LIST) {
-        HiveUtilities.throwUnsupportedHiveDataTypeError(category.toString());
+        logger.debug("Hive table contains unsupported data type: {}", 
category);
+        return true;
       }
     }
+    return false;
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
new file mode 100644
index 0000000..e227015
--- /dev/null
+++ 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.parquet.AbstractParquetRowGroupScan;
+import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.util.List;
+
+@JsonTypeName("hive-drill-native-parquet-row-group-scan")
+public class HiveDrillNativeParquetRowGroupScan extends 
AbstractParquetRowGroupScan {
+
+  private final HiveStoragePlugin hiveStoragePlugin;
+  private final HiveStoragePluginConfig hiveStoragePluginConfig;
+  private final HivePartitionHolder hivePartitionHolder;
+
+  @JsonCreator
+  public HiveDrillNativeParquetRowGroupScan(@JacksonInject 
StoragePluginRegistry registry,
+                                            @JsonProperty("userName") String 
userName,
+                                            
@JsonProperty("hiveStoragePluginConfig") HiveStoragePluginConfig 
hiveStoragePluginConfig,
+                                            
@JsonProperty("rowGroupReadEntries") List<RowGroupReadEntry> 
rowGroupReadEntries,
+                                            @JsonProperty("columns") 
List<SchemaPath> columns,
+                                            
@JsonProperty("hivePartitionHolder") HivePartitionHolder hivePartitionHolder,
+                                            @JsonProperty("filter") 
LogicalExpression filter) throws ExecutionSetupException {
+    this(userName,
+        (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig),
+        rowGroupReadEntries,
+        columns,
+        hivePartitionHolder,
+        filter);
+  }
+
+  public HiveDrillNativeParquetRowGroupScan(String userName,
+                                            HiveStoragePlugin 
hiveStoragePlugin,
+                                            List<RowGroupReadEntry> 
rowGroupReadEntries,
+                                            List<SchemaPath> columns,
+                                            HivePartitionHolder 
hivePartitionHolder,
+                                            LogicalExpression filter) {
+    super(userName, rowGroupReadEntries, columns, filter);
+    this.hiveStoragePlugin = Preconditions.checkNotNull(hiveStoragePlugin, 
"Could not find format config for the given configuration");
+    this.hiveStoragePluginConfig = hiveStoragePlugin.getConfig();
+    this.hivePartitionHolder = hivePartitionHolder;
+  }
+
+  @JsonProperty
+  public HiveStoragePluginConfig getHiveStoragePluginConfig() {
+    return hiveStoragePluginConfig;
+  }
+
+  @JsonProperty
+  public HivePartitionHolder getHivePartitionHolder() {
+    return hivePartitionHolder;
+  }
+
+  @JsonIgnore
+  public HiveStoragePlugin getHiveStoragePlugin() {
+    return hiveStoragePlugin;
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new HiveDrillNativeParquetRowGroupScan(getUserName(), 
hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, filter);
+  }
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.HIVE_DRILL_NATIVE_PARQUET_ROW_GROUP_SCAN_VALUE;
+  }
+
+  @Override
+  public AbstractParquetRowGroupScan copy(List<SchemaPath> columns) {
+    return new HiveDrillNativeParquetRowGroupScan(getUserName(), 
hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, filter);
+  }
+
+  @Override
+  public boolean areCorruptDatesAutoCorrected() {
+    return true;
+  }
+
+  @Override
+  public Configuration getFsConf(RowGroupReadEntry rowGroupReadEntry) throws 
IOException {
+    Path path = new Path(rowGroupReadEntry.getPath()).getParent();
+    return new ProjectionPusher().pushProjectionsAndFilters(
+        new JobConf(hiveStoragePlugin.getHiveConf()),
+        path.getParent());
+  }
+
+  @Override
+  public boolean supportsFileImplicitColumns() {
+    return false;
+  }
+
+  @Override
+  public List<String> getPartitionValues(RowGroupReadEntry rowGroupReadEntry) {
+    return hivePartitionHolder.get(rowGroupReadEntry.getPath());
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
index 202bd43..03a80d3 100644
--- 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
+++ 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
@@ -21,94 +21,204 @@ import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.hive.HiveTableWrapper.HivePartitionWrapper;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
+import org.apache.drill.exec.store.hive.HiveMetadataProvider.LogicalInputSplit;
+import org.apache.drill.exec.store.parquet.AbstractParquetGroupScan;
+import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.drill.exec.store.parquet.metadata.Metadata;
+import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
+import org.apache.drill.exec.store.parquet.RowGroupInfo;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
-/**
- * Extension of {@link HiveScan} which support reading Hive tables using 
Drill's native parquet reader.
- */
 @JsonTypeName("hive-drill-native-parquet-scan")
-public class HiveDrillNativeParquetScan extends HiveScan {
+public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
+
+  private final HiveStoragePlugin hiveStoragePlugin;
+  private HivePartitionHolder hivePartitionHolder;
 
   @JsonCreator
-  public HiveDrillNativeParquetScan(@JsonProperty("userName") String userName,
-                                    @JsonProperty("hiveReadEntry") 
HiveReadEntry hiveReadEntry,
+  public HiveDrillNativeParquetScan(@JacksonInject StoragePluginRegistry 
engineRegistry,
+                                    @JsonProperty("userName") String userName,
                                     @JsonProperty("hiveStoragePluginConfig") 
HiveStoragePluginConfig hiveStoragePluginConfig,
                                     @JsonProperty("columns") List<SchemaPath> 
columns,
-                                    @JacksonInject StoragePluginRegistry 
pluginRegistry) throws ExecutionSetupException {
-    super(userName, hiveReadEntry, hiveStoragePluginConfig, columns, 
pluginRegistry);
+                                    @JsonProperty("entries") 
List<ReadEntryWithPath> entries,
+                                    @JsonProperty("hivePartitionHolder") 
HivePartitionHolder hivePartitionHolder,
+                                    @JsonProperty("filter") LogicalExpression 
filter) throws IOException, ExecutionSetupException {
+    super(ImpersonationUtil.resolveUserName(userName), columns, entries, 
filter);
+    this.hiveStoragePlugin = (HiveStoragePlugin) 
engineRegistry.getPlugin(hiveStoragePluginConfig);
+    this.hivePartitionHolder = hivePartitionHolder;
+
+    init();
   }
 
-  public HiveDrillNativeParquetScan(String userName, HiveReadEntry 
hiveReadEntry, HiveStoragePlugin hiveStoragePlugin,
-      List<SchemaPath> columns, HiveMetadataProvider metadataProvider) throws 
ExecutionSetupException {
-    super(userName, hiveReadEntry, hiveStoragePlugin, columns, 
metadataProvider);
+  public HiveDrillNativeParquetScan(String userName,
+                                    List<SchemaPath> columns,
+                                    HiveStoragePlugin hiveStoragePlugin,
+                                    List<LogicalInputSplit> 
logicalInputSplits) throws IOException {
+    this(userName, columns, hiveStoragePlugin, logicalInputSplits, 
ValueExpressions.BooleanExpression.TRUE);
   }
 
-  public HiveDrillNativeParquetScan(final HiveScan hiveScan) {
-    super(hiveScan);
+  public HiveDrillNativeParquetScan(String userName,
+                                    List<SchemaPath> columns,
+                                    HiveStoragePlugin hiveStoragePlugin,
+                                    List<LogicalInputSplit> logicalInputSplits,
+                                    LogicalExpression filter) throws 
IOException {
+    super(userName, columns, new ArrayList<>(), filter);
+
+    this.hiveStoragePlugin = hiveStoragePlugin;
+    this.hivePartitionHolder = new HivePartitionHolder();
+
+    for (LogicalInputSplit logicalInputSplit : logicalInputSplits) {
+      Iterator<InputSplit> iterator = 
logicalInputSplit.getInputSplits().iterator();
+      // logical input split contains list of splits by files
+      // we need to read path of only one to get file path
+      assert iterator.hasNext();
+      InputSplit split = iterator.next();
+      assert split instanceof FileSplit;
+      FileSplit fileSplit = (FileSplit) split;
+      Path finalPath = fileSplit.getPath();
+      String pathString = 
Path.getPathWithoutSchemeAndAuthority(finalPath).toString();
+      entries.add(new ReadEntryWithPath(pathString));
+
+      // store partition values per path
+      Partition partition = logicalInputSplit.getPartition();
+      if (partition != null) {
+        hivePartitionHolder.add(pathString, partition.getValues());
+      }
+    }
+
+    init();
   }
 
-  @Override
-  public ScanStats getScanStats() {
-    final ScanStats nativeHiveScanStats = super.getScanStats();
+  private HiveDrillNativeParquetScan(HiveDrillNativeParquetScan that) {
+    super(that);
+    this.hiveStoragePlugin = that.hiveStoragePlugin;
+    this.hivePartitionHolder = that.hivePartitionHolder;
+  }
 
-    // As Drill's native parquet record reader is faster and memory efficient. 
Divide the CPU cost
-    // by a factor to let the planner choose HiveDrillNativeScan over HiveScan 
with SerDes.
-    return new ScanStats(
-        nativeHiveScanStats.getGroupScanProperty(),
-        nativeHiveScanStats.getRecordCount(),
-        nativeHiveScanStats.getCpuCost()/getSerDeOverheadFactor(),
-        nativeHiveScanStats.getDiskCost());
+  @JsonProperty
+  public HiveStoragePluginConfig getHiveStoragePluginConfig() {
+    return hiveStoragePlugin.getConfig();
   }
 
-  @Override
-  public SubScan getSpecificScan(int minorFragmentId) throws 
ExecutionSetupException {
-    try {
-      return new 
HiveDrillNativeParquetSubScan((HiveSubScan)super.getSpecificScan(minorFragmentId));
-    } catch (IOException | ReflectiveOperationException e) {
-      throw new ExecutionSetupException(e);
-    }
+  @JsonProperty
+  public HivePartitionHolder getHivePartitionHolder() { // JSON property; returns the holder instance itself, not a copy
+    return hivePartitionHolder;
   }
 
   @Override
-  public boolean isNativeReader() {
-    return true;
+  public SubScan getSpecificScan(int minorFragmentId) { // builds the row group scan for one minor fragment
+    List<RowGroupReadEntry> readEntries = getReadEntries(minorFragmentId);
+    HivePartitionHolder subPartitionHolder = new HivePartitionHolder(); // filled only with paths that appear in readEntries
+    for (RowGroupReadEntry readEntry : readEntries) {
+      List<String> values = hivePartitionHolder.get(readEntry.getPath()); // HivePartitionHolder.get yields an empty list for a path it does not know
+      subPartitionHolder.add(readEntry.getPath(), values);
+    }
+    return new HiveDrillNativeParquetRowGroupScan(getUserName(), 
hiveStoragePlugin, readEntries, columns, subPartitionHolder, filter);
   }
 
   @Override
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) 
throws ExecutionSetupException {
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { // returns a copy of this scan made by the copy constructor
+    Preconditions.checkArgument(children.isEmpty()); // a non-empty children list is rejected
     return new HiveDrillNativeParquetScan(this);
   }
 
   @Override
-  public HiveScan clone(HiveReadEntry hiveReadEntry) throws 
ExecutionSetupException {
-    return new HiveDrillNativeParquetScan(getUserName(), hiveReadEntry, 
getStoragePlugin(), getColumns(), getMetadataProvider());
+  public HiveDrillNativeParquetScan clone(FileSelection selection) throws 
IOException { // copies this scan, applies the given file selection to the copy, then calls init on it
+    HiveDrillNativeParquetScan newScan = new HiveDrillNativeParquetScan(this);
+    newScan.modifyFileSelection(selection); // defined outside this diff chunk; presumably narrows the entries of the copy - TODO confirm
+    newScan.init(); // this scan itself is left unmodified by the statements in this method
+    return newScan;
   }
 
   @Override
   public GroupScan clone(List<SchemaPath> columns) {
-    final HiveDrillNativeParquetScan scan = new 
HiveDrillNativeParquetScan(this);
-    scan.columns = columns;
-    return scan;
+    HiveDrillNativeParquetScan newScan = new HiveDrillNativeParquetScan(this); // new side: same logic as the removed lines, local renamed and final dropped
+    newScan.columns = columns; // the copy gets the given column list; init is not called here, unlike clone(FileSelection)
+    return newScan;
   }
 
   @Override
   public String toString() {
-    final List<HivePartitionWrapper> partitions = 
getHiveReadEntry().getHivePartitionWrappers();
-    int numPartitions = partitions == null ? 0 : partitions.size();
-    return "HiveDrillNativeParquetScan [table=" + 
getHiveReadEntry().getHiveTableWrapper()
-        + ", columns=" + getColumns()
-        + ", numPartitions=" + numPartitions
-        + ", partitions= " + partitions
-        + ", inputDirectories=" + 
getMetadataProvider().getInputDirectories(getHiveReadEntry()) + "]";
+    StringBuilder builder = new StringBuilder(); // new side reports entries, file count, row group count, optional filter and columns
+    builder.append("HiveDrillNativeParquetScan [");
+    builder.append("entries=").append(entries);
+    builder.append(", numFiles=").append(getEntries().size()); // count is taken through getEntries() while the line above prints the entries field directly
+    builder.append(", numRowGroups=").append(rowGroupInfos.size());
+
+    String filterString = getFilterString();
+    if (!filterString.isEmpty()) { // the filter part is printed only for a non-empty filter string
+      builder.append(", filter=").append(filterString);
+    }
+
+    builder.append(", columns=").append(columns);
+    builder.append("]");
+
+    return builder.toString();
   }
+
+  @Override
+  protected void initInternal() throws IOException { // resolves a file status and file system for every entry, then loads parquet table metadata from them
+    ParquetFormatConfig formatConfig = new ParquetFormatConfig(); // format config is created with its no-argument constructor
+    Map<FileStatus, FileSystem> fileStatusConfMap = new LinkedHashMap<>(); // LinkedHashMap iterates in insertion order, i.e. the order of entries
+    for (ReadEntryWithPath entry : entries) {
+      Path path = new Path(entry.getPath());
+      Configuration conf = new ProjectionPusher().pushProjectionsAndFilters(
+          new JobConf(hiveStoragePlugin.getHiveConf()),
+          path.getParent());
+      FileSystem fs = path.getFileSystem(conf); // conf above is built from a fresh JobConf and the parent directory of the file, once per entry
+      
fileStatusConfMap.put(fs.getFileStatus(Path.getPathWithoutSchemeAndAuthority(path)),
 fs);
+    }
+    parquetTableMetadata = Metadata.getParquetTableMetadata(fileStatusConfMap, 
formatConfig);
+  }
+
+  @Override
+  protected Collection<CoordinationProtos.DrillbitEndpoint> getDrillbits() { // endpoints are taken from the context of the Hive storage plugin
+    return hiveStoragePlugin.getContext().getBits();
+  }
+
+  @Override
+  protected AbstractParquetGroupScan cloneWithFileSelection(Collection<String> 
filePaths) throws IOException { // wraps the paths in a FileSelection and delegates to clone(FileSelection)
+    FileSelection newSelection = new FileSelection(null, new 
ArrayList<>(filePaths), null, null, false); // paths are copied into a new list; the meaning of the other arguments is not visible in this chunk
+    return clone(newSelection);
+  }
+
+  @Override
+  protected boolean supportsFileImplicitColumns() { // this scan always answers false
+    return false;
+  }
+
+  @Override
+  protected List<String> getPartitionValues(RowGroupInfo rowGroupInfo) { // looks up partition values by the path of the row group
+    return hivePartitionHolder.get(rowGroupInfo.getPath()); // HivePartitionHolder.get yields an empty list when the path is not registered
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScanBatchCreator.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScanBatchCreator.java
new file mode 100644
index 0000000..669369b
--- /dev/null
+++ 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScanBatchCreator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.parquet.AbstractParquetScanBatchCreator;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class HiveDrillNativeParquetScanBatchCreator extends 
AbstractParquetScanBatchCreator implements 
BatchCreator<HiveDrillNativeParquetRowGroupScan> { // batch creator for HiveDrillNativeParquetRowGroupScan
+
+  @Override
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context, 
HiveDrillNativeParquetRowGroupScan rowGroupScan, List<RecordBatch> children) 
throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty()); // a non-empty children list is rejected
+    OperatorContext oContext = context.newOperatorContext(rowGroupScan); // operator context is created from the fragment context for this scan
+    return getBatch(context, rowGroupScan, oContext); // this overload is not declared in this class; presumably inherited from AbstractParquetScanBatchCreator - TODO confirm
+  }
+
+  @Override
+  protected AbstractDrillFileSystemManager 
getDrillFileSystemCreator(OperatorContext operatorContext, OptionManager 
optionManager) { // optionManager is not used by this implementation
+    return new HiveDrillNativeParquetDrillFileSystemManager(operatorContext); // a new manager, and so a new empty cache, per call
+  }
+
+  /**
+   * Caches file system per path and returns the cached file system when the 
same path is requested again.
+   * New file systems are created only through newNonTrackingFileSystem.
+   */
+  private class HiveDrillNativeParquetDrillFileSystemManager extends 
AbstractDrillFileSystemManager {
+
+    private final Map<String, DrillFileSystem> fileSystems; // cache: path -> file system created for that path
+
+    HiveDrillNativeParquetDrillFileSystemManager(OperatorContext 
operatorContext) {
+      super(operatorContext);
+      this.fileSystems = new HashMap<>();
+    }
+
+    @Override
+    protected DrillFileSystem get(Configuration config, String path) throws 
ExecutionSetupException {
+      DrillFileSystem fs = fileSystems.get(path); // lookup is keyed by path only; config is consulted only on a cache miss
+      if (fs == null) {
+        try {
+          fs = operatorContext.newNonTrackingFileSystem(config);
+        } catch (IOException e) {
+          throw new ExecutionSetupException(String.format("Failed to create 
non-tracking DrillFileSystem: %s", e.getMessage()), e);
+        }
+        fileSystems.put(path, fs);
+      }
+      return fs;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java
deleted file mode 100644
index 2129ed4..0000000
--- 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.hive;
-
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.store.StoragePluginRegistry;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Extension of {@link HiveSubScan} which support reading Hive tables using 
Drill's native parquet reader.
- */
-@JsonTypeName("hive-drill-native-parquet-sub-scan")
-public class HiveDrillNativeParquetSubScan extends HiveSubScan {
-  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HiveDrillNativeParquetSubScan.class);
-
-  @JsonCreator
-  public HiveDrillNativeParquetSubScan(@JacksonInject StoragePluginRegistry 
registry,
-                                       @JsonProperty("userName") String 
userName,
-                                       @JsonProperty("splits") 
List<List<String>> splits,
-                                       @JsonProperty("hiveReadEntry") 
HiveReadEntry hiveReadEntry,
-                                       @JsonProperty("splitClasses") 
List<String> splitClasses,
-                                       @JsonProperty("columns") 
List<SchemaPath> columns,
-                                       
@JsonProperty("hiveStoragePluginConfig") HiveStoragePluginConfig 
hiveStoragePluginConfig)
-      throws IOException, ExecutionSetupException, 
ReflectiveOperationException {
-    super(registry, userName, splits, hiveReadEntry, splitClasses, columns, 
hiveStoragePluginConfig);
-  }
-
-  public HiveDrillNativeParquetSubScan(final HiveSubScan subScan)
-      throws IOException, ExecutionSetupException, 
ReflectiveOperationException {
-    super(subScan.getUserName(), subScan.getSplits(), 
subScan.getHiveReadEntry(), subScan.getSplitClasses(),
-        subScan.getColumns(), subScan.getStoragePlugin());
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
deleted file mode 100644
index 43318d1..0000000
--- 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.hive;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import com.google.common.base.Functions;
-import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ops.ExecutorFragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.physical.impl.ScanBatch;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.hive.readers.HiveDefaultReader;
-import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
-import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
-import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
-import org.apache.drill.exec.util.ImpersonationUtil;
-import org.apache.drill.exec.util.Utilities;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.parquet.hadoop.CodecFactory;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.metadata.BlockMetaData;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-@SuppressWarnings("unused")
-public class HiveDrillNativeScanBatchCreator implements 
BatchCreator<HiveDrillNativeParquetSubScan> {
-  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HiveDrillNativeScanBatchCreator.class);
-
-  @Override
-  public ScanBatch getBatch(ExecutorFragmentContext context, 
HiveDrillNativeParquetSubScan config, List<RecordBatch> children)
-      throws ExecutionSetupException {
-    final HiveTableWithColumnCache table = config.getTable();
-    final List<List<InputSplit>> splits = config.getInputSplits();
-    final List<HivePartition> partitions = config.getPartitions();
-    final List<SchemaPath> columns = config.getColumns();
-    final String partitionDesignator = context.getOptions()
-        .getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
-    List<Map<String, String>> implicitColumns = Lists.newLinkedList();
-    boolean selectAllQuery = Utilities.isStarQuery(columns);
-
-    final boolean hasPartitions = (partitions != null && partitions.size() > 
0);
-
-    final List<String[]> partitionColumns = Lists.newArrayList();
-    final List<Integer> selectedPartitionColumns = Lists.newArrayList();
-    List<SchemaPath> tableColumns = columns;
-    if (!selectAllQuery) {
-      // Separate out the partition and non-partition columns. Non-partition 
columns are passed directly to the
-      // ParquetRecordReader. Partition columns are passed to ScanBatch.
-      tableColumns = Lists.newArrayList();
-      Pattern pattern = Pattern.compile(String.format("%s[0-9]+", 
partitionDesignator));
-      for (SchemaPath column : columns) {
-        Matcher m = pattern.matcher(column.getRootSegmentPath());
-        if (m.matches()) {
-          selectedPartitionColumns.add(
-              
Integer.parseInt(column.getRootSegmentPath().substring(partitionDesignator.length())));
-        } else {
-          tableColumns.add(column);
-        }
-      }
-    }
-
-    final OperatorContext oContext = context.newOperatorContext(config);
-
-    int currentPartitionIndex = 0;
-    final List<RecordReader> readers = new LinkedList<>();
-
-    final HiveConf conf = config.getHiveConf();
-
-    // TODO: In future we can get this cache from Metadata cached on 
filesystem.
-    final Map<String, ParquetMetadata> footerCache = Maps.newHashMap();
-
-    Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
-    try {
-      for (List<InputSplit> splitGroups : splits) {
-        for (InputSplit split : splitGroups) {
-          final FileSplit fileSplit = (FileSplit) split;
-          final Path finalPath = fileSplit.getPath();
-          final JobConf cloneJob =
-              new ProjectionPusher().pushProjectionsAndFilters(new 
JobConf(conf), finalPath.getParent());
-          final FileSystem fs = finalPath.getFileSystem(cloneJob);
-
-          ParquetMetadata parquetMetadata = 
footerCache.get(finalPath.toString());
-          if (parquetMetadata == null) {
-            parquetMetadata = ParquetFileReader.readFooter(cloneJob, 
finalPath);
-            footerCache.put(finalPath.toString(), parquetMetadata);
-          }
-          final List<Integer> rowGroupNums = 
getRowGroupNumbersFromFileSplit(fileSplit, parquetMetadata);
-
-          for (int rowGroupNum : rowGroupNums) {
-            //DRILL-5009 : Skip the row group if the row count is zero
-            if (parquetMetadata.getBlocks().get(rowGroupNum).getRowCount() == 
0) {
-              continue;
-            }
-            // Drill has only ever written a single row group per file, only 
detect corruption
-            // in the first row group
-            ParquetReaderUtility.DateCorruptionStatus containsCorruptDates =
-                ParquetReaderUtility.detectCorruptDates(parquetMetadata, 
config.getColumns(), true);
-            if (logger.isDebugEnabled()) {
-              logger.debug(containsCorruptDates.toString());
-            }
-            readers.add(new ParquetRecordReader(
-                context,
-                Path.getPathWithoutSchemeAndAuthority(finalPath).toString(),
-                rowGroupNum, fs,
-                CodecFactory.createDirectCodecFactory(fs.getConf(),
-                    new 
ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0),
-                parquetMetadata,
-                tableColumns,
-                containsCorruptDates)
-            );
-            Map<String, String> implicitValues = Maps.newLinkedHashMap();
-
-            if (hasPartitions) {
-              List<String> values = 
partitions.get(currentPartitionIndex).getValues();
-              for (int i = 0; i < values.size(); i++) {
-                if (selectAllQuery || selectedPartitionColumns.contains(i)) {
-                  implicitValues.put(partitionDesignator + i, values.get(i));
-                }
-              }
-            }
-            implicitColumns.add(implicitValues);
-            if (implicitValues.size() > mapWithMaxColumns.size()) {
-              mapWithMaxColumns = implicitValues;
-            }
-          }
-          currentPartitionIndex++;
-        }
-      }
-    } catch (final IOException|RuntimeException e) {
-      AutoCloseables.close(e, readers);
-      throw new ExecutionSetupException("Failed to create RecordReaders. " + 
e.getMessage(), e);
-    }
-
-    // all readers should have the same number of implicit columns, add 
missing ones with value null
-    mapWithMaxColumns = Maps.transformValues(mapWithMaxColumns, 
Functions.constant((String) null));
-    for (Map<String, String> map : implicitColumns) {
-      map.putAll(Maps.difference(map, mapWithMaxColumns).entriesOnlyOnRight());
-    }
-
-    // If there are no readers created (which is possible when the table is 
empty or no row groups are matched),
-    // create an empty RecordReader to output the schema
-    if (readers.size() == 0) {
-      readers.add(new HiveDefaultReader(table, null, null, tableColumns, 
context, conf,
-        ImpersonationUtil.createProxyUgi(config.getUserName(), 
context.getQueryUserName())));
-    }
-
-    return new ScanBatch(context, oContext, readers, implicitColumns);
-  }
-
-  /**
-   * Get the list of row group numbers for given file input split. Logic used 
here is same as how Hive's parquet input
-   * format finds the row group numbers for input split.
-   */
-  private List<Integer> getRowGroupNumbersFromFileSplit(final FileSplit split,
-      final ParquetMetadata footer) throws IOException {
-    final List<BlockMetaData> blocks = footer.getBlocks();
-
-    final long splitStart = split.getStart();
-    final long splitLength = split.getLength();
-
-    final List<Integer> rowGroupNums = Lists.newArrayList();
-
-    int i = 0;
-    for (final BlockMetaData block : blocks) {
-      final long firstDataPage = 
block.getColumns().get(0).getFirstDataPageOffset();
-      if (firstDataPage >= splitStart && firstDataPage < splitStart + 
splitLength) {
-        rowGroupNums.add(i);
-      }
-      i++;
-    }
-
-    return rowGroupNums;
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
index b11ef3b..c877564 100644
--- 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
+++ 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
@@ -243,7 +243,10 @@ public class HiveMetadataProvider {
       data += split.getLength();
     }
 
-    return new HiveStats(data/RECORD_SIZE, data);
+    long numRows = data / RECORD_SIZE;
+    // if the result of division is zero and data size > 0, estimate to one row
+    numRows = numRows == 0 && data > 0 ? 1 : numRows;
+    return new HiveStats(numRows, data);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartitionHolder.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartitionHolder.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartitionHolder.java
new file mode 100644
index 0000000..803144e
--- /dev/null
+++ 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartitionHolder.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper class that stores partition values per key.
+ * Key to index mapper contains key and index corresponding to partition 
values position in partition values list.
+ * Since several keys may have the same partition values, such structure is 
optimized to save memory usage.
+ * Partition values are stored in list of consecutive values.
+ */
+public class HivePartitionHolder {
+
+  private final Map<String, Integer> keyToIndexMapper; // key -> position of its value list inside partitionValues
+  private final List<List<String>> partitionValues; // distinct partition value lists, addressed by index
+
+  @JsonCreator
+  public HivePartitionHolder(@JsonProperty("keyToIndexMapper") Map<String, 
Integer> keyToIndexMapper,
+                             @JsonProperty("partitionValues") 
List<List<String>> partitionValues) {
+    this.keyToIndexMapper = keyToIndexMapper; // both arguments are stored as passed in: no copy, no null check
+    this.partitionValues = partitionValues;
+  }
+
+  public HivePartitionHolder() { // creates an empty holder backed by a HashMap and an ArrayList
+    this.keyToIndexMapper = new HashMap<>();
+    this.partitionValues = new ArrayList<>();
+  }
+
+  @JsonProperty
+  public Map<String, Integer> getKeyToIndexMapper() { // returns the internal map itself, not a copy
+    return keyToIndexMapper;
+  }
+
+  @JsonProperty
+  public List<List<String>> getPartitionValues() { // returns the internal list itself, not a copy
+    return partitionValues;
+  }
+
+  /**
+   * Registers the given key with the given partition values.
+   * If an equal list of values is not yet stored, appends it to the holder and 
maps the key to its new index.
+   * If an equal list is already stored, maps the key to the index of that 
existing list; a key registered earlier is remapped.
+   *
+   * @param key mapper key
+   * @param values partition values, stored by reference when they are new
+   */
+  public void add(String key, List<String> values) {
+    int index = partitionValues.indexOf(values); // linear search; lists are compared with equals
+    if (index == -1) {
+      index = partitionValues.size(); // new values go to the end, so their index is the current size
+      partitionValues.add(values);
+
+    }
+    keyToIndexMapper.put(key, index);
+  }
+
+  /**
+   * Returns list of partition values stored in holder for the given key.
+   * If the key is not present in the mapper, returns an empty list.
+   *
+   * @param key mapper key
+   * @return the stored list of partition values itself (not a copy), or Collections.emptyList() for an unknown key
+   */
+  public List<String> get(String key) {
+    Integer index = keyToIndexMapper.get(key); // boxed Integer so that a missing key shows up as null
+    if (index == null) {
+      return Collections.emptyList();
+    }
+    return partitionValues.get(index);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
 
b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
new file mode 100644
index 0000000..23c67b5
--- /dev/null
+++ 
b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec;
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.categories.HiveStorageTest;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.hive.HiveTestBase;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.hamcrest.CoreMatchers;
+import org.joda.time.DateTime;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
+
+@Category({SlowTest.class, HiveStorageTest.class})
+public class TestHiveDrillNativeParquetReader extends HiveTestBase {
+
+  @BeforeClass
+  public static void init() {
+    setSessionOption(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS, 
true);
+    setSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, true);
+  }
+
+  @AfterClass
+  public static void cleanup() {
+    resetSessionOption(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS);
+    resetSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
+  }
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testFilterPushDownForManagedTable() throws Exception {
+    String query = "select * from hive.kv_native where key > 1";
+
+    int actualRowCount = testSql(query);
+    assertEquals("Expected and actual row count should match", 2, 
actualRowCount);
+
+    testPlanMatchingPatterns(query,
+        new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, null);
+  }
+
+  @Test
+  public void testFilterPushDownForExternalTable() throws Exception {
+    String query = "select * from hive.kv_native_ext where key = 1";
+
+    int actualRowCount = testSql(query);
+    assertEquals("Expected and actual row count should match", 1, 
actualRowCount);
+
+    testPlanMatchingPatterns(query,
+        new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, null);
+  }
+
+  @Test
+  public void testManagedPartitionPruning() throws Exception {
+    String query = "select * from hive.readtest_parquet where tinyint_part = 
64";
+
+    int actualRowCount = testSql(query);
+    assertEquals("Expected and actual row count should match", 2, 
actualRowCount);
+
+    // Hive partition pruning is applied during the logical planning stage,
+    // while conversion to the Drill native parquet reader happens during the physical stage,
+    // thus the plan should not contain a filter
+    testPlanMatchingPatterns(query,
+        new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, new 
String[]{"Filter"});
+  }
+
+  @Test
+  public void testExternalPartitionPruning() throws Exception {
+    String query = "select `key` from hive.kv_native_ext where part_key = 2";
+
+    int actualRowCount = testSql(query);
+    assertEquals("Expected and actual row count should match", 2, 
actualRowCount);
+
+    // Hive partition pruning is applied during the logical planning stage,
+    // while conversion to the Drill native parquet reader happens during the physical stage,
+    // thus the plan should not contain a filter
+    testPlanMatchingPatterns(query,
+        new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, new 
String[]{"Filter"});
+  }
+
+  @Test
+  public void testSimpleStarSubQueryFilterPushDown() throws Exception {
+    String query = "select * from (select * from (select * from 
hive.kv_native)) where key > 1";
+
+    int actualRowCount = testSql(query);
+    assertEquals("Expected and actual row count should match", 2, 
actualRowCount);
+
+    testPlanMatchingPatterns(query,
+        new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, null);
+  }
+
+  @Test
+  public void testPartitionedExternalTable() throws Exception {
+    String query = "select * from hive.kv_native_ext";
+
+    testPlanMatchingPatterns(query, new String[]{"HiveDrillNativeParquetScan", 
"numFiles=2"}, null);
+
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("key", "part_key")
+        .baselineValues(1, 1)
+        .baselineValues(2, 1)
+        .baselineValues(3, 2)
+        .baselineValues(4, 2)
+        .go();
+  }
+
+  @Test
+  public void testEmptyTable() throws Exception {
+    String query = "select * from hive.empty_table";
+    // Hive reader should be chosen to output the schema
+    testPlanMatchingPatterns(query, new String[]{"HiveScan"}, new 
String[]{"HiveDrillNativeParquetScan"});
+  }
+
+  @Test
+  public void testEmptyPartition() throws Exception {
+    String query = "select * from hive.kv_native_ext where part_key = 3";
+    // Hive reader should be chosen to output the schema
+    testPlanMatchingPatterns(query, new String[]{"HiveScan"}, new 
String[]{"HiveDrillNativeParquetScan"});
+  }
+
+  @Test
+  public void testPhysicalPlanSubmission() throws Exception {
+    // checks only group scan
+    PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from 
hive.kv_native");
+    PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from 
hive.kv_native_ext");
+  }
+
+  @Test
+  public void testProjectPushDownOptimization() throws Exception {
+    String query = "select boolean_field, int_part from hive.readtest_parquet";
+
+    int actualRowCount = testSql(query);
+    assertEquals("Expected and actual row count should match", 2, 
actualRowCount);
+
+    testPlanMatchingPatterns(query,
+        // during scan the partition column is named like a Drill partition column (e.g. `dir9`),
+        // it is renamed to its actual name (int_part) in the subsequent project
+        new String[]{"Project\\(boolean_field=\\[\\$0\\], 
int_part=\\[CAST\\(\\$1\\):INTEGER\\]\\)",
+            "HiveDrillNativeParquetScan",
+            "columns=\\[`boolean_field`, `dir9`\\]"},
+        new String[]{});
+  }
+
+  @Test
+  public void testLimitPushDownOptimization() throws Exception {
+    String query = "select * from hive.kv_native limit 2";
+
+    int actualRowCount = testSql(query);
+    assertEquals("Expected and actual row count should match", 2, 
actualRowCount);
+
+    testPlanMatchingPatterns(query, new String[]{"HiveDrillNativeParquetScan", 
"numFiles=1"}, null);
+  }
+
+  @Test
+  public void testConvertCountToDirectScanOptimization() throws Exception {
+    String query = "select count(1) as cnt from hive.kv_native";
+
+    testPlanMatchingPatterns(query, new String[]{"DynamicPojoRecordReader"}, 
null);
+
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("cnt")
+      .baselineValues(8L)
+      .go();
+  }
+
+  @Test
+  public void testImplicitColumns() throws Exception {
+    thrown.expect(UserRemoteException.class);
+    thrown.expectMessage(CoreMatchers.allOf(containsString("VALIDATION 
ERROR"), containsString("not found in any table")));
+
+    test("select *, filename, fqn, filepath, suffix from hive.kv_native");
+  }
+
+  @Test // DRILL-3739
+  public void testReadingFromStorageHandleBasedTable() throws Exception {
+    testBuilder()
+        .sqlQuery("select * from hive.kv_sh order by key limit 2")
+        .ordered()
+        .baselineColumns("key", "value")
+        .expectsEmptyResultSet()
+        .go();
+  }
+
+  @Test
+  public void testReadAllSupportedHiveDataTypesNativeParquet() throws 
Exception {
+    String query = "select * from hive.readtest_parquet";
+
+    testPlanMatchingPatterns(query, new String[] 
{"HiveDrillNativeParquetScan"}, null);
+
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("binary_field", "boolean_field", "tinyint_field", 
"decimal0_field", "decimal9_field", "decimal18_field", "decimal28_field", 
"decimal38_field", "double_field", "float_field", "int_field", "bigint_field", 
"smallint_field", "string_field", "varchar_field", "timestamp_field", 
"char_field",
+        // There is a regression in Hive 1.2.1 in binary and boolean partition columns. Disable for now.
+        //"binary_part",
+        "boolean_part", "tinyint_part", "decimal0_part", "decimal9_part", 
"decimal18_part", "decimal28_part", "decimal38_part", "double_part", 
"float_part", "int_part", "bigint_part", "smallint_part", "string_part", 
"varchar_part", "timestamp_part", "date_part", "char_part")
+        .baselineValues("binaryfield".getBytes(), false, 34, new 
BigDecimal("66"), new BigDecimal("2347.92"), new 
BigDecimal("2758725827.99990"), new BigDecimal("29375892739852.8"), new 
BigDecimal("89853749534593985.783"), 8.345d, 4.67f, 123456, 234235L, 3455, 
"stringfield", "varcharfield", new DateTime(Timestamp.valueOf("2013-07-05 
17:01:00").getTime()), "charfield",
+        // There is a regression in Hive 1.2.1 in binary and boolean partition columns. Disable for now.
+        //"binary",
+        true, 64, new BigDecimal("37"), new BigDecimal("36.90"), new 
BigDecimal("3289379872.94565"), new BigDecimal("39579334534534.4"), new 
BigDecimal("363945093845093890.900"), 8.345d, 4.67f, 123456, 234235L, 3455, 
"string", "varchar", new DateTime(Timestamp.valueOf("2013-07-05 
17:01:00").getTime()), new DateTime(Date.valueOf("2013-07-05").getTime()), 
"char").baselineValues( // All fields are null, but partition fields have 
non-null values
+        null, null, null, null, null, null, null, null, null, null, null, 
null, null, null, null, null, null,
+        // There is a regression in Hive 1.2.1 in binary and boolean partition columns. Disable for now.
+        //"binary",
+        true, 64, new BigDecimal("37"), new BigDecimal("36.90"), new 
BigDecimal("3289379872.94565"), new BigDecimal("39579334534534.4"), new 
BigDecimal("363945093845093890.900"), 8.345d, 4.67f, 123456, 234235L, 3455, 
"string", "varchar", new DateTime(Timestamp.valueOf("2013-07-05 
17:01:00").getTime()), new DateTime(Date.valueOf("2013-07-05").getTime()), 
"char").go();
+  }
+
+  @Test // DRILL-3938
+  public void testNativeReaderIsDisabledForAlteredPartitionedTable() throws 
Exception {
+    String query = "select key, `value`, newcol from hive.kv_parquet order by 
key limit 1";
+
+    // Make sure the HiveScan in plan has no native parquet reader
+    testPlanMatchingPatterns(query, new String[] {"HiveScan"}, new 
String[]{"HiveDrillNativeParquetScan"});
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
index ef5a169..7583f42 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
@@ -120,25 +120,6 @@ public class TestHivePartitionPruning extends HiveTestBase {
     assertFalse(plan.contains("Filter"));
   }
 
-  @Test
-  public void pruneDataTypeSupportNativeReaders() throws Exception {
-    try {
-      test(String.format("alter session set `%s` = true", 
ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
-      final String query = "EXPLAIN PLAN FOR " +
-          "SELECT * FROM hive.readtest_parquet WHERE tinyint_part = 64";
-
-      final String plan = getPlanInString(query, OPTIQ_FORMAT);
-
-      // Check and make sure that Filter is not present in the plan
-      assertFalse(plan.contains("Filter"));
-
-      // Make sure the plan contains the Hive scan utilizing native parquet reader
-      assertTrue(plan.contains("groupscan=[HiveDrillNativeParquetScan"));
-    } finally {
-      test(String.format("alter session set `%s` = false", 
ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
-    }
-  }
-
   @Test // DRILL-3579
   public void selectFromPartitionedTableWithNullPartitions() throws Exception {
     final String query = "SELECT count(*) nullCount FROM 
hive.partition_pruning_test " +

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java
index 6d7ad13..26a16b9 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveProjectPushDown.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 import org.apache.drill.categories.HiveStorageTest;
 import org.apache.drill.exec.hive.HiveTestBase;
@@ -109,16 +108,4 @@ public class TestHiveProjectPushDown extends HiveTestBase {
     testHelper(query, 1, expectedColNames);
   }
 
-  @Test
-  public void projectPushDownOnHiveParquetTable() throws Exception {
-    try {
-      test(String.format("alter session set `%s` = true", 
ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
-      String query = "SELECT boolean_field, boolean_part, int_field, int_part 
FROM hive.readtest_parquet";
-      String expectedColNames = "\"columns\" : [ \"`boolean_field`\", 
\"`dir0`\", \"`int_field`\", \"`dir9`\" ]";
-
-      testHelper(query, 2, expectedColNames, "hive-drill-native-parquet-scan");
-    } finally {
-      test(String.format("alter session set `%s` = false", 
ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java
index e3d884b..5758eec 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java
@@ -31,7 +31,7 @@ public class HiveTestBase extends PlanTestBase {
 
   @BeforeClass
   public static void generateHive() throws Exception {
-    hiveTest = HiveTestDataGenerator.getInstance(dirTestWatcher.getRootDir());
+    hiveTest = HiveTestDataGenerator.getInstance(dirTestWatcher);
     hiveTest.addHiveTestPlugin(getDrillbitContext().getStorage());
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
index 4da22b6..1b77748 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
@@ -31,8 +31,10 @@ import org.apache.hadoop.hive.common.type.HiveVarchar;
 import org.joda.time.DateTime;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 
 import java.math.BigDecimal;
 import java.sql.Date;
@@ -42,38 +44,25 @@ import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 @Category({SlowTest.class, HiveStorageTest.class})
 public class TestHiveStorage extends HiveTestBase {
+
   @BeforeClass
-  public static void setupOptions() throws Exception {
-    test(String.format("alter session set `%s` = true", 
PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));
+  public static void init() {
+    setSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, true);
   }
 
-  @Test // DRILL-4083
-  public void testNativeScanWhenNoColumnIsRead() throws Exception {
-    try {
-      test(String.format("alter session set `%s` = true", 
ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
-
-      String query = "SELECT count(*) as col FROM hive.countStar_Parquet";
-      testPhysicalPlan(query, "hive-drill-native-parquet-scan");
-
-      testBuilder()
-          .sqlQuery(query)
-          .unOrdered()
-          .baselineColumns("col")
-          .baselineValues(200L)
-          .go();
-    } finally {
-      test("alter session reset `%s`",
-          ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS);
-    }
+  @AfterClass
+  public static void cleanup() {
+    resetSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
   }
 
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
   @Test
   public void hiveReadWithDb() throws Exception {
     test("select * from hive.kv");
@@ -206,123 +195,6 @@ public class TestHiveStorage extends HiveTestBase {
         .go();
   }
 
-  /**
-   * Test to ensure Drill reads the all supported types through native Parquet readers.
-   * NOTE: As part of Hive 1.2 upgrade, make sure this test and {@link #readAllSupportedHiveDataTypes()} are merged
-   * into one test.
-   */
-  @Test
-  public void readAllSupportedHiveDataTypesNativeParquet() throws Exception {
-    try {
-      test(String.format("alter session set `%s` = true", 
ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
-      final String query = "SELECT * FROM hive.readtest_parquet";
-
-      // Make sure the plan has Hive scan with native parquet reader
-      testPhysicalPlan(query, "hive-drill-native-parquet-scan");
-
-      testBuilder().sqlQuery(query)
-          .unOrdered()
-          .baselineColumns(
-              "binary_field",
-              "boolean_field",
-              "tinyint_field",
-              "decimal0_field",
-              "decimal9_field",
-              "decimal18_field",
-              "decimal28_field",
-              "decimal38_field",
-              "double_field",
-              "float_field",
-              "int_field",
-              "bigint_field",
-              "smallint_field",
-              "string_field",
-              "varchar_field",
-              "timestamp_field",
-              "char_field",
-              // There is a regression in Hive 1.2.1 in binary and boolean partition columns. Disable for now.
-              //"binary_part",
-              "boolean_part",
-              "tinyint_part",
-              "decimal0_part",
-              "decimal9_part",
-              "decimal18_part",
-              "decimal28_part",
-              "decimal38_part",
-              "double_part",
-              "float_part",
-              "int_part",
-              "bigint_part",
-              "smallint_part",
-              "string_part",
-              "varchar_part",
-              "timestamp_part",
-              "date_part",
-              "char_part")
-          .baselineValues(
-              "binaryfield".getBytes(),
-              false,
-              34,
-              new BigDecimal("66"),
-              new BigDecimal("2347.92"),
-              new BigDecimal("2758725827.99990"),
-              new BigDecimal("29375892739852.8"),
-              new BigDecimal("89853749534593985.783"),
-              8.345d,
-              4.67f,
-              123456,
-              234235L,
-              3455,
-              "stringfield",
-              "varcharfield",
-              new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()),
-              "charfield",
-              // There is a regression in Hive 1.2.1 in binary and boolean partition columns. Disable for now.
-              //"binary",
-              true,
-              64,
-              new BigDecimal("37"),
-              new BigDecimal("36.90"),
-              new BigDecimal("3289379872.94565"),
-              new BigDecimal("39579334534534.4"),
-              new BigDecimal("363945093845093890.900"),
-              8.345d,
-              4.67f,
-              123456,
-              234235L,
-              3455,
-              "string",
-              "varchar",
-              new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()),
-              new DateTime(Date.valueOf("2013-07-05").getTime()),
-              "char")
-          .baselineValues( // All fields are null, but partition fields have 
non-null values
-              null, null, null, null, null, null, null, null, null, null, 
null, null, null, null, null, null, null,
-              // There is a regression in Hive 1.2.1 in binary and boolean partition columns. Disable for now.
-              //"binary",
-              true,
-              64,
-              new BigDecimal("37"),
-              new BigDecimal("36.90"),
-              new BigDecimal("3289379872.94565"),
-              new BigDecimal("39579334534534.4"),
-              new BigDecimal("363945093845093890.900"),
-              8.345d,
-              4.67f,
-              123456,
-              234235L,
-              3455,
-              "string",
-              "varchar",
-              new DateTime(Timestamp.valueOf("2013-07-05 17:01:00").getTime()),
-              new DateTime(Date.valueOf("2013-07-05").getTime()),
-              "char")
-          .go();
-    } finally {
-        test(String.format("alter session set `%s` = false", 
ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
-    }
-  }
-
   @Test
   public void orderByOnHiveTable() throws Exception {
     testBuilder()
@@ -402,19 +274,7 @@ public class TestHiveStorage extends HiveTestBase {
         .go();
   }
 
-  @Test // DRILL-3938
-  public void nativeReaderIsDisabledForAlteredPartitionedTable() throws 
Exception {
-    try {
-      test(String.format("alter session set `%s` = true", 
ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
-      final String query = "EXPLAIN PLAN FOR SELECT key, `value`, newcol FROM 
hive.kv_parquet ORDER BY key LIMIT 1";
 
-      // Make sure the HiveScan in plan has no native parquet reader
-      final String planStr = getPlanInString(query, JSON_FORMAT);
-      assertFalse("Hive native is not expected in the plan", 
planStr.contains("hive-drill-native-parquet-scan"));
-    } finally {
-      test(String.format("alter session set `%s` = false", 
ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
-    }
-  }
 
   @Test // DRILL-3739
   public void readingFromStorageHandleBasedTable() throws Exception {
@@ -426,22 +286,6 @@ public class TestHiveStorage extends HiveTestBase {
         .go();
   }
 
-  @Test // DRILL-3739
-  public void readingFromStorageHandleBasedTable2() throws Exception {
-    try {
-      test(String.format("alter session set `%s` = true", 
ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
-
-      testBuilder()
-          .sqlQuery("SELECT * FROM hive.kv_sh ORDER BY key LIMIT 2")
-          .ordered()
-          .baselineColumns("key", "value")
-          .expectsEmptyResultSet()
-          .go();
-    } finally {
-      test(String.format("alter session set `%s` = false", 
ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
-    }
-  }
-
   @Test // DRILL-3688
   public void readingFromSmallTableWithSkipHeaderAndFooter() throws Exception {
    testBuilder()
@@ -480,23 +324,20 @@ public class TestHiveStorage extends HiveTestBase {
         .go();
   }
 
-  @Test // DRILL-3688
-  public void testIncorrectHeaderFooterProperty() throws Exception {
-    Map<String, String> testData = ImmutableMap.<String, String>builder()
-        .put("hive.skipper.kv_incorrect_skip_header","skip.header.line.count")
-        .put("hive.skipper.kv_incorrect_skip_footer", "skip.footer.line.count")
-        .build();
-
-    String query = "select * from %s";
-    String exceptionMessage = "Hive table property %s value 'A' is 
non-numeric";
-
-    for (Map.Entry<String, String> entry : testData.entrySet()) {
-      try {
-        test(String.format(query, entry.getKey()));
-      } catch (UserRemoteException e) {
-        assertThat(e.getMessage(), 
containsString(String.format(exceptionMessage, entry.getValue())));
-      }
-    }
+  @Test
+  public void testIncorrectHeaderProperty() throws Exception {
+    String query = "select * from hive.skipper.kv_incorrect_skip_header";
+    thrown.expect(UserRemoteException.class);
+    thrown.expectMessage(containsString("Hive table property 
skip.header.line.count value 'A' is non-numeric"));
+    test(query);
+  }
+
+  @Test
+  public void testIncorrectFooterProperty() throws Exception {
+    String query = "select * from hive.skipper.kv_incorrect_skip_footer";
+    thrown.expect(UserRemoteException.class);
+    thrown.expectMessage(containsString("Hive table property 
skip.footer.line.count value 'A' is non-numeric"));
+    test(query);
   }
 
   @Test
@@ -571,6 +412,7 @@ public class TestHiveStorage extends HiveTestBase {
   @Test
   public void testPhysicalPlanSubmission() throws Exception {
     PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from 
hive.kv");
+    PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from 
hive.readtest");
   }
 
   private void verifyColumnsMetadata(List<UserProtos.ResultColumnMetadata> 
columnsList, Map<String, Integer> expectedResult) {
@@ -583,9 +425,4 @@ public class TestHiveStorage extends HiveTestBase {
       assertTrue("Column should be nullable", columnMetadata.getIsNullable());
     }
   }
-
-  @AfterClass
-  public static void shutdownOptions() throws Exception {
-    test(String.format("alter session set `%s` = false", 
PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));
-  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
index 7f7a401..80da976 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
@@ -46,9 +46,10 @@ public class TestInfoSchemaOnHiveStorage extends HiveTestBase {
         .baselineValues("hive.default", "kv")
         .baselineValues("hive.default", "kv_parquet")
         .baselineValues("hive.default", "kv_sh")
-        .baselineValues("hive.default", "countstar_parquet")
         .baselineValues("hive.default", "simple_json")
         .baselineValues("hive.default", "partition_with_few_schemas")
+        .baselineValues("hive.default", "kv_native")
+        .baselineValues("hive.default", "kv_native_ext")
         .go();
 
     testBuilder()
@@ -249,9 +250,10 @@ public class TestInfoSchemaOnHiveStorage extends HiveTestBase {
         .baselineValues("DRILL", "hive.default", "partition_pruning_test", 
"TABLE")
         .baselineValues("DRILL", "hive.default", "partition_with_few_schemas", 
"TABLE")
         .baselineValues("DRILL", "hive.default", "kv_parquet", "TABLE")
-        .baselineValues("DRILL", "hive.default", "countstar_parquet", "TABLE")
         .baselineValues("DRILL", "hive.default", "kv_sh", "TABLE")
         .baselineValues("DRILL", "hive.default", "simple_json", "TABLE")
+        .baselineValues("DRILL", "hive.default", "kv_native", "TABLE")
+        .baselineValues("DRILL", "hive.default", "kv_native_ext", "TABLE")
         .baselineValues("DRILL", "hive.skipper", "kv_text_small", "TABLE")
         .baselineValues("DRILL", "hive.skipper", "kv_text_large", "TABLE")
         .baselineValues("DRILL", "hive.skipper", "kv_incorrect_skip_header", 
"TABLE")

Reply via email to