DRILL-6130: Fix NPE during physical plan submission for various storage plugins

1. Fixed ser/de issues for the Hive, Kafka and HBase plugins (the common pattern is sketched below).
2. Added a physical plan submission unit test for all storage plugins in the contrib module.
3. Refactoring.

closes #1108
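
For reference, the ser/de fix follows the same pattern in each affected GroupScan/SubScan class: the storage plugin instance itself is no longer serialized; instead the plugin config is exposed through a @JsonProperty getter and the plugin is re-resolved from the StoragePluginRegistry in the @JsonCreator constructor. A condensed sketch of the pattern, adapted from the HBaseSubScan changes below (comments added here for illustration):

  @JsonCreator
  public HBaseSubScan(@JacksonInject StoragePluginRegistry registry,
                      @JsonProperty("userName") String userName,
                      @JsonProperty("hbaseStoragePluginConfig") HBaseStoragePluginConfig hbaseStoragePluginConfig,
                      @JsonProperty("regionScanSpecList") LinkedList<HBaseSubScanSpec> regionScanSpecList,
                      @JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
    // Re-resolve the plugin from its config; only the config travels in the serialized physical plan.
    this(userName, (HBaseStoragePlugin) registry.getPlugin(hbaseStoragePluginConfig), regionScanSpecList, columns);
  }

  @JsonProperty
  public HBaseStoragePluginConfig getHbaseStoragePluginConfig() {
    // Derived from the plugin on demand, so no separate config field can fall out of sync.
    return hbaseStoragePlugin.getConfig();
  }

The same creator/getter pairing is applied to the Hive and Kafka scan classes below.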


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/58e4cec9
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/58e4cec9
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/58e4cec9

Branch: refs/heads/master
Commit: 58e4cec9a913e381ef0e96b072c31e34085277b3
Parents: 58f3b10
Author: Arina Ielchiieva <arina.yelchiy...@gmail.com>
Authored: Thu Feb 1 17:44:43 2018 +0000
Committer: Vitalii Diravka <vitalii.dira...@gmail.com>
Committed: Fri Feb 16 20:25:25 2018 +0000

----------------------------------------------------------------------
 .../drill/exec/store/hbase/HBaseGroupScan.java  |   5 +-
 .../drill/exec/store/hbase/HBaseSubScan.java    |  38 +++---
 .../apache/drill/hbase/TestHBaseQueries.java    |   6 +
 .../planner/sql/HivePartitionDescriptor.java    |  14 +-
 ...onvertHiveParquetScanToDrillParquetScan.java |  14 +-
 .../store/hive/HiveDrillNativeParquetScan.java  |  22 ++--
 .../hive/HiveDrillNativeParquetSubScan.java     |   4 +-
 .../apache/drill/exec/store/hive/HiveScan.java  |  68 ++++++----
 .../drill/exec/store/hive/HiveSubScan.java      | 130 ++++++++++---------
 .../apache/drill/exec/hive/TestHiveStorage.java |   7 +-
 .../store/jdbc/TestJdbcPluginWithMySQLIT.java   |   5 +
 .../drill/exec/store/kafka/KafkaGroupScan.java  |  40 +++---
 .../drill/exec/store/kafka/KafkaSubScan.java    |  56 ++++----
 .../exec/store/kafka/KafkaQueriesTest.java      |  21 +--
 .../exec/store/kafka/MessageIteratorTest.java   |   2 +-
 .../drill/exec/store/kafka/QueryConstants.java  |  40 ------
 .../drill/exec/store/kafka/TestKafkaSuit.java   |   6 +-
 .../exec/store/kafka/TestQueryConstants.java    |  40 ++++++
 .../kafka/cluster/EmbeddedKafkaCluster.java     |   4 +-
 .../drill/exec/store/kudu/KuduGroupScan.java    |  39 +++---
 .../drill/exec/store/kudu/KuduSubScan.java      |  39 +++---
 .../apache/drill/store/kudu/TestKuduPlugin.java |   8 +-
 .../exec/store/mongo/TestMongoQueries.java      |   7 +
 .../store/openTSDB/TestOpenTSDBPlugin.java      |   7 +-
 .../apache/drill/exec/proto/UserBitShared.java  |  21 ++-
 .../exec/proto/beans/CoreOperatorType.java      |   4 +-
 protocol/src/main/protobuf/UserBitShared.proto  |   1 +
 27 files changed, 353 insertions(+), 295 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index 69c2725..1178298 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -160,7 +160,7 @@ public class HBaseGroupScan extends AbstractGroupScan 
implements DrillHBaseConst
       statsCalculator = new TableStatsCalculator(conn, hbaseScanSpec, 
storagePlugin.getContext().getConfig(), storagePluginConfig);
 
       boolean foundStartRegion = false;
-      regionsToScan = new TreeMap<HRegionInfo, ServerName>();
+      regionsToScan = new TreeMap<>();
       for (HRegionLocation regionLocation : regionLocations) {
         HRegionInfo regionInfo = regionLocation.getRegionInfo();
         if (!foundStartRegion && hbaseScanSpec.getStartRow() != null && 
hbaseScanSpec.getStartRow().length != 0 && 
!regionInfo.containsRow(hbaseScanSpec.getStartRow())) {
@@ -338,8 +338,7 @@ public class HBaseGroupScan extends AbstractGroupScan 
implements DrillHBaseConst
     assert minorFragmentId < endpointFragmentMapping.size() : String.format(
         "Mappings length [%d] should be greater than minor fragment id [%d] 
but it isn't.", endpointFragmentMapping.size(),
         minorFragmentId);
-    return new HBaseSubScan(getUserName(), storagePlugin, storagePluginConfig,
-        endpointFragmentMapping.get(minorFragmentId), columns);
+    return new HBaseSubScan(getUserName(), storagePlugin, 
endpointFragmentMapping.get(minorFragmentId), columns);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
index 0527391..bd179fb 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
@@ -24,7 +24,6 @@ import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
@@ -49,9 +48,6 @@ import com.google.common.base.Preconditions;
 public class HBaseSubScan extends AbstractBase implements SubScan {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HBaseSubScan.class);
 
-  @JsonProperty
-  public final HBaseStoragePluginConfig storage;
-  @JsonIgnore
   private final HBaseStoragePlugin hbaseStoragePlugin;
   private final List<HBaseSubScanSpec> regionScanSpecList;
   private final List<SchemaPath> columns;
@@ -59,34 +55,36 @@ public class HBaseSubScan extends AbstractBase implements 
SubScan {
   @JsonCreator
   public HBaseSubScan(@JacksonInject StoragePluginRegistry registry,
                       @JsonProperty("userName") String userName,
-                      @JsonProperty("storage") StoragePluginConfig storage,
+                      @JsonProperty("hbaseStoragePluginConfig") 
HBaseStoragePluginConfig hbaseStoragePluginConfig,
                       @JsonProperty("regionScanSpecList") 
LinkedList<HBaseSubScanSpec> regionScanSpecList,
                       @JsonProperty("columns") List<SchemaPath> columns) 
throws ExecutionSetupException {
-    super(userName);
-    hbaseStoragePlugin = (HBaseStoragePlugin) registry.getPlugin(storage);
-    this.regionScanSpecList = regionScanSpecList;
-    this.storage = (HBaseStoragePluginConfig) storage;
-    this.columns = columns;
+    this(userName,
+        (HBaseStoragePlugin) registry.getPlugin(hbaseStoragePluginConfig),
+        regionScanSpecList,
+        columns);
   }
 
-  public HBaseSubScan(String userName, HBaseStoragePlugin plugin, 
HBaseStoragePluginConfig config,
-      List<HBaseSubScanSpec> regionInfoList, List<SchemaPath> columns) {
+  public HBaseSubScan(String userName,
+                      HBaseStoragePlugin hbaseStoragePlugin,
+                      List<HBaseSubScanSpec> regionInfoList,
+                      List<SchemaPath> columns) {
     super(userName);
-    hbaseStoragePlugin = plugin;
-    storage = config;
+    this.hbaseStoragePlugin = hbaseStoragePlugin;
     this.regionScanSpecList = regionInfoList;
     this.columns = columns;
   }
 
-  public List<HBaseSubScanSpec> getRegionScanSpecList() {
-    return regionScanSpecList;
+  @JsonProperty
+  public HBaseStoragePluginConfig getHbaseStoragePluginConfig() {
+    return hbaseStoragePlugin.getConfig();
   }
 
-  @JsonIgnore
-  public HBaseStoragePluginConfig getStorageConfig() {
-    return storage;
+  @JsonProperty
+  public List<HBaseSubScanSpec> getRegionScanSpecList() {
+    return regionScanSpecList;
   }
 
+  @JsonProperty
   public List<SchemaPath> getColumns() {
     return columns;
   }
@@ -109,7 +107,7 @@ public class HBaseSubScan extends AbstractBase implements 
SubScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
-    return new HBaseSubScan(getUserName(), hbaseStoragePlugin, storage, 
regionScanSpecList, columns);
+    return new HBaseSubScan(getUserName(), hbaseStoragePlugin, 
regionScanSpecList, columns);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseQueries.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseQueries.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseQueries.java
index ee839c5..0d53499 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseQueries.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseQueries.java
@@ -20,6 +20,7 @@ package org.apache.drill.hbase;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.drill.PlanTestBase;
 import org.apache.drill.categories.HbaseStorageTest;
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
@@ -109,4 +110,9 @@ public class TestHBaseQueries extends BaseHBaseTest {
     runHBaseSQLVerifyCount("SELECT row_key\n"
         + " FROM hbase.TestTableNullStr t WHERE row_key='a1'", 1);
   }
+
+  @Test
+  public void testPhysicalPlanSubmission() throws Exception {
+    PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from 
hbase.TestTableNullStr");
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
index b22c14d..2d2bb6c 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -64,7 +64,7 @@ public class HivePartitionDescriptor extends 
AbstractPartitionDescriptor {
     this.scanRel = scanRel;
     this.managedBuffer = managedBuffer.reallocIfNeeded(256);
     this.defaultPartitionValue = defaultPartitionValue;
-    for (HiveTableWrapper.FieldSchemaWrapper wrapper : ((HiveScan) 
scanRel.getGroupScan()).hiveReadEntry.table.partitionKeys) {
+    for (HiveTableWrapper.FieldSchemaWrapper wrapper : ((HiveScan) 
scanRel.getGroupScan()).getHiveReadEntry().table.partitionKeys) {
       partitionMap.put(wrapper.name, i);
       i++;
     }
@@ -88,7 +88,7 @@ public class HivePartitionDescriptor extends 
AbstractPartitionDescriptor {
 
   @Override
   public String getBaseTableLocation() {
-    HiveReadEntry origEntry = ((HiveScan) 
scanRel.getGroupScan()).hiveReadEntry;
+    HiveReadEntry origEntry = ((HiveScan) 
scanRel.getGroupScan()).getHiveReadEntry();
     return origEntry.table.getTable().getSd().getLocation();
   }
 
@@ -97,7 +97,7 @@ public class HivePartitionDescriptor extends 
AbstractPartitionDescriptor {
                                        BitSet partitionColumnBitSet, 
Map<Integer, String> fieldNameMap) {
     int record = 0;
     final HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
-    final Map<String, String> partitionNameTypeMap = 
hiveScan.hiveReadEntry.table.getPartitionNameTypeMap();
+    final Map<String, String> partitionNameTypeMap = 
hiveScan.getHiveReadEntry().table.getPartitionNameTypeMap();
     for(PartitionLocation partitionLocation: partitions) {
       for(int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)){
         final String hiveType = 
partitionNameTypeMap.get(fieldNameMap.get(partitionColumnIndex));
@@ -126,7 +126,7 @@ public class HivePartitionDescriptor extends 
AbstractPartitionDescriptor {
   public TypeProtos.MajorType getVectorType(SchemaPath column, PlannerSettings 
plannerSettings) {
     HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
     String partitionName = column.getAsNamePart().getName();
-    Map<String, String> partitionNameTypeMap = 
hiveScan.hiveReadEntry.table.getPartitionNameTypeMap();
+    Map<String, String> partitionNameTypeMap = 
hiveScan.getHiveReadEntry().table.getPartitionNameTypeMap();
     String hiveType = partitionNameTypeMap.get(partitionName);
     PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) 
TypeInfoUtils.getTypeInfoFromTypeString(hiveType);
 
@@ -143,7 +143,7 @@ public class HivePartitionDescriptor extends 
AbstractPartitionDescriptor {
   @Override
   protected void createPartitionSublists() {
     List<PartitionLocation> locations = new LinkedList<>();
-    HiveReadEntry origEntry = ((HiveScan) 
scanRel.getGroupScan()).hiveReadEntry;
+    HiveReadEntry origEntry = ((HiveScan) 
scanRel.getGroupScan()).getHiveReadEntry();
     for (Partition partition: origEntry.getPartitions()) {
       locations.add(new HivePartitionLocation(partition.getValues(), 
partition.getSd().getLocation()));
     }
@@ -165,7 +165,7 @@ public class HivePartitionDescriptor extends 
AbstractPartitionDescriptor {
 
   private GroupScan createNewGroupScan(List<PartitionLocation> 
newPartitionLocations) throws ExecutionSetupException {
     HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
-    HiveReadEntry origReadEntry = hiveScan.hiveReadEntry;
+    HiveReadEntry origReadEntry = hiveScan.getHiveReadEntry();
     List<HiveTableWrapper.HivePartitionWrapper> oldPartitions = 
origReadEntry.partitions;
     List<HiveTableWrapper.HivePartitionWrapper> newPartitions = 
Lists.newLinkedList();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
index b1b966a..a732245 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
@@ -97,16 +97,16 @@ public class ConvertHiveParquetScanToDrillParquetScan 
extends StoragePluginOptim
 
     final HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
     final HiveConf hiveConf = hiveScan.getHiveConf();
-    final HiveTableWithColumnCache hiveTable = 
hiveScan.hiveReadEntry.getTable();
+    final HiveTableWithColumnCache hiveTable = 
hiveScan.getHiveReadEntry().getTable();
 
     final Class<? extends InputFormat<?,?>> tableInputFormat =
-        getInputFormatFromSD(HiveUtilities.getTableMetadata(hiveTable), 
hiveScan.hiveReadEntry, hiveTable.getSd(),
+        getInputFormatFromSD(HiveUtilities.getTableMetadata(hiveTable), 
hiveScan.getHiveReadEntry(), hiveTable.getSd(),
             hiveConf);
     if (tableInputFormat == null || 
!tableInputFormat.equals(MapredParquetInputFormat.class)) {
       return false;
     }
 
-    final List<HivePartitionWrapper> partitions = 
hiveScan.hiveReadEntry.getHivePartitionWrappers();
+    final List<HivePartitionWrapper> partitions = 
hiveScan.getHiveReadEntry().getHivePartitionWrappers();
     if (partitions == null) {
       return true;
     }
@@ -116,7 +116,7 @@ public class ConvertHiveParquetScanToDrillParquetScan 
extends StoragePluginOptim
     for (HivePartitionWrapper partition : partitions) {
       final StorageDescriptor partitionSD = partition.getPartition().getSd();
       Class<? extends InputFormat<?, ?>> inputFormat = getInputFormatFromSD(
-          HiveUtilities.getPartitionMetadata(partition.getPartition(), 
hiveTable), hiveScan.hiveReadEntry, partitionSD,
+          HiveUtilities.getPartitionMetadata(partition.getPartition(), 
hiveTable), hiveScan.getHiveReadEntry(), partitionSD,
           hiveConf);
       if (inputFormat == null || !inputFormat.equals(tableInputFormat)) {
         return false;
@@ -172,7 +172,7 @@ public class ConvertHiveParquetScanToDrillParquetScan 
extends StoragePluginOptim
       final PlannerSettings settings = 
PrelUtil.getPlannerSettings(call.getPlanner());
       final String partitionColumnLabel = settings.getFsPartitionColumnLabel();
 
-      final Table hiveTable = hiveScan.hiveReadEntry.getTable();
+      final Table hiveTable = hiveScan.getHiveReadEntry().getTable();
       checkForUnsupportedDataTypes(hiveTable);
 
       final Map<String, String> partitionColMapping =
@@ -245,8 +245,8 @@ public class ConvertHiveParquetScanToDrillParquetScan 
extends StoragePluginOptim
     final HiveDrillNativeParquetScan nativeHiveScan =
         new HiveDrillNativeParquetScan(
             hiveScan.getUserName(),
-            hiveScan.hiveReadEntry,
-            hiveScan.storagePlugin,
+            hiveScan.getHiveReadEntry(),
+            hiveScan.getStoragePlugin(),
             nativeScanCols,
             null);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
index ccec61a..202bd43 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -41,16 +41,16 @@ public class HiveDrillNativeParquetScan extends HiveScan {
 
   @JsonCreator
   public HiveDrillNativeParquetScan(@JsonProperty("userName") String userName,
-                                    @JsonProperty("hive-table") HiveReadEntry 
hiveReadEntry,
-                                    @JsonProperty("storage-plugin") String 
storagePluginName,
+                                    @JsonProperty("hiveReadEntry") 
HiveReadEntry hiveReadEntry,
+                                    @JsonProperty("hiveStoragePluginConfig") 
HiveStoragePluginConfig hiveStoragePluginConfig,
                                     @JsonProperty("columns") List<SchemaPath> 
columns,
                                     @JacksonInject StoragePluginRegistry 
pluginRegistry) throws ExecutionSetupException {
-    super(userName, hiveReadEntry, storagePluginName, columns, pluginRegistry);
+    super(userName, hiveReadEntry, hiveStoragePluginConfig, columns, 
pluginRegistry);
   }
 
-  public HiveDrillNativeParquetScan(String userName, HiveReadEntry 
hiveReadEntry, HiveStoragePlugin storagePlugin,
+  public HiveDrillNativeParquetScan(String userName, HiveReadEntry 
hiveReadEntry, HiveStoragePlugin hiveStoragePlugin,
       List<SchemaPath> columns, HiveMetadataProvider metadataProvider) throws 
ExecutionSetupException {
-    super(userName, hiveReadEntry, storagePlugin, columns, metadataProvider);
+    super(userName, hiveReadEntry, hiveStoragePlugin, columns, 
metadataProvider);
   }
 
   public HiveDrillNativeParquetScan(final HiveScan hiveScan) {
@@ -91,7 +91,7 @@ public class HiveDrillNativeParquetScan extends HiveScan {
 
   @Override
   public HiveScan clone(HiveReadEntry hiveReadEntry) throws 
ExecutionSetupException {
-    return new HiveDrillNativeParquetScan(getUserName(), hiveReadEntry, 
storagePlugin, columns, metadataProvider);
+    return new HiveDrillNativeParquetScan(getUserName(), hiveReadEntry, 
getStoragePlugin(), getColumns(), getMetadataProvider());
   }
 
   @Override
@@ -103,12 +103,12 @@ public class HiveDrillNativeParquetScan extends HiveScan {
 
   @Override
   public String toString() {
-    final List<HivePartitionWrapper> partitions = 
hiveReadEntry.getHivePartitionWrappers();
+    final List<HivePartitionWrapper> partitions = 
getHiveReadEntry().getHivePartitionWrappers();
     int numPartitions = partitions == null ? 0 : partitions.size();
-    return "HiveDrillNativeParquetScan [table=" + 
hiveReadEntry.getHiveTableWrapper()
-        + ", columns=" + columns
+    return "HiveDrillNativeParquetScan [table=" + 
getHiveReadEntry().getHiveTableWrapper()
+        + ", columns=" + getColumns()
         + ", numPartitions=" + numPartitions
         + ", partitions= " + partitions
-        + ", inputDirectories=" + 
metadataProvider.getInputDirectories(hiveReadEntry) + "]";
+        + ", inputDirectories=" + 
getMetadataProvider().getInputDirectories(getHiveReadEntry()) + "]";
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java
index 43cf98e..2129ed4 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java
@@ -42,9 +42,9 @@ public class HiveDrillNativeParquetSubScan extends 
HiveSubScan {
                                        @JsonProperty("hiveReadEntry") 
HiveReadEntry hiveReadEntry,
                                        @JsonProperty("splitClasses") 
List<String> splitClasses,
                                        @JsonProperty("columns") 
List<SchemaPath> columns,
-                                       @JsonProperty("storagePluginName") 
String pluginName)
+                                       
@JsonProperty("hiveStoragePluginConfig") HiveStoragePluginConfig 
hiveStoragePluginConfig)
       throws IOException, ExecutionSetupException, 
ReflectiveOperationException {
-    super(registry, userName, splits, hiveReadEntry, splitClasses, columns, 
pluginName);
+    super(registry, userName, splits, hiveReadEntry, splitClasses, columns, 
hiveStoragePluginConfig);
   }
 
   public HiveDrillNativeParquetSubScan(final HiveSubScan subScan)

http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index cf8a671..11d47f3 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -61,41 +61,36 @@ public class HiveScan extends AbstractGroupScan {
 
   private static int HIVE_SERDE_SCAN_OVERHEAD_FACTOR_PER_COLUMN = 20;
 
-  @JsonProperty("hive-table")
-  public HiveReadEntry hiveReadEntry;
+  private final HiveStoragePlugin hiveStoragePlugin;
+  private final HiveReadEntry hiveReadEntry;
+  private final HiveMetadataProvider metadataProvider;
 
-  @JsonIgnore
-  public HiveStoragePlugin storagePlugin;
-
-  @JsonProperty("columns")
-  public List<SchemaPath> columns;
-
-  @JsonIgnore
-  protected final HiveMetadataProvider metadataProvider;
-
-  @JsonIgnore
   private List<List<LogicalInputSplit>> mappings;
+  private List<LogicalInputSplit> inputSplits;
 
-  @JsonIgnore
-  protected List<LogicalInputSplit> inputSplits;
+  protected List<SchemaPath> columns;
 
   @JsonCreator
   public HiveScan(@JsonProperty("userName") final String userName,
-                  @JsonProperty("hive-table") final HiveReadEntry 
hiveReadEntry,
-                  @JsonProperty("storage-plugin") final String 
storagePluginName,
+                  @JsonProperty("hiveReadEntry") final HiveReadEntry 
hiveReadEntry,
+                  @JsonProperty("hiveStoragePluginConfig") final 
HiveStoragePluginConfig hiveStoragePluginConfig,
                   @JsonProperty("columns") final List<SchemaPath> columns,
                   @JacksonInject final StoragePluginRegistry pluginRegistry) 
throws ExecutionSetupException {
-    this(userName, hiveReadEntry, (HiveStoragePlugin) 
pluginRegistry.getPlugin(storagePluginName), columns, null);
+    this(userName,
+        hiveReadEntry,
+        (HiveStoragePlugin) pluginRegistry.getPlugin(hiveStoragePluginConfig),
+        columns,
+        null);
   }
 
-  public HiveScan(final String userName, final HiveReadEntry hiveReadEntry, 
final HiveStoragePlugin storagePlugin,
+  public HiveScan(final String userName, final HiveReadEntry hiveReadEntry, 
final HiveStoragePlugin hiveStoragePlugin,
       final List<SchemaPath> columns, final HiveMetadataProvider 
metadataProvider) throws ExecutionSetupException {
     super(userName);
     this.hiveReadEntry = hiveReadEntry;
     this.columns = columns;
-    this.storagePlugin = storagePlugin;
+    this.hiveStoragePlugin = hiveStoragePlugin;
     if (metadataProvider == null) {
-      this.metadataProvider = new HiveMetadataProvider(userName, 
hiveReadEntry, storagePlugin.getHiveConf());
+      this.metadataProvider = new HiveMetadataProvider(userName, 
hiveReadEntry, hiveStoragePlugin.getHiveConf());
     } else {
       this.metadataProvider = metadataProvider;
     }
@@ -105,19 +100,39 @@ public class HiveScan extends AbstractGroupScan {
     super(that);
     this.columns = that.columns;
     this.hiveReadEntry = that.hiveReadEntry;
-    this.storagePlugin = that.storagePlugin;
+    this.hiveStoragePlugin = that.hiveStoragePlugin;
     this.metadataProvider = that.metadataProvider;
   }
 
   public HiveScan clone(final HiveReadEntry hiveReadEntry) throws 
ExecutionSetupException {
-    return new HiveScan(getUserName(), hiveReadEntry, storagePlugin, columns, 
metadataProvider);
+    return new HiveScan(getUserName(), hiveReadEntry, hiveStoragePlugin, 
columns, metadataProvider);
+  }
+
+  @JsonProperty
+  public HiveReadEntry getHiveReadEntry() {
+    return hiveReadEntry;
   }
 
+  @JsonProperty
+  public HiveStoragePluginConfig getHiveStoragePluginConfig() {
+    return hiveStoragePlugin.getConfig();
+  }
+
+  @JsonProperty
   public List<SchemaPath> getColumns() {
     return columns;
   }
 
-  protected List<LogicalInputSplit> getInputSplits() {
+  @JsonIgnore
+  public HiveStoragePlugin getStoragePlugin() {
+    return hiveStoragePlugin;
+  }
+
+  protected HiveMetadataProvider getMetadataProvider() {
+    return metadataProvider;
+  }
+
+  private List<LogicalInputSplit> getInputSplits() {
     if (inputSplits == null) {
       inputSplits = metadataProvider.getInputSplits(hiveReadEntry);
     }
@@ -125,6 +140,7 @@ public class HiveScan extends AbstractGroupScan {
     return inputSplits;
   }
 
+
   @Override
   public void applyAssignments(final List<CoordinationProtos.DrillbitEndpoint> 
endpoints) {
     mappings = new ArrayList<>();
@@ -160,7 +176,7 @@ public class HiveScan extends AbstractGroupScan {
       }
 
       final HiveReadEntry subEntry = new 
HiveReadEntry(hiveReadEntry.getTableWrapper(), parts);
-      return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, 
splitTypes, columns, storagePlugin);
+      return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, 
splitTypes, columns, hiveStoragePlugin);
     } catch (IOException | ReflectiveOperationException e) {
       throw new ExecutionSetupException(e);
     }
@@ -174,7 +190,7 @@ public class HiveScan extends AbstractGroupScan {
   @Override
   public List<EndpointAffinity> getOperatorAffinity() {
     final Map<String, DrillbitEndpoint> endpointMap = new HashMap<>();
-    for (final DrillbitEndpoint endpoint : 
storagePlugin.getContext().getBits()) {
+    for (final DrillbitEndpoint endpoint : 
hiveStoragePlugin.getContext().getBits()) {
       endpointMap.put(endpoint.getAddress(), endpoint);
       logger.debug("endpoing address: {}", endpoint.getAddress());
     }
@@ -285,7 +301,7 @@ public class HiveScan extends AbstractGroupScan {
 
   @JsonIgnore
   public HiveConf getHiveConf() {
-    return storagePlugin.getHiveConf();
+    return hiveStoragePlugin.getHiveConf();
   }
 
   @JsonIgnore

http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
index a1990a0..8ca8647 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
+import com.google.common.collect.ImmutableSet;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -40,26 +41,20 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.collect.Iterators;
 import com.google.common.io.ByteArrayDataInput;
 import com.google.common.io.ByteStreams;
 
 @JsonTypeName("hive-sub-scan")
 public class HiveSubScan extends AbstractBase implements SubScan {
-  protected HiveReadEntry hiveReadEntry;
 
-  @JsonIgnore
-  protected List<List<InputSplit>> inputSplits = new ArrayList<>();
-  @JsonIgnore
-  protected HiveTableWithColumnCache table;
-  @JsonIgnore
-  protected List<HivePartition> partitions;
-  @JsonIgnore
-  protected HiveStoragePlugin storagePlugin;
-
-  private List<List<String>> splits;
-  private List<String> splitClasses;
-  protected List<SchemaPath> columns;
+  private final HiveReadEntry hiveReadEntry;
+  private final List<List<InputSplit>> inputSplits = new ArrayList<>();
+  private final HiveStoragePlugin hiveStoragePlugin;
+  private final List<List<String>> splits;
+  private final List<String> splitClasses;
+  private final HiveTableWithColumnCache table;
+  private final List<HivePartition> partitions;
+  private final List<SchemaPath> columns;
 
   @JsonCreator
   public HiveSubScan(@JacksonInject StoragePluginRegistry registry,
@@ -68,13 +63,22 @@ public class HiveSubScan extends AbstractBase implements 
SubScan {
                      @JsonProperty("hiveReadEntry") HiveReadEntry 
hiveReadEntry,
                      @JsonProperty("splitClasses") List<String> splitClasses,
                      @JsonProperty("columns") List<SchemaPath> columns,
-                     @JsonProperty("storagePluginName") String pluginName)
+                     @JsonProperty("hiveStoragePluginConfig") 
HiveStoragePluginConfig hiveStoragePluginConfig)
       throws IOException, ExecutionSetupException, 
ReflectiveOperationException {
-    this(userName, splits, hiveReadEntry, splitClasses, columns, 
(HiveStoragePlugin)registry.getPlugin(pluginName));
-  }
-
-  public HiveSubScan(final String userName, final List<List<String>> splits, 
final HiveReadEntry hiveReadEntry,
-      final List<String> splitClasses, final List<SchemaPath> columns, final 
HiveStoragePlugin plugin)
+    this(userName,
+        splits,
+        hiveReadEntry,
+        splitClasses,
+        columns,
+        (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig));
+  }
+
+  public HiveSubScan(final String userName,
+                     final List<List<String>> splits,
+                     final HiveReadEntry hiveReadEntry,
+                      final List<String> splitClasses,
+                     final List<SchemaPath> columns,
+                     final HiveStoragePlugin hiveStoragePlugin)
     throws IOException, ReflectiveOperationException {
     super(userName);
     this.hiveReadEntry = hiveReadEntry;
@@ -83,66 +87,61 @@ public class HiveSubScan extends AbstractBase implements 
SubScan {
     this.splits = splits;
     this.splitClasses = splitClasses;
     this.columns = columns;
-    this.storagePlugin = plugin;
+    this.hiveStoragePlugin = hiveStoragePlugin;
 
     for (int i = 0; i < splits.size(); i++) {
       inputSplits.add(deserializeInputSplit(splits.get(i), 
splitClasses.get(i)));
     }
   }
 
-  @JsonProperty("storagePluginName")
-  @SuppressWarnings("unused")
-  public String getStoragePluginName() {
-    return storagePlugin.getName();
-  }
-
-  @JsonIgnore
-  public HiveStoragePlugin getStoragePlugin() {
-    return storagePlugin;
-  }
-
+  @JsonProperty
   public List<List<String>> getSplits() {
     return splits;
   }
 
-  public HiveTableWithColumnCache getTable() {
-    return table;
-  }
-
-  public List<HivePartition> getPartitions() {
-    return partitions;
+  @JsonProperty
+  public HiveReadEntry getHiveReadEntry() {
+    return hiveReadEntry;
   }
 
+  @JsonProperty
   public List<String> getSplitClasses() {
     return splitClasses;
   }
 
+  @JsonProperty
   public List<SchemaPath> getColumns() {
     return columns;
   }
 
+  @JsonProperty
+  public HiveStoragePluginConfig getHiveStoragePluginConfig() {
+    return hiveStoragePlugin.getConfig();
+  }
+
+  @JsonIgnore
+  public HiveTableWithColumnCache getTable() {
+    return table;
+  }
+
+  @JsonIgnore
+  public List<HivePartition> getPartitions() {
+    return partitions;
+  }
+
+  @JsonIgnore
   public List<List<InputSplit>> getInputSplits() {
     return inputSplits;
   }
 
-  public HiveReadEntry getHiveReadEntry() {
-    return hiveReadEntry;
+  @JsonIgnore
+  public HiveStoragePlugin getStoragePlugin() {
+    return hiveStoragePlugin;
   }
 
-  public static List<InputSplit> deserializeInputSplit(List<String> base64, 
String className) throws IOException, ReflectiveOperationException{
-    Constructor<?> constructor = 
Class.forName(className).getDeclaredConstructor();
-    if (constructor == null) {
-      throw new ReflectiveOperationException("Class " + className + " does not 
implement a default constructor.");
-    }
-    constructor.setAccessible(true);
-    List<InputSplit> splits = new ArrayList<>();
-    for (String str : base64) {
-      InputSplit split = (InputSplit) constructor.newInstance();
-      ByteArrayDataInput byteArrayDataInput = 
ByteStreams.newDataInput(Base64.decodeBase64(str));
-      split.readFields(byteArrayDataInput);
-      splits.add(split);
-    }
-    return splits;
+  @JsonIgnore
+  public HiveConf getHiveConf() {
+    return hiveStoragePlugin.getHiveConf();
   }
 
   @Override
@@ -153,7 +152,7 @@ public class HiveSubScan extends AbstractBase implements 
SubScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) 
throws ExecutionSetupException {
     try {
-      return new HiveSubScan(getUserName(), splits, hiveReadEntry, 
splitClasses, columns, storagePlugin);
+      return new HiveSubScan(getUserName(), splits, hiveReadEntry, 
splitClasses, columns, hiveStoragePlugin);
     } catch (IOException | ReflectiveOperationException e) {
       throw new ExecutionSetupException(e);
     }
@@ -161,7 +160,7 @@ public class HiveSubScan extends AbstractBase implements 
SubScan {
 
   @Override
   public Iterator<PhysicalOperator> iterator() {
-    return Iterators.emptyIterator();
+    return ImmutableSet.<PhysicalOperator>of().iterator();
   }
 
   @Override
@@ -169,8 +168,21 @@ public class HiveSubScan extends AbstractBase implements 
SubScan {
     return CoreOperatorType.HIVE_SUB_SCAN_VALUE;
   }
 
-  @JsonIgnore
-  public HiveConf getHiveConf() {
-    return storagePlugin.getHiveConf();
+  private static List<InputSplit> deserializeInputSplit(List<String> base64, 
String className)
+      throws IOException, ReflectiveOperationException{
+    Constructor<?> constructor = 
Class.forName(className).getDeclaredConstructor();
+    if (constructor == null) {
+      throw new ReflectiveOperationException("Class " + className + " does not 
implement a default constructor.");
+    }
+    constructor.setAccessible(true);
+    List<InputSplit> splits = new ArrayList<>();
+    for (String str : base64) {
+      InputSplit split = (InputSplit) constructor.newInstance();
+      ByteArrayDataInput byteArrayDataInput = 
ByteStreams.newDataInput(Base64.decodeBase64(str));
+      split.readFields(byteArrayDataInput);
+      splits.add(split);
+    }
+    return splits;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
index 64f3b5b..c2412ad 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.hive;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import org.apache.drill.PlanTestBase;
 import org.apache.drill.categories.HiveStorageTest;
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.common.exceptions.UserRemoteException;
@@ -100,7 +101,6 @@ public class TestHiveStorage extends HiveTestBase {
   /**
    * Test to ensure Drill reads the all supported types correctly both normal 
fields (converted to Nullable types) and
    * partition fields (converted to Required types).
-   * @throws Exception
    */
   @Test
   public void readAllSupportedHiveDataTypes() throws Exception {
@@ -558,6 +558,11 @@ public class TestHiveStorage extends HiveTestBase {
         .go();
   }
 
+  @Test
+  public void testPhysicalPlanSubmission() throws Exception {
+    PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from 
hive.kv");
+  }
+
   private void verifyColumnsMetadata(List<UserProtos.ResultColumnMetadata> 
columnsList, Map<String, Integer> expectedResult) {
     for (UserProtos.ResultColumnMetadata columnMetadata : columnsList) {
       assertTrue("Column should be present in result set", 
expectedResult.containsKey(columnMetadata.getColumnName()));

http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java
----------------------------------------------------------------------
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java
index 00db46b..7b8c21a 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java
@@ -127,4 +127,9 @@ public class TestJdbcPluginWithMySQLIT extends PlanTestBase 
{
     testPlanMatchingPatterns(query, new String[] {}, new String[] { "Join", 
"Filter" });
   }
 
+  @Test
+  public void testPhysicalPlanSubmission() throws Exception {
+    testPhysicalPlanExecutionBasedOnQuery("select * from 
mysql.`drill_mysql_test`.person");
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
index e08c7d7..9cf575b 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
@@ -69,47 +69,49 @@ public class KafkaGroupScan extends AbstractGroupScan {
   private static final long MSG_SIZE = 1024;
 
   private final KafkaStoragePlugin kafkaStoragePlugin;
-  private final KafkaStoragePluginConfig kafkaStoragePluginConfig;
-  private List<SchemaPath> columns;
   private final KafkaScanSpec kafkaScanSpec;
 
+  private List<SchemaPath> columns;
   private List<PartitionScanWork> partitionWorkList;
   private ListMultimap<Integer, PartitionScanWork> assignments;
   private List<EndpointAffinity> affinities;
 
   @JsonCreator
   public KafkaGroupScan(@JsonProperty("userName") String userName,
-      @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig 
kafkaStoragePluginConfig,
-      @JsonProperty("columns") List<SchemaPath> columns, 
@JsonProperty("scanSpec") KafkaScanSpec scanSpec,
-      @JacksonInject StoragePluginRegistry pluginRegistry) {
-    this(userName, kafkaStoragePluginConfig, columns, scanSpec, 
(KafkaStoragePlugin) pluginRegistry);
+                        @JsonProperty("kafkaStoragePluginConfig") 
KafkaStoragePluginConfig kafkaStoragePluginConfig,
+                        @JsonProperty("columns") List<SchemaPath> columns,
+                        @JsonProperty("kafkaScanSpec") KafkaScanSpec scanSpec,
+                        @JacksonInject StoragePluginRegistry pluginRegistry) 
throws ExecutionSetupException {
+    this(userName,
+        (KafkaStoragePlugin) 
pluginRegistry.getPlugin(kafkaStoragePluginConfig),
+        columns,
+        scanSpec);
   }
 
   public KafkaGroupScan(KafkaStoragePlugin kafkaStoragePlugin, KafkaScanSpec 
kafkaScanSpec, List<SchemaPath> columns) {
     super(StringUtils.EMPTY);
     this.kafkaStoragePlugin = kafkaStoragePlugin;
-    this.kafkaStoragePluginConfig = (KafkaStoragePluginConfig) 
kafkaStoragePlugin.getConfig();
     this.columns = columns;
     this.kafkaScanSpec = kafkaScanSpec;
     init();
   }
 
-  public KafkaGroupScan(String userName, KafkaStoragePluginConfig 
kafkaStoragePluginConfig, List<SchemaPath> columns,
-      KafkaScanSpec kafkaScanSpec, KafkaStoragePlugin pluginRegistry) {
+  public KafkaGroupScan(String userName,
+                        KafkaStoragePlugin kafkaStoragePlugin,
+                        List<SchemaPath> columns,
+                        KafkaScanSpec kafkaScanSpec) {
     super(userName);
-    this.kafkaStoragePluginConfig = kafkaStoragePluginConfig;
+    this.kafkaStoragePlugin = kafkaStoragePlugin;
     this.columns = columns;
     this.kafkaScanSpec = kafkaScanSpec;
-    this.kafkaStoragePlugin = pluginRegistry;
     init();
   }
 
   public KafkaGroupScan(KafkaGroupScan that) {
     super(that);
-    this.kafkaStoragePluginConfig = that.kafkaStoragePluginConfig;
+    this.kafkaStoragePlugin = that.kafkaStoragePlugin;
     this.columns = that.columns;
     this.kafkaScanSpec = that.kafkaScanSpec;
-    this.kafkaStoragePlugin = that.kafkaStoragePlugin;
     this.partitionWorkList = that.partitionWorkList;
     this.assignments = that.assignments;
   }
@@ -242,7 +244,7 @@ public class KafkaGroupScan extends AbstractGroupScan {
           work.getBeginOffset(), work.getLatestOffset()));
     }
 
-    return new KafkaSubScan(getUserName(), kafkaStoragePlugin, 
kafkaStoragePluginConfig, columns, scanSpecList);
+    return new KafkaSubScan(getUserName(), kafkaStoragePlugin, columns, 
scanSpecList);
   }
 
   @Override
@@ -291,9 +293,9 @@ public class KafkaGroupScan extends AbstractGroupScan {
     return clone;
   }
 
-  @JsonProperty("kafkaStoragePluginConfig")
-  public KafkaStoragePluginConfig getStorageConfig() {
-    return this.kafkaStoragePluginConfig;
+  @JsonProperty
+  public KafkaStoragePluginConfig getKafkaStoragePluginConfig() {
+    return kafkaStoragePlugin.getConfig();
   }
 
   @JsonProperty
@@ -301,8 +303,8 @@ public class KafkaGroupScan extends AbstractGroupScan {
     return columns;
   }
 
-  @JsonProperty("kafkaScanSpec")
-  public KafkaScanSpec getScanSpec() {
+  @JsonProperty
+  public KafkaScanSpec getKafkaScanSpec() {
     return kafkaScanSpec;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
index fc110b5..468f766 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
@@ -41,34 +41,31 @@ import com.google.common.base.Preconditions;
 @JsonTypeName("kafka-partition-scan")
 public class KafkaSubScan extends AbstractBase implements SubScan {
 
-  @JsonProperty
-  private final KafkaStoragePluginConfig KafkaStoragePluginConfig;
-
-  @JsonIgnore
   private final KafkaStoragePlugin kafkaStoragePlugin;
   private final List<SchemaPath> columns;
-  private final List<KafkaSubScanSpec> partitions;
+  private final List<KafkaSubScanSpec> partitionSubScanSpecList;
 
   @JsonCreator
-  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, 
@JsonProperty("userName") String userName,
-      @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig 
kafkaStoragePluginConfig,
-      @JsonProperty("columns") List<SchemaPath> columns,
-      @JsonProperty("partitionSubScanSpecList") LinkedList<KafkaSubScanSpec> 
partitions)
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry,
+                      @JsonProperty("userName") String userName,
+                      @JsonProperty("kafkaStoragePluginConfig") 
KafkaStoragePluginConfig kafkaStoragePluginConfig,
+                      @JsonProperty("columns") List<SchemaPath> columns,
+                      @JsonProperty("partitionSubScanSpecList") 
LinkedList<KafkaSubScanSpec> partitionSubScanSpecList)
       throws ExecutionSetupException {
-    super(userName);
-    this.KafkaStoragePluginConfig = kafkaStoragePluginConfig;
-    this.columns = columns;
-    this.partitions = partitions;
-    this.kafkaStoragePlugin = (KafkaStoragePlugin) 
registry.getPlugin(kafkaStoragePluginConfig);
+    this(userName,
+        (KafkaStoragePlugin) registry.getPlugin(kafkaStoragePluginConfig),
+        columns,
+        partitionSubScanSpecList);
   }
 
-  public KafkaSubScan(String userName, KafkaStoragePlugin plugin, 
KafkaStoragePluginConfig kafkStoragePluginConfig,
-      List<SchemaPath> columns, List<KafkaSubScanSpec> 
partitionSubScanSpecList) {
+  public KafkaSubScan(String userName,
+                      KafkaStoragePlugin kafkaStoragePlugin,
+                      List<SchemaPath> columns,
+                      List<KafkaSubScanSpec> partitionSubScanSpecList) {
     super(userName);
+    this.kafkaStoragePlugin = kafkaStoragePlugin;
     this.columns = columns;
-    this.KafkaStoragePluginConfig = kafkStoragePluginConfig;
-    this.kafkaStoragePlugin = plugin;
-    this.partitions = partitionSubScanSpecList;
+    this.partitionSubScanSpecList = partitionSubScanSpecList;
   }
 
   @Override
@@ -79,8 +76,7 @@ public class KafkaSubScan extends AbstractBase implements 
SubScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) 
throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
-    return new KafkaSubScan(getUserName(), kafkaStoragePlugin, 
KafkaStoragePluginConfig, columns,
-        partitions);
+    return new KafkaSubScan(getUserName(), kafkaStoragePlugin, columns, 
partitionSubScanSpecList);
   }
 
   @Override
@@ -88,22 +84,24 @@ public class KafkaSubScan extends AbstractBase implements 
SubScan {
     return Collections.emptyIterator();
   }
 
-  @JsonIgnore
+  @JsonProperty
   public KafkaStoragePluginConfig getKafkaStoragePluginConfig() {
-    return KafkaStoragePluginConfig;
-  }
-
-  @JsonIgnore
-  public KafkaStoragePlugin getKafkaStoragePlugin() {
-    return kafkaStoragePlugin;
+    return kafkaStoragePlugin.getConfig();
   }
 
+  @JsonProperty
   public List<SchemaPath> getColumns() {
     return columns;
   }
 
+  @JsonProperty
   public List<KafkaSubScanSpec> getPartitionSubScanSpecList() {
-    return partitions;
+    return partitionSubScanSpecList;
+  }
+
+  @JsonIgnore
+  public KafkaStoragePlugin getKafkaStoragePlugin() {
+    return kafkaStoragePlugin;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
index 1f5a1c0..ce9eb99 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.kafka;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
@@ -39,7 +38,7 @@ public class KafkaQueriesTest extends KafkaTestBase {
 
   @Test
   public void testSqlQueryOnInvalidTopic() throws Exception {
-    String queryString = String.format(QueryConstants.MSG_SELECT_QUERY, 
QueryConstants.INVALID_TOPIC);
+    String queryString = String.format(TestQueryConstants.MSG_SELECT_QUERY, 
TestQueryConstants.INVALID_TOPIC);
     try {
       
testBuilder().sqlQuery(queryString).unOrdered().baselineRecords(Collections.<Map<String,
 Object>> emptyList())
           .build().run();
@@ -51,7 +50,7 @@ public class KafkaQueriesTest extends KafkaTestBase {
 
   @Test
   public void testResultCount() throws Exception {
-    String queryString = String.format(QueryConstants.MSG_SELECT_QUERY, 
QueryConstants.JSON_TOPIC);
+    String queryString = String.format(TestQueryConstants.MSG_SELECT_QUERY, 
TestQueryConstants.JSON_TOPIC);
     runKafkaSQLVerifyCount(queryString, TestKafkaSuit.NUM_JSON_MSG);
   }
 
@@ -60,9 +59,9 @@ public class KafkaQueriesTest extends KafkaTestBase {
     // following kafka.tools.GetOffsetShell for earliest as -2
     Map<TopicPartition, Long> startOffsetsMap = fetchOffsets(-2);
 
-    String queryString = String.format(QueryConstants.MIN_OFFSET_QUERY, 
QueryConstants.JSON_TOPIC);
+    String queryString = String.format(TestQueryConstants.MIN_OFFSET_QUERY, 
TestQueryConstants.JSON_TOPIC);
     
testBuilder().sqlQuery(queryString).unOrdered().baselineColumns("minOffset")
-        .baselineValues(startOffsetsMap.get(new 
TopicPartition(QueryConstants.JSON_TOPIC, 0))).go();
+        .baselineValues(startOffsetsMap.get(new 
TopicPartition(TestQueryConstants.JSON_TOPIC, 0))).go();
   }
 
   @Test
@@ -70,9 +69,9 @@ public class KafkaQueriesTest extends KafkaTestBase {
     // following kafka.tools.GetOffsetShell for latest as -1
     Map<TopicPartition, Long> endOffsetsMap = fetchOffsets(-1);
 
-    String queryString = String.format(QueryConstants.MAX_OFFSET_QUERY, 
QueryConstants.JSON_TOPIC);
+    String queryString = String.format(TestQueryConstants.MAX_OFFSET_QUERY, 
TestQueryConstants.JSON_TOPIC);
     
testBuilder().sqlQuery(queryString).unOrdered().baselineColumns("maxOffset")
-        .baselineValues(endOffsetsMap.get(new 
TopicPartition(QueryConstants.JSON_TOPIC, 0))-1).go();
+        .baselineValues(endOffsetsMap.get(new 
TopicPartition(TestQueryConstants.JSON_TOPIC, 0))-1).go();
   }
 
   private Map<TopicPartition, Long> fetchOffsets(int flag) {
@@ -80,7 +79,7 @@ public class KafkaQueriesTest extends KafkaTestBase {
         new ByteArrayDeserializer(), new ByteArrayDeserializer());
 
     Map<TopicPartition, Long> offsetsMap = Maps.newHashMap();
-    kafkaConsumer.subscribe(Arrays.asList(QueryConstants.JSON_TOPIC));
+    
kafkaConsumer.subscribe(Collections.singletonList(TestQueryConstants.JSON_TOPIC));
     // based on KafkaConsumer JavaDoc, seekToBeginning/seekToEnd functions
     // evaluates lazily, seeking to the
     // first/last offset in all partitions only when poll(long) or
@@ -110,4 +109,10 @@ public class KafkaQueriesTest extends KafkaTestBase {
     return offsetsMap;
   }
 
+  @Test
+  public void testPhysicalPlanSubmission() throws Exception {
+    String query = String.format(TestQueryConstants.MSG_SELECT_QUERY, 
TestQueryConstants.JSON_TOPIC);
+    testPhysicalPlanExecutionBasedOnQuery(query);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
index 0b31562..4a15596 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
@@ -49,7 +49,7 @@ public class MessageIteratorTest extends KafkaTestBase {
     consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class);
     consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "4");
     kafkaConsumer = new KafkaConsumer<>(consumerProps);
-    subScanSpec = new KafkaSubScanSpec(QueryConstants.JSON_TOPIC, 0, 0, 
TestKafkaSuit.NUM_JSON_MSG);
+    subScanSpec = new KafkaSubScanSpec(TestQueryConstants.JSON_TOPIC, 0, 0, 
TestKafkaSuit.NUM_JSON_MSG);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/QueryConstants.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/QueryConstants.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/QueryConstants.java
deleted file mode 100644
index ff58f7e..0000000
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/QueryConstants.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.kafka;
-
-public interface QueryConstants {
-
-  // Kafka Server Prop Constants
-  public static final String BROKER_DELIM = ",";
-  public final String LOCAL_HOST = "127.0.0.1";
-
-  // ZK
-  public final static String ZK_TMP = "zk_tmp";
-  public final static int TICK_TIME = 500;
-  public final static int MAX_CLIENT_CONNECTIONS = 100;
-
-  public static final String JSON_TOPIC = "drill-json-topic";
-  public static final String AVRO_TOPIC = "drill-avro-topic";
-  public static final String INVALID_TOPIC = "invalid-topic";
-
-  // Queries
-  public static final String MSG_COUNT_QUERY = "select count(*) from kafka.`%s`";
-  public static final String MSG_SELECT_QUERY = "select * from kafka.`%s`";
-  public static final String MIN_OFFSET_QUERY = "select MIN(kafkaMsgOffset) as minOffset from kafka.`%s`";
-  public static final String MAX_OFFSET_QUERY = "select MAX(kafkaMsgOffset) as maxOffset from kafka.`%s`";
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
index 34677cb..ed01747 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
@@ -72,15 +72,15 @@ public class TestKafkaSuit {
         Properties topicProps = new Properties();
         zkClient = new ZkClient(embeddedKafkaCluster.getZkServer().getConnectionString(), SESSION_TIMEOUT, CONN_TIMEOUT, ZKStringSerializer$.MODULE$);
         ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(embeddedKafkaCluster.getZkServer().getConnectionString()), false);
-        AdminUtils.createTopic(zkUtils, QueryConstants.JSON_TOPIC, 1, 1, topicProps, RackAwareMode.Disabled$.MODULE$);
+        AdminUtils.createTopic(zkUtils, TestQueryConstants.JSON_TOPIC, 1, 1, topicProps, RackAwareMode.Disabled$.MODULE$);
 
         org.apache.kafka.common.requests.MetadataResponse.TopicMetadata fetchTopicMetadataFromZk = AdminUtils
-            .fetchTopicMetadataFromZk(QueryConstants.JSON_TOPIC, zkUtils);
+            .fetchTopicMetadataFromZk(TestQueryConstants.JSON_TOPIC, zkUtils);
         logger.info("Topic Metadata: " + fetchTopicMetadataFromZk);

         KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(),
             StringSerializer.class);
-        generator.populateJsonMsgIntoKafka(QueryConstants.JSON_TOPIC, NUM_JSON_MSG);
+        generator.populateJsonMsgIntoKafka(TestQueryConstants.JSON_TOPIC, NUM_JSON_MSG);
       }
       initCount.incrementAndGet();
       runningSuite = true;

http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java
new file mode 100644
index 0000000..057af7e
--- /dev/null
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+public interface TestQueryConstants {
+
+  // Kafka Server Prop Constants
+  String BROKER_DELIM = ",";
+  String LOCAL_HOST = "127.0.0.1";
+
+  // ZK
+  String ZK_TMP = "zk_tmp";
+  int TICK_TIME = 500;
+  int MAX_CLIENT_CONNECTIONS = 100;
+
+  String JSON_TOPIC = "drill-json-topic";
+  String AVRO_TOPIC = "drill-avro-topic";
+  String INVALID_TOPIC = "invalid-topic";
+
+  // Queries
+  String MSG_COUNT_QUERY = "select count(*) from kafka.`%s`";
+  String MSG_SELECT_QUERY = "select * from kafka.`%s`";
+  String MIN_OFFSET_QUERY = "select MIN(kafkaMsgOffset) as minOffset from kafka.`%s`";
+  String MAX_OFFSET_QUERY = "select MAX(kafkaMsgOffset) as maxOffset from kafka.`%s`";
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
index 319c66c..663e0e4 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
@@ -26,7 +26,7 @@ import java.util.Properties;
 
 import org.apache.drill.exec.ZookeeperHelper;
 import org.apache.drill.exec.store.kafka.KafkaStoragePluginConfig;
-import org.apache.drill.exec.store.kafka.QueryConstants;
+import org.apache.drill.exec.store.kafka.TestQueryConstants;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.slf4j.Logger;
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServerStartable;
 
-public class EmbeddedKafkaCluster implements QueryConstants {
+public class EmbeddedKafkaCluster implements TestQueryConstants {
   private static final Logger logger = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
   private List<KafkaServerStartable> brokers;
   private final ZookeeperHelper zkHelper;

http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
index dfc3c44..7bddf18 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,7 +19,6 @@ package org.apache.drill.exec.store.kudu;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -45,7 +44,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import org.apache.drill.exec.store.schedule.AffinityCreator;
 import org.apache.drill.exec.store.schedule.AssignmentCreator;
 import org.apache.drill.exec.store.schedule.CompleteWork;
@@ -59,10 +57,10 @@ public class KuduGroupScan extends AbstractGroupScan {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduGroupScan.class);
   private static final long DEFAULT_TABLET_SIZE = 1000;
 
-  private KuduStoragePluginConfig storagePluginConfig;
+  private KuduStoragePlugin kuduStoragePlugin;
   private List<SchemaPath> columns;
   private KuduScanSpec kuduScanSpec;
-  private KuduStoragePlugin storagePlugin;
+
   private boolean filterPushedDown = false;
   private List<KuduWork> kuduWorkList = Lists.newArrayList();
   private ListMultimap<Integer,KuduWork> assignments;
@@ -71,31 +69,31 @@ public class KuduGroupScan extends AbstractGroupScan {
 
   @JsonCreator
   public KuduGroupScan(@JsonProperty("kuduScanSpec") KuduScanSpec kuduScanSpec,
-                        @JsonProperty("storage") KuduStoragePluginConfig 
storagePluginConfig,
+                        @JsonProperty("kuduStoragePluginConfig") 
KuduStoragePluginConfig kuduStoragePluginConfig,
                         @JsonProperty("columns") List<SchemaPath> columns,
                         @JacksonInject StoragePluginRegistry pluginRegistry) 
throws IOException, ExecutionSetupException {
-    this((KuduStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), 
kuduScanSpec, columns);
+    this((KuduStoragePlugin) 
pluginRegistry.getPlugin(kuduStoragePluginConfig), kuduScanSpec, columns);
   }
 
-  public KuduGroupScan(KuduStoragePlugin storagePlugin, KuduScanSpec scanSpec,
-      List<SchemaPath> columns) {
+  public KuduGroupScan(KuduStoragePlugin kuduStoragePlugin,
+                       KuduScanSpec kuduScanSpec,
+                       List<SchemaPath> columns) {
     super((String) null);
-    this.storagePlugin = storagePlugin;
-    this.storagePluginConfig = storagePlugin.getConfig();
-    this.kuduScanSpec = scanSpec;
+    this.kuduStoragePlugin = kuduStoragePlugin;
+    this.kuduScanSpec = kuduScanSpec;
     this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : columns;
     init();
   }
 
   private void init() {
     String tableName = kuduScanSpec.getTableName();
-    Collection<DrillbitEndpoint> endpoints = storagePlugin.getContext().getBits();
+    Collection<DrillbitEndpoint> endpoints = kuduStoragePlugin.getContext().getBits();
     Map<String,DrillbitEndpoint> endpointMap = Maps.newHashMap();
     for (DrillbitEndpoint endpoint : endpoints) {
       endpointMap.put(endpoint.getAddress(), endpoint);
     }
     try {
-      List<LocatedTablet> locations = storagePlugin.getClient().openTable(tableName).getTabletsLocations(10000);
+      List<LocatedTablet> locations = kuduStoragePlugin.getClient().openTable(tableName).getTabletsLocations(10000);
       for (LocatedTablet tablet : locations) {
         KuduWork work = new KuduWork(tablet.getPartition().getPartitionKeyStart(), tablet.getPartition().getPartitionKeyEnd());
         for (Replica replica : tablet.getReplicas()) {
@@ -153,10 +151,9 @@ public class KuduGroupScan extends AbstractGroupScan {
    */
   private KuduGroupScan(KuduGroupScan that) {
     super(that);
+    this.kuduStoragePlugin = that.kuduStoragePlugin;
     this.columns = that.columns;
     this.kuduScanSpec = that.kuduScanSpec;
-    this.storagePlugin = that.storagePlugin;
-    this.storagePluginConfig = that.storagePluginConfig;
     this.filterPushedDown = that.filterPushedDown;
     this.kuduWorkList = that.kuduWorkList;
     this.assignments = that.assignments;
@@ -204,7 +201,7 @@ public class KuduGroupScan extends AbstractGroupScan {
       scanSpecList.add(new KuduSubScanSpec(getTableName(), work.getPartitionKeyStart(), work.getPartitionKeyEnd()));
     }
 
-    return new KuduSubScan(storagePlugin, storagePluginConfig, scanSpecList, this.columns);
+    return new KuduSubScan(kuduStoragePlugin, scanSpecList, this.columns);
+    return new KuduSubScan(kuduStoragePlugin, scanSpecList, this.columns);
   }
 
   // KuduStoragePlugin plugin, KuduStoragePluginConfig config,
@@ -224,7 +221,7 @@ public class KuduGroupScan extends AbstractGroupScan {
 
   @JsonIgnore
   public KuduStoragePlugin getStoragePlugin() {
-    return storagePlugin;
+    return kuduStoragePlugin;
   }
 
   @JsonIgnore
@@ -244,9 +241,9 @@ public class KuduGroupScan extends AbstractGroupScan {
         + columns + "]";
   }
 
-  @JsonProperty("storage")
-  public KuduStoragePluginConfig getStorageConfig() {
-    return this.storagePluginConfig;
+  @JsonProperty
+  public KuduStoragePluginConfig getKuduStoragePluginConfig() {
+    return kuduStoragePlugin.getConfig();
   }
 
   @JsonProperty

http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java
index 9025db7..ca577e7 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,9 +21,9 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
@@ -37,49 +37,40 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
 
 // Class containing information for reading a single Kudu tablet
-@JsonTypeName("kudu-tablet-scan")
+@JsonTypeName("kudu-sub-scan")
 public class KuduSubScan extends AbstractBase implements SubScan {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduSubScan.class);
 
-  @JsonProperty
-  public final KuduStoragePluginConfig storage;
-
-
   private final KuduStoragePlugin kuduStoragePlugin;
   private final List<KuduSubScanSpec> tabletScanSpecList;
   private final List<SchemaPath> columns;
 
   @JsonCreator
   public KuduSubScan(@JacksonInject StoragePluginRegistry registry,
-                      @JsonProperty("storage") StoragePluginConfig storage,
-      @JsonProperty("tabletScanSpecList") LinkedList<KuduSubScanSpec> 
tabletScanSpecList,
+                      @JsonProperty("kuduStoragePluginConfig") 
KuduStoragePluginConfig kuduStoragePluginConfig,
+                      @JsonProperty("tabletScanSpecList") 
LinkedList<KuduSubScanSpec> tabletScanSpecList,
                       @JsonProperty("columns") List<SchemaPath> columns) 
throws ExecutionSetupException {
     super((String) null);
-    kuduStoragePlugin = (KuduStoragePlugin) registry.getPlugin(storage);
+    kuduStoragePlugin = (KuduStoragePlugin) registry.getPlugin(kuduStoragePluginConfig);
     this.tabletScanSpecList = tabletScanSpecList;
-    this.storage = (KuduStoragePluginConfig) storage;
     this.columns = columns;
   }
 
-  public KuduSubScan(KuduStoragePlugin plugin, KuduStoragePluginConfig config,
-      List<KuduSubScanSpec> tabletInfoList, List<SchemaPath> columns) {
+  public KuduSubScan(KuduStoragePlugin plugin, List<KuduSubScanSpec> tabletInfoList, List<SchemaPath> columns) {
     super((String) null);
-    kuduStoragePlugin = plugin;
-    storage = config;
+    this.kuduStoragePlugin = plugin;
     this.tabletScanSpecList = tabletInfoList;
     this.columns = columns;
   }
 
-  public List<KuduSubScanSpec> getTabletScanSpecList() {
-    return tabletScanSpecList;
+  public KuduStoragePluginConfig getKuduStoragePluginConfig() {
+    return kuduStoragePlugin.getConfig();
   }
 
-  @JsonIgnore
-  public KuduStoragePluginConfig getStorageConfig() {
-    return storage;
+  public List<KuduSubScanSpec> getTabletScanSpecList() {
+    return tabletScanSpecList;
   }
 
   public List<SchemaPath> getColumns() {
@@ -104,12 +95,12 @@ public class KuduSubScan extends AbstractBase implements SubScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
-    return new KuduSubScan(kuduStoragePlugin, storage, tabletScanSpecList, columns);
+    return new KuduSubScan(kuduStoragePlugin, tabletScanSpecList, columns);
   }
 
   @Override
   public Iterator<PhysicalOperator> iterator() {
-    return Iterators.emptyIterator();
+    return ImmutableSet.<PhysicalOperator>of().iterator();
   }
 
   public static class KuduSubScanSpec {
@@ -143,7 +134,7 @@ public class KuduSubScan extends AbstractBase implements SubScan {
 
   @Override
   public int getOperatorType() {
-    return CoreOperatorType.HBASE_SUB_SCAN_VALUE;
+    return CoreOperatorType.KUDU_SUB_SCAN_VALUE;
   }
 
 }
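[Editor's note, not part of the patch: the Kudu changes above drop the separately serialized "storage" field and instead derive the config from the plugin, re-resolving the plugin through the injected StoragePluginRegistry on deserialization. The following is a minimal, self-contained Java sketch of that Jackson pattern using hypothetical Plugin/Config/Registry stand-ins (not Drill classes), illustrating why the duplicated field is unnecessary.]

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PluginConfigRoundTrip {

  // Hypothetical stand-ins for a storage plugin, its config and the plugin registry.
  static class Config { public String host = "localhost"; }
  static class Plugin {
    private final Config config;
    Plugin(Config config) { this.config = config; }
    Config getConfig() { return config; }
  }
  static class Registry {
    Plugin resolve(Config config) { return new Plugin(config); }
  }

  // Only the config is written to JSON; the plugin instance is recreated from the
  // registry that Jackson injects at deserialization time, so no plugin reference
  // or duplicate config field needs to be serialized.
  static class SubScan {
    private final Plugin plugin;

    @JsonCreator
    public SubScan(@JacksonInject Registry registry,
                   @JsonProperty("pluginConfig") Config pluginConfig) {
      this.plugin = registry.resolve(pluginConfig);
    }

    public SubScan(Plugin plugin) { this.plugin = plugin; }

    @JsonProperty("pluginConfig")
    public Config getPluginConfig() { return plugin.getConfig(); }
  }

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    mapper.setInjectableValues(new InjectableValues.Std().addValue(Registry.class, new Registry()));
    // Serialize, then deserialize: the config round-trips, the plugin is re-resolved.
    String json = mapper.writeValueAsString(new SubScan(new Plugin(new Config())));
    SubScan restored = mapper.readValue(json, SubScan.class);
    System.out.println(json + " -> host: " + restored.getPluginConfig().host);
  }
}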

http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java
index b94033a..4e1c7fd 100644
--- a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java
+++ b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.store.kudu;
 
+import org.apache.drill.PlanTestBase;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.categories.KuduStorageTest;
 import org.junit.Ignore;
@@ -44,6 +45,11 @@ public class TestKuduPlugin extends BaseTestQuery {
     test("create table kudu.regions as select 1, * from sys.options limit 1");
     test("select * from kudu.regions");
     test("drop table kudu.regions");
+  }
 
+  @Test
+  public void testPhysicalPlanSubmission() throws Exception {
+    PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from kudu.demo");
   }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
index d8043fd..8d0064f 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
@@ -82,4 +82,11 @@ public class TestMongoQueries extends MongoTestBase {
         DONUTS_DB, DONUTS_COLLECTION);
     runMongoSQLVerifyCount(queryString, 5);
   }
+
+  @Test
+  public void testPhysicalPlanSubmission() throws Exception {
+    String query = String.format(TEST_BOOLEAN_FILTER_QUERY_TEMPLATE1,
+        EMPLOYEE_DB, EMPINFO_COLLECTION);
+    testPhysicalPlanExecutionBasedOnQuery(query);
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java b/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java
index 2d6c506..27ca09c 100644
--- a/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java
+++ b/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java
@@ -174,10 +174,9 @@ public class TestOpenTSDBPlugin extends PlanTestBase {
   }
 
   @Test
-  public void testPhysicalPlanExecutionBasedOnQuery() throws Exception {
-    String query = "EXPLAIN PLAN for select * from 
openTSDB.`(metric=warp.speed.test, start=47y-ago, aggregator=sum)`";
-    String plan = getPlanInString(query, JSON_FORMAT);
-    Assert.assertEquals(18, testPhysical(plan));
+  public void testPhysicalPlanSubmission() throws Exception {
+    String query = "select * from openTSDB.`(metric=warp.speed.test, 
start=47y-ago, aggregator=sum)`";
+    testPhysicalPlanExecutionBasedOnQuery(query);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index 9ef1f8d..6ae9deb 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -521,6 +521,10 @@ public final class UserBitShared {
      * <code>KAFKA_SUB_SCAN = 38;</code>
      */
     KAFKA_SUB_SCAN(38, 38),
+    /**
+     * <code>KUDU_SUB_SCAN = 39;</code>
+     */
+    KUDU_SUB_SCAN(39, 39),
     ;
 
     /**
@@ -679,6 +683,10 @@ public final class UserBitShared {
      * <code>KAFKA_SUB_SCAN = 38;</code>
      */
     public static final int KAFKA_SUB_SCAN_VALUE = 38;
+    /**
+     * <code>KUDU_SUB_SCAN = 39;</code>
+     */
+    public static final int KUDU_SUB_SCAN_VALUE = 39;
 
 
     public final int getNumber() { return value; }
@@ -724,6 +732,7 @@ public final class UserBitShared {
         case 36: return AVRO_SUB_SCAN;
         case 37: return PCAP_SUB_SCAN;
         case 38: return KAFKA_SUB_SCAN;
+        case 39: return KUDU_SUB_SCAN;
         default: return null;
       }
     }
@@ -24104,7 +24113,7 @@ public final class UserBitShared {
       "agmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_ALL" +
       
"OCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\t"
 +
       "CANCELLED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCELLATION_" 
+
-      "REQUESTED\020\006*\204\006\n\020CoreOperatorType\022\021\n\rSING" +
+      "REQUESTED\020\006*\227\006\n\020CoreOperatorType\022\021\n\rSING" +
       "LE_SENDER\020\000\022\024\n\020BROADCAST_SENDER\020\001\022\n\n\006FIL" 
+
       
"TER\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHASH_JOIN\020\004" +
       "\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITION_SENDE" +
@@ -24123,11 +24132,11 @@ public final class UserBitShared {
       "X_TO_JSON\020\037\022\025\n\021PRODUCER_CONSUMER\020 \022\022\n\016HB",
       "ASE_SUB_SCAN\020!\022\n\n\006WINDOW\020\"\022\024\n\020NESTED_LOO" +
       "P_JOIN\020#\022\021\n\rAVRO_SUB_SCAN\020$\022\021\n\rPCAP_SUB_" +
-      "SCAN\020%\022\022\n\016KAFKA_SUB_SCAN\020&*g\n\nSaslStatus" +
-      
"\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SA"
 +
-      
"SL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SA" +
-      "SL_FAILED\020\004B.\n\033org.apache.drill.exec.pro" +
-      "toB\rUserBitSharedH\001"
+      "SCAN\020%\022\022\n\016KAFKA_SUB_SCAN\020&\022\021\n\rKUDU_SUB_S" +
+      "CAN\020\'*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n" +
+      
"\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014S" +
+      "ASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org.ap" +
+      "ache.drill.exec.protoB\rUserBitSharedH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
index 8ad38a5..71595f7 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
@@ -60,7 +60,8 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
     NESTED_LOOP_JOIN(35),
     AVRO_SUB_SCAN(36),
     PCAP_SUB_SCAN(37),
-    KAFKA_SUB_SCAN(38);
+    KAFKA_SUB_SCAN(38),
+    KUDU_SUB_SCAN(39);
     
     public final int number;
     
@@ -117,6 +118,7 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
             case 36: return AVRO_SUB_SCAN;
             case 37: return PCAP_SUB_SCAN;
             case 38: return KAFKA_SUB_SCAN;
+            case 39: return KUDU_SUB_SCAN;
             default: return null;
         }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index dc8bdb6..b13f059 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -326,6 +326,7 @@ enum CoreOperatorType {
   AVRO_SUB_SCAN = 36;
   PCAP_SUB_SCAN = 37;
   KAFKA_SUB_SCAN = 38;
+  KUDU_SUB_SCAN = 39;
 }
 
 /* Registry that contains list of jars, each jar contains its name and list of function signatures.
