DRILL-5730: Mock testing improvements and interface improvements

closes #1045


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/186536d5
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/186536d5
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/186536d5

Branch: refs/heads/master
Commit: 186536d544d02ffc01339a4645e2a533545a2f86
Parents: 9926eda
Author: Timothy Farkas <timothyfar...@apache.org>
Authored: Thu Jan 11 14:59:41 2018 -0800
Committer: Arina Ielchiieva <arina.yelchiy...@gmail.com>
Committed: Fri Jan 26 13:42:27 2018 +0200

----------------------------------------------------------------------
 .../apache/drill/common/config/DrillConfig.java |  10 +-
 .../drill/common/config/NestedConfig.java       |   2 -
 .../store/mapr/db/MapRDBScanBatchCreator.java   |  11 +-
 .../mapr/db/json/MaprDBJsonRecordReader.java    |   4 +-
 .../exec/store/hbase/HBaseRecordReader.java     |   4 +-
 .../exec/store/hbase/HBaseScanBatchCreator.java |  10 +-
 .../codegen/templates/HiveRecordReaders.java    |   2 +-
 .../hive/HiveDrillNativeScanBatchCreator.java   |   7 +-
 .../exec/store/hive/HiveScanBatchCreator.java   |   6 +-
 .../exec/store/hive/HiveStoragePlugin.java      |   1 +
 .../drill/exec/store/hive/HiveUtilities.java    |   6 +-
 .../apache/drill/exec/hive/TestHiveStorage.java |   3 -
 .../drill/exec/store/jdbc/JdbcBatchCreator.java |   7 +-
 .../drill/exec/store/jdbc/JdbcRecordReader.java |   9 +-
 .../exec/store/kafka/KafkaRecordReader.java     |  13 +-
 .../exec/store/kafka/KafkaScanBatchCreator.java |   4 +-
 .../drill/exec/store/kudu/KuduRecordReader.java |   4 +-
 .../exec/store/kudu/KuduScanBatchCreator.java   |   6 +-
 .../exec/store/kudu/KuduWriterBatchCreator.java |   4 +-
 .../exec/store/mongo/MongoRecordReader.java     |   5 +-
 .../exec/store/mongo/MongoScanBatchCreator.java |   6 +-
 .../store/openTSDB/OpenTSDBBatchCreator.java    |   4 +-
 .../exec/expr/fn/FunctionGenerationHelper.java  |   2 +-
 .../drill/exec/ops/BaseFragmentContext.java     |   6 +-
 .../drill/exec/ops/BaseOperatorContext.java     |  22 +-
 .../drill/exec/ops/ExchangeFragmentContext.java |  34 ++
 .../drill/exec/ops/ExecutorFragmentContext.java |  56 ++
 .../apache/drill/exec/ops/FragmentContext.java  | 511 ++++---------------
 .../drill/exec/ops/FragmentContextImpl.java     | 491 ++++++++++++++++++
 .../apache/drill/exec/ops/OperatorContext.java  |   2 +-
 .../drill/exec/ops/OperatorContextImpl.java     |  12 +-
 .../apache/drill/exec/ops/OperatorStats.java    |   3 -
 .../drill/exec/ops/RootFragmentContext.java     |  24 +
 .../org/apache/drill/exec/ops/UdfUtilities.java |   2 +-
 .../drill/exec/physical/base/AbstractBase.java  |  11 -
 .../drill/exec/physical/base/Exchange.java      |  18 +-
 .../exec/physical/base/PhysicalOperator.java    |  28 +-
 .../drill/exec/physical/impl/BaseRootExec.java  |  14 +-
 .../drill/exec/physical/impl/BatchCreator.java  |   6 +-
 .../drill/exec/physical/impl/ImplCreator.java   |  19 +-
 .../physical/impl/MergingReceiverCreator.java   |   4 +-
 .../drill/exec/physical/impl/RootCreator.java   |   6 +-
 .../drill/exec/physical/impl/ScanBatch.java     |   6 +-
 .../drill/exec/physical/impl/ScreenCreator.java |  14 +-
 .../exec/physical/impl/SingleSenderCreator.java |   7 +-
 .../impl/TopN/PriorityQueueTemplate.java        |   1 -
 .../exec/physical/impl/TopN/TopNBatch.java      |  12 +-
 .../impl/TopN/TopNSortBatchCreator.java         |  10 +-
 .../exec/physical/impl/WriterRecordBatch.java   |   4 +-
 .../physical/impl/aggregate/HashAggBatch.java   |   7 +-
 .../impl/aggregate/HashAggBatchCreator.java     |   9 +-
 .../impl/aggregate/HashAggTemplate.java         |   4 +-
 .../physical/impl/aggregate/HashAggregator.java |  27 +-
 .../impl/aggregate/SpilledRecordbatch.java      |   8 +-
 .../impl/aggregate/StreamingAggBatch.java       |   6 +-
 .../aggregate/StreamingAggBatchCreator.java     |   9 +-
 .../impl/aggregate/StreamingAggregator.java     |   1 -
 .../broadcastsender/BroadcastSenderCreator.java |   4 +-
 .../BroadcastSenderRootExec.java                |   4 +-
 .../physical/impl/common/ChainedHashTable.java  |   3 +-
 .../exec/physical/impl/common/HashTable.java    |  48 +-
 .../physical/impl/common/HashTableTemplate.java |   7 +-
 .../impl/filter/FilterBatchCreator.java         |  10 +-
 .../physical/impl/filter/FilterRecordBatch.java |  17 +-
 .../physical/impl/filter/FilterSignature.java   |   4 +-
 .../physical/impl/filter/FilterTemplate2.java   |   4 +-
 .../physical/impl/filter/FilterTemplate4.java   |   2 -
 .../exec/physical/impl/filter/Filterer.java     |  10 +-
 .../impl/flatten/FlattenBatchCreator.java       |   9 +-
 .../impl/flatten/FlattenRecordBatch.java        |   2 -
 .../exec/physical/impl/flatten/Flattener.java   |  20 +-
 .../exec/physical/impl/join/HashJoinBatch.java  |   7 +-
 .../impl/join/HashJoinBatchCreator.java         |   4 +-
 .../exec/physical/impl/join/HashJoinProbe.java  |  18 +-
 .../exec/physical/impl/join/JoinWorker.java     |  11 +-
 .../exec/physical/impl/join/MergeJoinBatch.java |   7 +-
 .../physical/impl/join/MergeJoinCreator.java    |   7 +-
 .../impl/join/NestedLoopJoinBatchCreator.java   |   4 +-
 .../physical/impl/limit/LimitBatchCreator.java  |   4 +-
 .../impl/mergereceiver/MergingRecordBatch.java  |  42 +-
 .../OrderedPartitionRecordBatch.java            |  16 +-
 .../OrderedPartitionSenderCreator.java          |   4 +-
 .../partitionsender/PartitionSenderCreator.java |   4 +-
 .../PartitionSenderRootExec.java                |  17 +-
 .../impl/partitionsender/Partitioner.java       |  31 +-
 .../partitionsender/PartitionerDecorator.java   |   4 +-
 .../partitionsender/PartitionerTemplate.java    |   3 +-
 .../impl/producer/ProducerConsumerBatch.java    |   4 +-
 .../producer/ProducerConsumerBatchCreator.java  |   4 +-
 .../impl/project/ComplexToJsonBatchCreator.java |   7 +-
 .../impl/project/ProjectBatchCreator.java       |   7 +-
 .../impl/project/ProjectRecordBatch.java        |   2 +-
 .../exec/physical/impl/sort/SortBatch.java      |   4 +-
 .../physical/impl/sort/SortBatchCreator.java    |   8 +-
 .../impl/sort/SortRecordBatchBuilder.java       |   5 -
 .../impl/svremover/RemovingRecordBatch.java     |   6 +-
 .../impl/svremover/SVRemoverCreator.java        |  10 +-
 .../physical/impl/trace/TraceBatchCreator.java  |   8 +-
 .../impl/union/UnionAllBatchCreator.java        |  10 +-
 .../impl/union/UnionAllRecordBatch.java         |   4 +-
 .../UnorderedReceiverBatch.java                 |  20 +-
 .../UnorderedReceiverCreator.java               |   6 +-
 .../impl/validate/IteratorValidatorCreator.java |   6 +-
 .../impl/values/ValuesBatchCreator.java         |   6 +-
 .../impl/window/WindowFrameBatchCreator.java    |   5 +-
 .../impl/window/WindowFrameRecordBatch.java     |   2 +-
 .../physical/impl/xsort/ExternalSortBatch.java  |  19 +-
 .../impl/xsort/ExternalSortBatchCreator.java    |   6 +-
 .../exec/physical/impl/xsort/MSortTemplate.java |   2 +-
 .../physical/impl/xsort/SingleBatchSorter.java  |   4 +-
 .../impl/xsort/SingleBatchSorterTemplate.java   |   6 +-
 .../impl/xsort/managed/ExternalSortBatch.java   |   8 +-
 .../impl/xsort/managed/MSortTemplate.java       |  10 +-
 .../physical/impl/xsort/managed/MSorter.java    |   4 +-
 .../drill/exec/record/AbstractRecordBatch.java  |   6 +-
 .../exec/record/AbstractSingleRecordBatch.java  |   2 +-
 .../apache/drill/exec/rpc/data/DataTunnel.java  |  43 --
 .../drill/exec/server/BootStrapContext.java     |   8 +
 .../server/options/FragmentOptionManager.java   |   3 +-
 .../apache/drill/exec/store/ColumnExplorer.java |  10 -
 .../exec/store/dfs/easy/EasyFormatPlugin.java   |   4 +-
 .../store/dfs/easy/EasyReaderBatchCreator.java  |   8 +-
 .../store/dfs/easy/EasyWriterBatchCreator.java  |   6 +-
 .../exec/store/direct/DirectBatchCreator.java   |   6 +-
 .../exec/store/easy/json/JSONRecordReader.java  |   2 +-
 .../exec/store/easy/text/TextFormatPlugin.java  |   2 +-
 .../compliant/CompliantTextRecordReader.java    |   3 +-
 .../store/ischema/InfoSchemaBatchCreator.java   |   8 +-
 .../store/mock/ExtendedMockRecordReader.java    |   3 +-
 .../drill/exec/store/mock/MockRecordReader.java |   6 +-
 .../exec/store/mock/MockScanBatchCreator.java   |   8 +-
 .../store/parquet/ParquetRGFilterEvaluator.java |   4 +-
 .../store/parquet/ParquetScanBatchCreator.java  |   9 +-
 .../parquet/ParquetWriterBatchCreator.java      |   4 +-
 .../ParquetToDrillTypeConverter.java            |   5 +-
 .../stat/ParquetFooterStatCollector.java        |   3 -
 .../store/sys/BitToUserConnectionIterator.java  |  13 +-
 .../drill/exec/store/sys/DrillbitIterator.java  |   8 +-
 .../drill/exec/store/sys/MemoryIterator.java    |  10 +-
 .../exec/store/sys/ProfileInfoIterator.java     |   4 +-
 .../drill/exec/store/sys/ProfileIterator.java   |  10 +-
 .../exec/store/sys/ProfileJsonIterator.java     |   4 +-
 .../drill/exec/store/sys/SystemTable.java       |  30 +-
 .../exec/store/sys/SystemTableBatchCreator.java |   6 +-
 .../drill/exec/store/sys/ThreadsIterator.java   |   8 +-
 .../drill/exec/testing/ControlsInjector.java    |  15 +-
 .../drill/exec/testing/ExecutionControls.java   |   6 +
 .../exec/testing/ExecutionControlsInjector.java |  14 +-
 .../exec/testing/NoOpControlsInjector.java      |   6 +-
 .../exec/work/batch/AbstractDataCollector.java  |   7 +-
 .../exec/work/batch/BaseRawBatchBuffer.java     |  28 +-
 .../exec/work/batch/ControlMessageHandler.java  |   4 +-
 .../exec/work/batch/SpoolingRawBatchBuffer.java |   6 +-
 .../work/batch/UnlimitedRawBatchBuffer.java     |   4 +-
 .../exec/work/foreman/FragmentsRunner.java      |   6 +-
 .../work/fragment/AbstractFragmentManager.java  |   5 +-
 .../exec/work/fragment/FragmentExecutor.java    |  25 +-
 .../work/fragment/FragmentStatusReporter.java   |  16 +-
 .../java/org/apache/drill/PlanningBase.java     |  87 +---
 .../org/apache/drill/TestDynamicUDFSupport.java |  27 +-
 .../java/org/apache/drill/exec/ExecTest.java    |  36 +-
 .../java/org/apache/drill/exec/RunRootExec.java |   4 +-
 .../apache/drill/exec/client/DumpCatTest.java   |  19 +-
 .../apache/drill/exec/expr/ExpressionTest.java  |  69 ++-
 .../exec/expr/fn/impl/TestStringFunctions.java  |   3 -
 .../drill/exec/fn/impl/TestMathFunctions.java   |  18 +-
 .../drill/exec/fn/impl/TestMultiInputAdd.java   |  60 +--
 .../exec/fn/impl/TestNewMathFunctions.java      |  42 +-
 .../exec/fn/impl/TestRepeatedFunction.java      |  21 +-
 .../fn/interp/ExpressionInterpreterTest.java    |   8 +-
 .../impersonation/TestInboundImpersonation.java |   3 +-
 .../drill/exec/memory/TestAllocators.java       |   8 +-
 .../exec/physical/impl/SimpleRootExec.java      |  14 +-
 .../exec/physical/impl/TestCastFunctions.java   | 120 +++--
 .../physical/impl/TestComparisonFunctions.java  | 115 ++---
 .../physical/impl/TestConvertFunctions.java     |  18 +-
 .../impl/TestImplicitCastFunctions.java         |  38 +-
 .../exec/physical/impl/TestOptiqPlans.java      | 322 ------------
 .../physical/impl/TestReverseImplicitCast.java  |   7 +-
 .../exec/physical/impl/TestSimpleFunctions.java |  92 ++--
 .../exec/physical/impl/TestStringFunctions.java | 111 ++--
 .../drill/exec/physical/impl/agg/TestAgg.java   |  41 +-
 .../physical/impl/filter/TestSimpleFilter.java  |  32 +-
 .../exec/physical/impl/join/TestHashJoin.java   |  47 +-
 .../exec/physical/impl/join/TestMergeJoin.java  |  75 ++-
 .../physical/impl/limit/TestSimpleLimit.java    |  38 +-
 .../partitionsender/TestPartitionSender.java    |   8 +-
 .../impl/project/TestSimpleProjection.java      |  21 +-
 .../exec/physical/impl/sort/TestSimpleSort.java |  32 +-
 .../impl/trace/TestTraceMultiRecordBatch.java   |  18 +-
 .../impl/trace/TestTraceOutputDump.java         |  18 +-
 .../physical/impl/union/TestSimpleUnion.java    |  18 +-
 .../impl/xsort/managed/TestSortImpl.java        |   4 +-
 .../physical/unit/MiniPlanUnitTestBase.java     |  21 +-
 .../physical/unit/PhysicalOpUnitTestBase.java   | 285 +++++++----
 .../TestHardAffinityFragmentParallelizer.java   |  16 +-
 .../record/ExpressionTreeMaterializerTest.java  |  72 +--
 .../drill/exec/record/TestRecordIterator.java   |  22 +-
 .../drill/exec/record/TestVectorContainer.java  | 128 +++++
 .../drill/exec/rpc/data/TestBitBitKerberos.java | 166 +++---
 .../apache/drill/exec/rpc/data/TestBitRpc.java  | 129 +++--
 .../security/TestCustomUserAuthenticator.java   |   3 +-
 .../rpc/user/security/TestUserBitKerberos.java  |   3 +-
 .../security/TestUserBitKerberosEncryption.java |  36 +-
 .../exec/rpc/user/security/TestUserBitSSL.java  |  11 +-
 .../rpc/user/security/TestUserBitSSLServer.java |  22 +-
 .../security/TestUserBitSaslCompatibility.java  |  34 +-
 .../exec/server/TestOptionsAuthEnabled.java     |   3 +-
 .../server/rest/TestMainLoginPageModel.java     |   8 +-
 .../spnego/TestDrillSpnegoAuthenticator.java    |   3 +-
 .../rest/spnego/TestSpnegoAuthentication.java   |  18 +-
 .../server/rest/spnego/TestSpnegoConfig.java    |  10 +-
 .../exec/store/TestAffinityCalculator.java      | 162 +-----
 .../store/parquet/ParquetRecordReaderTest.java  |  21 +-
 .../parquet/TestParquetFilterPushDown.java      |   6 +-
 .../store/parquet/TestParquetMetadataCache.java |  37 +-
 .../testing/TestCountDownLatchInjection.java    |   3 +-
 .../drill/exec/testing/TestPauseInjection.java  |   3 +-
 .../drill/exec/util/TestQueryMemoryAlloc.java   |  51 +-
 .../fragment/FragmentStatusReporterTest.java    |  26 +-
 .../org/apache/drill/test/ClusterFixture.java   |   1 -
 .../org/apache/drill/test/ConfigBuilder.java    |  49 +-
 .../org/apache/drill/test/OperatorFixture.java  | 281 +++++++---
 .../common/expression/LogicalExpression.java    |  27 +-
 224 files changed, 2732 insertions(+), 2941 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
index 0d3ce9a..73c899d 100644
--- a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
+++ b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
@@ -45,13 +45,11 @@ public class DrillConfig extends NestedConfig {
 
   private final ImmutableList<String> startupArguments;
 
-  public static final boolean ON_OSX = System.getProperty("os.name").contains("OS X");
-
   @SuppressWarnings("restriction")
   private static final long MAX_DIRECT_MEMORY = sun.misc.VM.maxDirectMemory();
 
   @VisibleForTesting
-  public DrillConfig(Config config, boolean enableServerConfigs) {
+  public DrillConfig(Config config) {
     super(config);
     logger.debug("Setting up DrillConfig object.");
     logger.trace("Given Config object is:\n{}",
@@ -138,7 +136,7 @@ public class DrillConfig extends NestedConfig {
    * @param overrideFileResourcePathname
    *          the classpath resource pathname of the file to use for
    *          configuration override purposes; {@code null} specifies to use the
-   *          default pathname ({@link CommonConstants.CONFIG_OVERRIDE}) (does
+   *          default pathname ({@link CommonConstants#CONFIG_OVERRIDE_RESOURCE_PATHNAME}) (does
    *          <strong>not</strong> specify to suppress trying to load an
    *          overrides file)
    *  @return A merged Config object.
@@ -169,7 +167,7 @@ public class DrillConfig extends NestedConfig {
    * @return {@link DrillConfig} instance
    */
   public static DrillConfig create(Config config) {
-    return new DrillConfig(config.resolve(), true);
+    return new DrillConfig(config.resolve());
   }
 
   /**
@@ -252,7 +250,7 @@ public class DrillConfig extends NestedConfig {
     logger.info("Configuration and plugin file(s) identified in {}ms.\n{}",
         watch.elapsed(TimeUnit.MILLISECONDS),
         logString);
-    return new DrillConfig(effectiveConfig.resolve(), enableServerConfigs);
+    return new DrillConfig(effectiveConfig.resolve());
   }
 
   public <T> Class<T> getClassAt(String location, Class<T> clazz) throws DrillConfigurationException {

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/common/src/main/java/org/apache/drill/common/config/NestedConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/config/NestedConfig.java b/common/src/main/java/org/apache/drill/common/config/NestedConfig.java
index 60fe013..a1533a5 100644
--- a/common/src/main/java/org/apache/drill/common/config/NestedConfig.java
+++ b/common/src/main/java/org/apache/drill/common/config/NestedConfig.java
@@ -30,8 +30,6 @@ import com.typesafe.config.ConfigResolveOptions;
 import com.typesafe.config.ConfigValue;
 
 abstract class NestedConfig implements Config {
-  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedConfig.class);
-
   private final Config c;
 
   NestedConfig(Config c) {

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
index e770c96..d9c8ce7 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.store.mapr.db;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
@@ -33,11 +33,9 @@ import org.apache.drill.exec.store.mapr.db.json.MaprDBJsonRecordReader;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan>{
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBScanBatchCreator.class);
-
+public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan> {
   @Override
-  public ScanBatch getBatch(FragmentContext context, MapRDBSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
+  public ScanBatch getBatch(ExecutorFragmentContext context, MapRDBSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
     List<RecordReader> readers = Lists.newArrayList();
     for(MapRDBSubScanSpec scanSpec : subScan.getRegionScanSpecList()){
@@ -46,8 +44,7 @@ public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan>{
           readers.add(new HBaseRecordReader(
               subScan.getFormatPlugin().getConnection(),
               getHBaseSubScanSpec(scanSpec),
-              subScan.getColumns(),
-              context));
+              subScan.getColumns()));
         } else {
           readers.add(new MaprDBJsonRecordReader(scanSpec, subScan.getFormatPluginConfig(), subScan.getColumns(), context));
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
index 1327541..9f93e18 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
@@ -72,7 +72,6 @@ import io.netty.buffer.DrillBuf;
 public class MaprDBJsonRecordReader extends AbstractRecordReader {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MaprDBJsonRecordReader.class);
 
-  public static final SchemaPath ID_PATH = SchemaPath.getSimplePath(ID_KEY);
   private final long MILLISECONDS_IN_A_DAY  = (long)1000 * 60 * 60 * 24;
 
   private Table table;
@@ -117,7 +116,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
 
     disableCountOptimization = formatPluginConfig.disableCountOptimization();
     setColumns(projectedColumns);
-    unionEnabled = context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
+    unionEnabled = context.getOptions().getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY);
     readNumbersAsDouble = formatPluginConfig.isReadAllNumbersAsDouble();
     allTextMode = formatPluginConfig.isAllTextMode();
     ignoreSchemaChange = formatPluginConfig.isIgnoreSchemaChange();
@@ -518,5 +517,4 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
       table.close();
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 631c44d..cb555e5 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -33,7 +33,6 @@ import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.PathSegment.NameSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.impl.OutputMutator;
@@ -84,8 +83,7 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
 
   private final Connection connection;
 
-  public HBaseRecordReader(Connection connection, HBaseSubScan.HBaseSubScanSpec subScanSpec,
-      List<SchemaPath> projectedColumns, FragmentContext context) {
+  public HBaseRecordReader(Connection connection, HBaseSubScan.HBaseSubScanSpec subScanSpec, List<SchemaPath> projectedColumns) {
     this.connection = connection;
     hbaseTableName = TableName.valueOf(
         Preconditions.checkNotNull(subScanSpec, "HBase reader needs a sub-scan spec").getTableName());

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
index 8e815b9..ff9a4e4 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
@@ -21,7 +21,7 @@ import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
@@ -31,11 +31,9 @@ import org.apache.drill.exec.store.RecordReader;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseScanBatchCreator.class);
-
+public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan> {
   @Override
-  public ScanBatch getBatch(FragmentContext context, HBaseSubScan subScan, List<RecordBatch> children)
+  public ScanBatch getBatch(ExecutorFragmentContext context, HBaseSubScan subScan, List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
     List<RecordReader> readers = Lists.newArrayList();
@@ -45,7 +43,7 @@ public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan>{
         if ((columns = subScan.getColumns())==null) {
           columns = GroupScan.ALL_COLUMNS;
         }
-        readers.add(new HBaseRecordReader(subScan.getStorageEngine().getConnection(), scanSpec, columns, context));
+        readers.add(new HBaseRecordReader(subScan.getStorageEngine().getConnection(), scanSpec, columns));
       } catch (Exception e1) {
         throw new ExecutionSetupException(e1);
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java b/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java
index b4b2039..4a75ed3 100644
--- a/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java
+++ b/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java
@@ -40,9 +40,9 @@ import java.util.Properties;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.store.hive.HivePartition;
 import org.apache.drill.exec.store.hive.HiveTableWithColumnCache;
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.hadoop.io.Writable;

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
index 60a01a5..243d781 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
@@ -28,7 +28,7 @@ import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
@@ -57,9 +57,10 @@ import com.google.common.collect.Maps;
 
 @SuppressWarnings("unused")
 public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNativeParquetSubScan> {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveDrillNativeScanBatchCreator.class);
 
   @Override
-  public ScanBatch getBatch(FragmentContext context, HiveDrillNativeParquetSubScan config, List<RecordBatch> children)
+  public ScanBatch getBatch(ExecutorFragmentContext context, HiveDrillNativeParquetSubScan config, List<RecordBatch> children)
       throws ExecutionSetupException {
     final HiveTableWithColumnCache table = config.getTable();
     final List<List<InputSplit>> splits = config.getInputSplits();
@@ -176,7 +177,7 @@ public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNa
         ImpersonationUtil.createProxyUgi(config.getUserName(), context.getQueryUserName())));
     }
 
-    return new ScanBatch(config, context, oContext, readers, implicitColumns);
+    return new ScanBatch(context, oContext, readers, implicitColumns);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
index 3df8fd9..9f1e29e 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.store.hive;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
@@ -29,12 +29,12 @@ import org.apache.drill.exec.store.hive.readers.initilializers.ReadersInitialize
 
 @SuppressWarnings("unused")
 public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveScanBatchCreator.class);
 
   @Override
-  public ScanBatch getBatch(FragmentContext context, HiveSubScan config, List<RecordBatch> children)
+  public ScanBatch getBatch(ExecutorFragmentContext context, HiveSubScan config, List<RecordBatch> children)
       throws ExecutionSetupException {
     AbstractReadersInitializer readersInitializer = ReadersInitializer.getInitializer(context, config);
     return new ScanBatch(config, context, readersInitializer.init());
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index 8a842b2..925ff49 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -32,6 +32,7 @@ import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.planner.sql.logical.ConvertHiveParquetScanToDrillParquetScan;

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
index 4164649..b101f49 100644
--- 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
+++ 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
@@ -32,7 +32,7 @@ import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
 import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
 import org.apache.drill.exec.expr.holders.Decimal9Holder;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
-import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.util.DecimalUtility;
 import org.apache.drill.exec.vector.NullableBigIntVector;
 import org.apache.drill.exec.vector.NullableBitVector;
@@ -288,7 +288,7 @@ public class HiveUtilities {
     }
   }
 
-  public static MajorType getMajorTypeFromHiveTypeInfo(final TypeInfo typeInfo, final OptionManager options) {
+  public static MajorType getMajorTypeFromHiveTypeInfo(final TypeInfo typeInfo, final OptionSet options) {
     switch (typeInfo.getCategory()) {
       case PRIMITIVE: {
         PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
@@ -325,7 +325,7 @@ public class HiveUtilities {
   }
 
   public static TypeProtos.MinorType 
getMinorTypeFromHivePrimitiveTypeInfo(PrimitiveTypeInfo primitiveTypeInfo,
-                                                                           
OptionManager options) {
+                                                                           
OptionSet options) {
     switch(primitiveTypeInfo.getPrimitiveCategory()) {
       case BINARY:
         return TypeProtos.MinorType.VARBINARY;

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
 
b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
index c07c9d8..64f3b5b 100644
--- 
a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
+++ 
b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.hive;
 
-import mockit.integration.junit4.JMockit;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import org.apache.drill.categories.HiveStorageTest;
@@ -33,7 +32,6 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
 
 import java.math.BigDecimal;
 import java.sql.Date;
@@ -48,7 +46,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
-@RunWith(JMockit.class)
 @Category({SlowTest.class, HiveStorageTest.class})
 public class TestHiveStorage extends HiveTestBase {
   @BeforeClass

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
 
b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
index 1782e1a..d37bfad 100755
--- 
a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
+++ 
b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
@@ -21,10 +21,9 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
-//import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.store.RecordReader;
 
@@ -32,11 +31,11 @@ import com.google.common.base.Preconditions;
 
 public class JdbcBatchCreator implements BatchCreator<JdbcSubScan> {
   @Override
-  public ScanBatch getBatch(FragmentContext context, JdbcSubScan config,
+  public ScanBatch getBatch(ExecutorFragmentContext context, JdbcSubScan config,
       List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
     JdbcStoragePlugin plugin = config.getPlugin();
-    RecordReader reader = new JdbcRecordReader(context, plugin.getSource(), config.getSql(), plugin.getName());
+    RecordReader reader = new JdbcRecordReader(plugin.getSource(), config.getSql(), plugin.getName());
     return new ScanBatch(config, context, Collections.singletonList(reader));
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
 
b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
index 531f343..364b8a8 100755
--- 
a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
+++ 
b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
@@ -40,7 +40,6 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
@@ -70,17 +69,13 @@ class JdbcRecordReader extends AbstractRecordReader {
   private final DataSource source;
   private ResultSet resultSet;
   private final String storagePluginName;
-  private FragmentContext fragmentContext;
   private Connection connection;
   private Statement statement;
   private final String sql;
   private ImmutableList<ValueVector> vectors;
   private ImmutableList<Copier<?>> copiers;
 
-  private OperatorContext operatorContext;
-
-  public JdbcRecordReader(FragmentContext fragmentContext, DataSource source, String sql, String storagePluginName) {
-    this.fragmentContext = fragmentContext;
+  public JdbcRecordReader(DataSource source, String sql, String storagePluginName) {
     this.source = source;
     this.sql = sql;
     this.storagePluginName = storagePluginName;
@@ -170,8 +165,6 @@ class JdbcRecordReader extends AbstractRecordReader {
   @Override
   public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException {
     try {
-
-      this.operatorContext = operatorContext;
       connection = source.getConnection();
       statement = connection.createStatement();
       resultSet = statement.executeQuery(sql);

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
 
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
index c08c86e..801618c 100644
--- 
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
+++ 
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
@@ -66,13 +66,12 @@ public class KafkaRecordReader extends AbstractRecordReader {
   public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, List<SchemaPath> projectedColumns,
       FragmentContext context, KafkaStoragePlugin plugin) {
     setColumns(projectedColumns);
-    this.enableAllTextMode = context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
-    this.readNumbersAsDouble = context.getOptions()
-        .getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
-    OptionManager options = context.getOptions();
-    this.unionEnabled = options.getOption(ExecConstants.ENABLE_UNION_TYPE);
-    this.kafkaMsgReader = options.getOption(ExecConstants.KAFKA_RECORD_READER).string_val;
-    this.kafkaPollTimeOut = options.getOption(ExecConstants.KAFKA_POLL_TIMEOUT).num_val;
+    final OptionManager optionManager = context.getOptions();
+    this.enableAllTextMode = optionManager.getBoolean(ExecConstants.KAFKA_ALL_TEXT_MODE);
+    this.readNumbersAsDouble = optionManager.getBoolean(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE);
+    this.unionEnabled = optionManager.getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY);
+    this.kafkaMsgReader = optionManager.getString(ExecConstants.KAFKA_RECORD_READER);
+    this.kafkaPollTimeOut = optionManager.getLong(ExecConstants.KAFKA_POLL_TIMEOUT);
     this.plugin = plugin;
     this.subScanSpec = subScanSpec;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
 
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
index 026e341..55fc28f 100644
--- 
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
+++ 
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
@@ -21,7 +21,7 @@ import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
@@ -38,7 +38,7 @@ public class KafkaScanBatchCreator implements BatchCreator<KafkaSubScan> {
   static final Logger logger = LoggerFactory.getLogger(KafkaScanBatchCreator.class);
 
   @Override
-  public CloseableRecordBatch getBatch(FragmentContext context, KafkaSubScan subScan, List<RecordBatch> children)
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context, KafkaSubScan subScan, List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
     List<SchemaPath> columns = subScan.getColumns() != null ? subScan.getColumns() : GroupScan.ALL_COLUMNS;

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
 
b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
index b9a7bf9..e180dd9 100644
--- 
a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
+++ 
b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
@@ -29,7 +29,6 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
@@ -87,8 +86,7 @@ public class KuduRecordReader extends AbstractRecordReader {
 
   private ImmutableList<ProjectedColumnInfo> projectedCols;
 
-  public KuduRecordReader(KuduClient client, KuduSubScan.KuduSubScanSpec subScanSpec,
-      List<SchemaPath> projectedColumns, FragmentContext context) {
+  public KuduRecordReader(KuduClient client, KuduSubScan.KuduSubScanSpec subScanSpec, List<SchemaPath> projectedColumns) {
     setColumns(projectedColumns);
     this.client = client;
     scanSpec = subScanSpec;

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
 
b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
index fc1db5d..5e9d86c 100644
--- 
a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
+++ 
b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
@@ -21,7 +21,7 @@ import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
@@ -35,7 +35,7 @@ public class KuduScanBatchCreator implements BatchCreator<KuduSubScan>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduScanBatchCreator.class);
 
   @Override
-  public ScanBatch getBatch(FragmentContext context, KuduSubScan subScan, List<RecordBatch> children)
+  public ScanBatch getBatch(ExecutorFragmentContext context, KuduSubScan subScan, List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
     List<RecordReader> readers = Lists.newArrayList();
@@ -46,7 +46,7 @@ public class KuduScanBatchCreator implements BatchCreator<KuduSubScan>{
         if ((columns = subScan.getColumns())==null) {
           columns = GroupScan.ALL_COLUMNS;
         }
-        readers.add(new KuduRecordReader(subScan.getStorageEngine().getClient(), scanSpec, columns, context));
+        readers.add(new KuduRecordReader(subScan.getStorageEngine().getClient(), scanSpec, columns));
       } catch (Exception e1) {
         throw new ExecutionSetupException(e1);
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriterBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriterBatchCreator.java
 
b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriterBatchCreator.java
index c200c17..962a1d0 100644
--- 
a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriterBatchCreator.java
+++ 
b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriterBatchCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.store.kudu;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.WriterRecordBatch;
 import org.apache.drill.exec.record.CloseableRecordBatch;
@@ -30,7 +30,7 @@ public class KuduWriterBatchCreator implements BatchCreator<KuduWriter> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduWriterBatchCreator.class);
 
   @Override
-  public CloseableRecordBatch getBatch(FragmentContext context, KuduWriter config, List<RecordBatch> children)
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context, KuduWriter config, List<RecordBatch> children)
       throws ExecutionSetupException {
     assert children != null && children.size() == 1;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
 
b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index da516dd..19f9e07 100644
--- 
a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ 
b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -25,7 +25,6 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -68,7 +67,6 @@ public class MongoRecordReader extends AbstractRecordReader {
   private final Document fields;
 
   private final FragmentContext fragmentContext;
-  private OperatorContext operatorContext;
 
   private final MongoStoragePlugin plugin;
 
@@ -146,13 +144,12 @@ public class MongoRecordReader extends AbstractRecordReader {
     }
     MongoClient client = plugin.getClient(addresses);
     MongoDatabase db = client.getDatabase(subScanSpec.getDbName());
-    this.unionEnabled = fragmentContext.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
+    this.unionEnabled = fragmentContext.getOptions().getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY);
     collection = db.getCollection(subScanSpec.getCollectionName(), BsonDocument.class);
   }
 
   @Override
   public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
-    this.operatorContext = context;
     this.writer = new VectorContainerWriter(output, unionEnabled);
     // Default is BsonReader and all text mode will not be honored in
     // BsonRecordReader

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
 
b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
index 9935184..42717fc 100644
--- 
a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
+++ 
b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
@@ -21,7 +21,7 @@ import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
@@ -32,15 +32,13 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.mongodb.MongoClientOptions;
-import com.mongodb.MongoCredential;
 
 public class MongoScanBatchCreator implements BatchCreator<MongoSubScan> {
   static final Logger logger = LoggerFactory
       .getLogger(MongoScanBatchCreator.class);
 
   @Override
-  public ScanBatch getBatch(FragmentContext context, MongoSubScan subScan,
+  public ScanBatch getBatch(ExecutorFragmentContext context, MongoSubScan subScan,
       List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
     List<RecordReader> readers = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java
 
b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java
index 935aaa5..fce2d7a 100644
--- 
a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java
+++ 
b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.store.openTSDB;
 import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
@@ -33,7 +33,7 @@ import java.util.List;
 public class OpenTSDBBatchCreator implements BatchCreator<OpenTSDBSubScan> {
 
   @Override
-  public CloseableRecordBatch getBatch(FragmentContext context, OpenTSDBSubScan subScan,
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context, OpenTSDBSubScan subScan,
                                        List<RecordBatch> children) throws 
ExecutionSetupException {
     List<RecordReader> readers = Lists.newArrayList();
     List<SchemaPath> columns;

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionGenerationHelper.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionGenerationHelper.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionGenerationHelper.java
index 6c186db..435b451 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionGenerationHelper.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionGenerationHelper.java
@@ -98,7 +98,7 @@ public class FunctionGenerationHelper {
   public static LogicalExpression getOrderingComparatorNullsHigh(
       HoldingContainer left,
       HoldingContainer right,
-      FunctionImplementationRegistry registry) {
+      FunctionLookupContext registry) {
     return getOrderingComparator(true, left, right, registry);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java
index a39213c..f81d4c9 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.ops;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.drill.exec.compile.CodeCompiler;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
@@ -32,8 +31,7 @@ import io.netty.buffer.DrillBuf;
  * Common implementation for both the test and production versions
  * of the fragment context.
  */
-
-public abstract class BaseFragmentContext implements FragmentContextInterface {
+public abstract class BaseFragmentContext implements FragmentContext {
 
   private final FunctionImplementationRegistry funcRegistry;
 
@@ -46,8 +44,6 @@ public abstract class BaseFragmentContext implements FragmentContextInterface {
     return funcRegistry;
   }
 
-  protected abstract CodeCompiler getCompiler();
-
   @Override
   public <T> T getImplementationClass(final ClassGenerator<T> cg)
       throws ClassTransformationException, IOException {

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java
index 7c87570..211bd65 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java
@@ -39,18 +39,17 @@ import io.netty.buffer.DrillBuf;
  * version of the operator context and the full production-time context
  * that includes network services.
  */
-
 public abstract class BaseOperatorContext implements OperatorContext {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseOperatorContext.class);
 
-  protected final FragmentContextInterface context;
+  protected final FragmentContext context;
   protected final BufferAllocator allocator;
   protected final PhysicalOperator popConfig;
   protected final BufferManager manager;
   private DrillFileSystem fs;
   private ControlsInjector injector;
 
-  public BaseOperatorContext(FragmentContextInterface context, BufferAllocator allocator,
+  public BaseOperatorContext(FragmentContext context, BufferAllocator allocator,
                PhysicalOperator popConfig) {
     this.context = context;
     this.allocator = allocator;
@@ -59,7 +58,7 @@ public abstract class BaseOperatorContext implements OperatorContext {
   }
 
   @Override
-  public FragmentContextInterface getFragmentContext() {
+  public FragmentContext getFragmentContext() {
     return context;
   }
 
@@ -104,17 +103,17 @@ public abstract class BaseOperatorContext implements OperatorContext {
   // Allow an operator to use the thread pool
   @Override
   public ExecutorService getExecutor() {
-    return context.getDrillbitContext().getExecutor();
+    return context.getExecutor();
   }
 
   @Override
   public ExecutorService getScanExecutor() {
-    return context.getDrillbitContext().getScanExecutor();
+    return context.getScanExecutor();
   }
 
   @Override
   public ExecutorService getScanDecodeExecutor() {
-    return context.getDrillbitContext().getScanDecodeExecutor();
+    return context.getScanDecodeExecutor();
   }
 
   @Override
@@ -165,12 +164,9 @@ public abstract class BaseOperatorContext implements OperatorContext {
         fs = null;
       }
     } catch (IOException e) {
-      if (ex == null) {
-        ex = UserException
-            .resourceError(e)
-            .addContext("Failed to close the Drill file system for " + getName())
-            .build(logger);
-      }
+      throw UserException.resourceError(e)
+        .addContext("Failed to close the Drill file system for " + getName())
+        .build(logger);
     }
     if (ex != null) {
       throw ex;

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ExchangeFragmentContext.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ExchangeFragmentContext.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ExchangeFragmentContext.java
new file mode 100644
index 0000000..2fb0745
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ExchangeFragmentContext.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.rpc.control.Controller;
+import org.apache.drill.exec.work.batch.IncomingBuffers;
+
+public interface ExchangeFragmentContext extends FragmentContext {
+  void waitForSendComplete();
+
+  AccountingDataTunnel getDataTunnel(final CoordinationProtos.DrillbitEndpoint endpoint);
+
+  AccountingUserConnection getUserDataTunnel();
+
+  Controller getController();
+
+  IncomingBuffers getBuffers();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ExecutorFragmentContext.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ExecutorFragmentContext.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ExecutorFragmentContext.java
new file mode 100644
index 0000000..82bb886
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ExecutorFragmentContext.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.rpc.control.WorkEventBus;
+import org.apache.drill.exec.rpc.user.UserServer;
+import org.apache.drill.exec.server.QueryProfileStoreContext;
+import org.apache.drill.exec.work.batch.IncomingBuffers;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+public interface ExecutorFragmentContext extends RootFragmentContext {
+
+  PhysicalPlanReader getPlanReader();
+
+  ClusterCoordinator getClusterCoordinator();
+
+  CoordinationProtos.DrillbitEndpoint getForemanEndpoint();
+
+  CoordinationProtos.DrillbitEndpoint getEndpoint();
+
+  Collection<CoordinationProtos.DrillbitEndpoint> getBits();
+
+  OperatorCreatorRegistry getOperatorCreatorRegistry();
+
+  void setBuffers(final IncomingBuffers buffers);
+
+  QueryProfileStoreContext getProfileStoreContext();
+
+  WorkEventBus getWorkEventbus();
+
+  Set<Map.Entry<UserServer.BitToUserConnection, UserServer.BitToUserConnectionConfig>> getUserConnections();
+
+  boolean isUserAuthenticationEnabled();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index d77d0b8..ceda8b8 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -17,474 +17,163 @@
  */
 package org.apache.drill.exec.ops;
 
+import java.io.IOException;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.compile.CodeCompiler;
-import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
-import org.apache.drill.exec.expr.holders.ValueHolder;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.fn.FunctionLookupContext;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.planner.physical.PlannerSettings;
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.RpcOutcomeListener;
-import org.apache.drill.exec.rpc.UserClientConnection;
-import org.apache.drill.exec.rpc.control.ControlTunnel;
-import org.apache.drill.exec.rpc.control.WorkEventBus;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.FragmentOptionManager;
-import org.apache.drill.exec.server.options.OptionList;
+import org.apache.drill.exec.proto.ExecProtos;
 import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.PartitionExplorer;
-import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.testing.ExecutionControls;
-import org.apache.drill.exec.util.ImpersonationUtil;
-import org.apache.drill.exec.work.batch.IncomingBuffers;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 
 import io.netty.buffer.DrillBuf;
 
 /**
- * Contextual objects required for execution of a particular fragment.
- * This is the implementation; use <tt>FragmentContextInterface</tt>
- * in code to allow tests to use test-time implementations.
+ * Fragment context interface: separates implementation from definition.
+ * Allows unit testing by mocking or reimplementing services with
+ * test-time versions. The name <tt>FragmentContext</tt> formerly
+ * belonged to the implementation class, which is used in many places
+ * in legacy code; with this change the interface takes over that name
+ * and the implementation becomes {@link FragmentContextImpl}.
+ * New code should depend on this interface rather than on the
+ * implementation class.
  */
 
-public class FragmentContext extends BaseFragmentContext implements 
AutoCloseable, UdfUtilities, FragmentContextInterface {
-  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
-
-  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = 
Maps.newHashMap();
-  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
-
-  private final DrillbitContext context;
-  private final UserClientConnection connection; // is null if this context is 
for non-root fragment
-  private final QueryContext queryContext; // is null if this context is for 
non-root fragment
-  private final FragmentStats stats;
-  private final BufferAllocator allocator;
-  private final PlanFragment fragment;
-  private final ContextInformation contextInformation;
-  private IncomingBuffers buffers;
-  private final OptionManager fragmentOptions;
-  private final BufferManager bufferManager;
-  private ExecutorState executorState;
-  private final ExecutionControls executionControls;
-
-  private final SendingAccountor sendingAccountor = new SendingAccountor();
-  private final Consumer<RpcException> exceptionConsumer = new 
Consumer<RpcException>() {
-    @Override
-    public void accept(final RpcException e) {
-      fail(e);
-    }
-
-    @Override
-    public void interrupt(final InterruptedException e) {
-      if (shouldContinue()) {
-        logger.error("Received an unexpected interrupt while waiting for the 
data send to complete.", e);
-        fail(e);
-      }
-    }
-  };
-
-  private final RpcOutcomeListener<Ack> statusHandler = new 
StatusHandler(exceptionConsumer, sendingAccountor);
-  private final AccountingUserConnection accountingUserConnection;
-  /** Stores constants and their holders by type */
-  private final Map<String, Map<MinorType, ValueHolder>> 
constantValueHolderCache;
-
+public interface FragmentContext extends UdfUtilities, AutoCloseable {
   /**
-   * Create a FragmentContext instance for non-root fragment.
-   *
-   * @param dbContext DrillbitContext.
-   * @param fragment Fragment implementation.
-   * @param funcRegistry FunctionImplementationRegistry.
-   * @throws ExecutionSetupException
+   * Returns the UDF registry.
+   * @return the UDF registry
    */
-  public FragmentContext(final DrillbitContext dbContext, final PlanFragment 
fragment,
-      final FunctionImplementationRegistry funcRegistry) throws 
ExecutionSetupException {
-    this(dbContext, fragment, null, null, funcRegistry);
-  }
+  FunctionLookupContext getFunctionRegistry();
 
   /**
-   * Create a FragmentContext instance for root fragment.
-   *
-   * @param dbContext DrillbitContext.
-   * @param fragment Fragment implementation.
-   * @param queryContext QueryContext.
-   * @param connection UserClientConnection.
-   * @param funcRegistry FunctionImplementationRegistry.
-   * @throws ExecutionSetupException
+   * Returns a read-only version of the session options.
+   * @return the session options
    */
-  public FragmentContext(final DrillbitContext dbContext, final PlanFragment 
fragment, final QueryContext queryContext,
-      final UserClientConnection connection, final 
FunctionImplementationRegistry funcRegistry)
-    throws ExecutionSetupException {
-    super(funcRegistry);
-    this.context = dbContext;
-    this.queryContext = queryContext;
-    this.connection = connection;
-    this.accountingUserConnection = new AccountingUserConnection(connection, 
sendingAccountor, statusHandler);
-    this.fragment = fragment;
-    contextInformation = new ContextInformation(fragment.getCredentials(), 
fragment.getContext());
-
-    logger.debug("Getting initial memory allocation of {}", 
fragment.getMemInitial());
-    logger.debug("Fragment max allocation: {}", fragment.getMemMax());
-
-    final OptionList list;
-    if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
-      list = new OptionList();
-    } else {
-      try {
-        list = 
dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), 
OptionList.class);
-      } catch (final Exception e) {
-        throw new ExecutionSetupException("Failure while reading plan 
options.", e);
-      }
-    }
-    fragmentOptions = new FragmentOptionManager(context.getOptionManager(), 
list);
-
-    executionControls = new ExecutionControls(fragmentOptions, 
dbContext.getEndpoint());
-
-    // Add the fragment context to the root allocator.
-    // The QueryManager will call the root allocator to recalculate all the 
memory limits for all the fragments
-    try {
-      allocator = context.getAllocator().newChildAllocator(
-          "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
-          fragment.getMemInitial(),
-          fragment.getMemMax());
-      Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
-    } catch (final OutOfMemoryException e) {
-      throw UserException.memoryError(e)
-        .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + 
getHandle().getMinorFragmentId())
-        .build(logger);
-    } catch(final Throwable e) {
-      throw new ExecutionSetupException("Failure while getting memory 
allocator for fragment.", e);
-    }
-
-    stats = new FragmentStats(allocator, fragment.getAssignment());
-    bufferManager = new BufferManagerImpl(this.allocator);
-    constantValueHolderCache = Maps.newHashMap();
-  }
+  OptionManager getOptions();
+
+  boolean isImpersonationEnabled();
 
   /**
-   * TODO: Remove this constructor when removing the SimpleRootExec 
(DRILL-2097). This is kept only to avoid modifying
-   * the long list of test files.
+   * Generates code for a class given a {@link ClassGenerator},
+   * and returns a single instance of the generated class. (Note
+   * that the name is a misnomer, it would be better called
+   * <tt>getImplementationInstance</tt>.)
+   *
+   * @param cg the class generator
+   * @return an instance of the generated class
    */
-  public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, 
UserClientConnection connection,
-      FunctionImplementationRegistry funcRegistry) throws 
ExecutionSetupException {
-    this(dbContext, fragment, null, connection, funcRegistry);
-  }
-
-  @Override
-  public OptionManager getOptions() {
-    return fragmentOptions;
-  }
-
-  public void setBuffers(final IncomingBuffers buffers) {
-    Preconditions.checkArgument(this.buffers == null, "Can only set buffers 
once.");
-    this.buffers = buffers;
-  }
-
-  public void setExecutorState(final ExecutorState executorState) {
-    Preconditions.checkArgument(this.executorState == null, "ExecutorState can 
only be set once.");
-    this.executorState = executorState;
-  }
-
-  public void fail(final Throwable cause) {
-    executorState.fail(cause);
-  }
+  <T> T getImplementationClass(final ClassGenerator<T> cg)
+      throws ClassTransformationException, IOException;
 
   /**
-   * Tells individual operations whether they should continue. In some cases, 
an external event (typically cancellation)
-   * will mean that the fragment should prematurely exit execution. Long 
running operations should check this every so
-   * often so that Drill is responsive to cancellation operations.
+   * Generates code for a class given a {@link CodeGenerator},
+   * and returns a single instance of the generated class. (Note
+   * that the name is a misnomer, it would be better called
+   * <tt>getImplementationInstance</tt>.)
    *
-   * @return false if the action should terminate immediately, true if 
everything is okay.
+   * @param cg the code generator
+   * @return an instance of the generated class
    */
-  @Override
-  public boolean shouldContinue() {
-    return executorState.shouldContinue();
-  }
-
-  @Override
-  public DrillbitContext getDrillbitContext() {
-    return context;
-  }
+  <T> T getImplementationClass(final CodeGenerator<T> cg)
+      throws ClassTransformationException, IOException;
 
   /**
-   * This method is only used to construt InfoSchemaReader, it is for the 
reader to get full schema, so here we
-   * are going to return a fully initialized schema tree.
-   * @return root schema's plus
+   * Generates code for a class given a {@link ClassGenerator}, and returns the
+   * specified number of instances of the generated class. (Note that the name
+   * is a misnomer, it would be better called
+   * <tt>getImplementationInstances</tt>.)
+   *
+   * @param cg the class generator
+   * @return list of instances of the generated class
    */
-  public SchemaPlus getFullRootSchema() {
-    if (queryContext == null) {
-      fail(new UnsupportedOperationException("Schema tree can only be created 
in root fragment. " +
-          "This is a non-root fragment."));
-      return null;
-    }
-
-    final boolean isImpersonationEnabled = isImpersonationEnabled();
-    // If impersonation is enabled, we want to view the schema as query user 
and suppress authorization errors. As for
-    // InfoSchema purpose we want to show tables the user has permissions to 
list or query. If  impersonation is
-    // disabled view the schema as Drillbit process user and throw 
authorization errors to client.
-    SchemaConfig schemaConfig = SchemaConfig
-        .newBuilder(
-            isImpersonationEnabled ? queryContext.getQueryUserName() : 
ImpersonationUtil.getProcessUserName(),
-            queryContext)
-        .setIgnoreAuthErrors(isImpersonationEnabled)
-        .build();
-
-    return queryContext.getFullRootSchema(schemaConfig);
-  }
+  <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount)
+      throws ClassTransformationException, IOException;
 
   /**
-   * Get this node's identity.
-   * @return A DrillbitEndpoint object.
+   * Generates code for a class given a {@link CodeGenerator}, and returns the
+   * specified number of instances of the generated class. (Note that the name
+   * is a misnomer, it would be better called
+   * <tt>getImplementationInstances</tt>.)
+   *
+   * @param cg the code generator
+   * @return list of instances of the generated class
    */
-  public DrillbitEndpoint getIdentity() {
-    return context.getEndpoint();
-  }
-
-  public FragmentStats getStats() {
-    return stats;
-  }
-
-  @Override
-  public ContextInformation getContextInformation() {
-    return contextInformation;
-  }
-
-  public DrillbitEndpoint getForemanEndpoint() {
-    return fragment.getForeman();
-  }
+  <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount)
+      throws ClassTransformationException, IOException;
 
   /**
-   * The FragmentHandle for this Fragment
-   * @return FragmentHandle
+   * Return the set of execution controls used to inject faults into running
+   * code for testing.
+   *
+   * @return the execution controls
    */
-  public FragmentHandle getHandle() {
-    return fragment.getHandle();
-  }
-
-  public String getFragIdString() {
-    final FragmentHandle handle = getHandle();
-    final String frag = handle != null ? handle.getMajorFragmentId() + ":" + 
handle.getMinorFragmentId() : "0:0";
-    return frag;
-  }
+  ExecutionControls getExecutionControls();
 
   /**
-   * Get this fragment's allocator.
-   * @return the allocator
+   * Returns the Drill configuration for this run. Note that the config is
+   * global and immutable.
+   *
+   * @return the Drill configuration
    */
-  @Deprecated
-  public BufferAllocator getAllocator() {
-    if (allocator == null) {
-      logger.debug("Fragment: " + getFragIdString() + " Allocator is NULL");
-    }
-    return allocator;
-  }
-
-  public BufferAllocator getNewChildAllocator(final String operatorName,
-      final int operatorId,
-      final long initialReservation,
-      final long maximumReservation) throws OutOfMemoryException {
-    return allocator.newChildAllocator(
-        "op:" + QueryIdHelper.getFragmentId(fragment.getHandle()) + ":" + 
operatorId + ":" + operatorName,
-        initialReservation,
-        maximumReservation
-        );
-  }
-
-  public boolean isOverMemoryLimit() {
-    return allocator.isOverLimit();
-  }
-
-  @Override
-  protected CodeCompiler getCompiler() {
-    return context.getCompiler();
-  }
+  DrillConfig getConfig();
 
-  public AccountingUserConnection getUserDataTunnel() {
-    Preconditions.checkState(connection != null, "Only Root fragment can get 
UserDataTunnel");
-    return accountingUserConnection;
-  }
+  CodeCompiler getCompiler();
 
-  public ControlTunnel getControlTunnel(final DrillbitEndpoint endpoint) {
-    return context.getController().getTunnel(endpoint);
-  }
+  ExecutorService getScanDecodeExecutor();
 
-  public AccountingDataTunnel getDataTunnel(final DrillbitEndpoint endpoint) {
-    AccountingDataTunnel tunnel = tunnels.get(endpoint);
-    if (tunnel == null) {
-      tunnel = new 
AccountingDataTunnel(context.getDataConnectionsPool().getTunnel(endpoint), 
sendingAccountor, statusHandler);
-      tunnels.put(endpoint, tunnel);
-    }
-    return tunnel;
-  }
+  ExecutorService getScanExecutor();
 
-  public IncomingBuffers getBuffers() {
-    return buffers;
-  }
+  ExecutorService getExecutor();
 
-  @Override
-  public OperatorContext newOperatorContext(PhysicalOperator popConfig, 
OperatorStats stats)
-      throws OutOfMemoryException {
-    OperatorContextImpl context = new OperatorContextImpl(popConfig, this, 
stats);
-    contexts.add(context);
-    return context;
-  }
+  ExecutorState getExecutorState();
 
-  @Override
-  public OperatorContext newOperatorContext(PhysicalOperator popConfig)
-      throws OutOfMemoryException {
-    OperatorContextImpl context = new OperatorContextImpl(popConfig, this);
-    contexts.add(context);
-    return context;
-  }
+  BufferAllocator getNewChildAllocator(final String operatorName,
+                                       final int operatorId,
+                                       final long initialReservation,
+                                       final long maximumReservation);
 
-  @VisibleForTesting
-  @Deprecated
-  public Throwable getFailureCause() {
-    return executorState.getFailureCause();
-  }
+  ExecProtos.FragmentHandle getHandle();
 
-  @VisibleForTesting
-  @Deprecated
-  public boolean isFailed() {
-    return executorState.isFailed();
-  }
+  BufferAllocator getAllocator();
 
-  @Override
-  public DrillConfig getConfig() {
-    return context.getConfig();
-  }
+  OperatorContext newOperatorContext(PhysicalOperator popConfig);
 
-  public void setFragmentLimit(final long limit) {
-    allocator.setLimit(limit);
-  }
+  OperatorContext newOperatorContext(PhysicalOperator popConfig, OperatorStats stats);
 
-  @Override
-  public ExecutionControls getExecutionControls() {
-    return executionControls;
-  }
+  SchemaPlus getFullRootSchema();
 
-  @Override
-  public String getQueryUserName() {
-    return fragment.getCredentials().getUserName();
-  }
+  String getQueryUserName();
 
-  public boolean isImpersonationEnabled() {
-    // TODO(DRILL-2097): Until SimpleRootExec tests are removed, we need to 
consider impersonation disabled if there is
-    // no config
-    if (getConfig() == null) {
-      return false;
-    }
+  String getFragIdString();
 
-    return getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
-  }
+  DrillBuf replace(DrillBuf old, int newSize);
 
-  public boolean isUserAuthenticationEnabled() {
-    // TODO(DRILL-2097): Until SimpleRootExec tests are removed, we need to 
consider impersonation disabled if there is
-    // no config
-    if (getConfig() == null) {
-      return false;
-    }
+  DrillBuf getManagedBuffer();
 
-    return getConfig().getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED);
-  }
+  DrillBuf getManagedBuffer(int size);
 
   @Override
-  public void close() {
-    waitForSendComplete();
-
-    // close operator context
-    for (OperatorContextImpl opContext : contexts) {
-      suppressingClose(opContext);
-    }
-
-    suppressingClose(bufferManager);
-    suppressingClose(buffers);
-    suppressingClose(allocator);
-  }
+  void close();
 
-  private void suppressingClose(final AutoCloseable closeable) {
-    try {
-      if (closeable != null) {
-        closeable.close();
-      }
-    } catch (final Exception e) {
-      fail(e);
-    }
-  }
-
-  @Override
-  public PartitionExplorer getPartitionExplorer() {
-    throw new UnsupportedOperationException(String.format("The partition 
explorer interface can only be used " +
-        "in functions that can be evaluated at planning time. Make sure that 
the %s configuration " +
-        "option is set to true.", 
PlannerSettings.CONSTANT_FOLDING.getOptionName()));
-  }
-
-  @Override
-  public ValueHolder getConstantValueHolder(String value, MinorType type, 
Function<DrillBuf, ValueHolder> holderInitializer) {
-    if (!constantValueHolderCache.containsKey(value)) {
-      constantValueHolderCache.put(value, Maps.<MinorType, 
ValueHolder>newHashMap());
-    }
-
-    Map<MinorType, ValueHolder> holdersByType = 
constantValueHolderCache.get(value);
-    ValueHolder valueHolder = holdersByType.get(type);
-    if (valueHolder == null) {
-      valueHolder = holderInitializer.apply(getManagedBuffer());
-      holdersByType.put(type, valueHolder);
-    }
-    return valueHolder;
-  }
-
-  public Executor getExecutor(){
-    return context.getExecutor();
-  }
-
-  /**
-   * Wait for ack that all outgoing batches have been sent
-   */
-  public void waitForSendComplete() {
-    sendingAccountor.waitForSendComplete();
-  }
-
-  public WorkEventBus getWorkEventbus() {
-    return context.getWorkBus();
-  }
-
-  public boolean isBuffersDone() {
-    Preconditions.checkState(this.buffers != null, "Incoming Buffers is not 
set in this fragment context");
-    return buffers.isDone();
-  }
-
-  @Override
-  protected BufferManager getBufferManager() {
-    return bufferManager;
-  }
-
-  public interface ExecutorState {
+  interface ExecutorState {
     /**
-     * Whether execution should continue.
+     * Tells individual operations whether they should continue. In some cases, an external event (typically cancellation)
+     * will mean that the fragment should prematurely exit execution. Long running operations should check this every so
+     * often so that Drill is responsive to cancellation operations.
      *
-     * @return false if execution should stop.
+     * @return False if the action should terminate immediately, true if everything is okay.
      */
-    public boolean shouldContinue();
+    boolean shouldContinue();
 
     /**
      * Inform the executor if a exception occurs and fragment should be failed.
@@ -492,16 +181,14 @@ public class FragmentContext extends BaseFragmentContext 
implements AutoCloseabl
      * @param t
      *          The exception that occurred.
      */
-    public void fail(final Throwable t);
+    void fail(final Throwable t);
 
     @VisibleForTesting
     @Deprecated
-    public boolean isFailed();
+    boolean isFailed();
 
     @VisibleForTesting
     @Deprecated
-    public Throwable getFailureCause();
-
+    Throwable getFailureCause();
   }
-
 }

Reply via email to