DRILL-5730: Mock testing improvements and interface improvements closes #1045
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/186536d5 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/186536d5 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/186536d5 Branch: refs/heads/master Commit: 186536d544d02ffc01339a4645e2a533545a2f86 Parents: 9926eda Author: Timothy Farkas <timothyfar...@apache.org> Authored: Thu Jan 11 14:59:41 2018 -0800 Committer: Arina Ielchiieva <arina.yelchiy...@gmail.com> Committed: Fri Jan 26 13:42:27 2018 +0200 ---------------------------------------------------------------------- .../apache/drill/common/config/DrillConfig.java | 10 +- .../drill/common/config/NestedConfig.java | 2 - .../store/mapr/db/MapRDBScanBatchCreator.java | 11 +- .../mapr/db/json/MaprDBJsonRecordReader.java | 4 +- .../exec/store/hbase/HBaseRecordReader.java | 4 +- .../exec/store/hbase/HBaseScanBatchCreator.java | 10 +- .../codegen/templates/HiveRecordReaders.java | 2 +- .../hive/HiveDrillNativeScanBatchCreator.java | 7 +- .../exec/store/hive/HiveScanBatchCreator.java | 6 +- .../exec/store/hive/HiveStoragePlugin.java | 1 + .../drill/exec/store/hive/HiveUtilities.java | 6 +- .../apache/drill/exec/hive/TestHiveStorage.java | 3 - .../drill/exec/store/jdbc/JdbcBatchCreator.java | 7 +- .../drill/exec/store/jdbc/JdbcRecordReader.java | 9 +- .../exec/store/kafka/KafkaRecordReader.java | 13 +- .../exec/store/kafka/KafkaScanBatchCreator.java | 4 +- .../drill/exec/store/kudu/KuduRecordReader.java | 4 +- .../exec/store/kudu/KuduScanBatchCreator.java | 6 +- .../exec/store/kudu/KuduWriterBatchCreator.java | 4 +- .../exec/store/mongo/MongoRecordReader.java | 5 +- .../exec/store/mongo/MongoScanBatchCreator.java | 6 +- .../store/openTSDB/OpenTSDBBatchCreator.java | 4 +- .../exec/expr/fn/FunctionGenerationHelper.java | 2 +- .../drill/exec/ops/BaseFragmentContext.java | 6 +- .../drill/exec/ops/BaseOperatorContext.java | 22 +- .../drill/exec/ops/ExchangeFragmentContext.java | 34 ++ 
.../drill/exec/ops/ExecutorFragmentContext.java | 56 ++ .../apache/drill/exec/ops/FragmentContext.java | 511 ++++--------------- .../drill/exec/ops/FragmentContextImpl.java | 491 ++++++++++++++++++ .../apache/drill/exec/ops/OperatorContext.java | 2 +- .../drill/exec/ops/OperatorContextImpl.java | 12 +- .../apache/drill/exec/ops/OperatorStats.java | 3 - .../drill/exec/ops/RootFragmentContext.java | 24 + .../org/apache/drill/exec/ops/UdfUtilities.java | 2 +- .../drill/exec/physical/base/AbstractBase.java | 11 - .../drill/exec/physical/base/Exchange.java | 18 +- .../exec/physical/base/PhysicalOperator.java | 28 +- .../drill/exec/physical/impl/BaseRootExec.java | 14 +- .../drill/exec/physical/impl/BatchCreator.java | 6 +- .../drill/exec/physical/impl/ImplCreator.java | 19 +- .../physical/impl/MergingReceiverCreator.java | 4 +- .../drill/exec/physical/impl/RootCreator.java | 6 +- .../drill/exec/physical/impl/ScanBatch.java | 6 +- .../drill/exec/physical/impl/ScreenCreator.java | 14 +- .../exec/physical/impl/SingleSenderCreator.java | 7 +- .../impl/TopN/PriorityQueueTemplate.java | 1 - .../exec/physical/impl/TopN/TopNBatch.java | 12 +- .../impl/TopN/TopNSortBatchCreator.java | 10 +- .../exec/physical/impl/WriterRecordBatch.java | 4 +- .../physical/impl/aggregate/HashAggBatch.java | 7 +- .../impl/aggregate/HashAggBatchCreator.java | 9 +- .../impl/aggregate/HashAggTemplate.java | 4 +- .../physical/impl/aggregate/HashAggregator.java | 27 +- .../impl/aggregate/SpilledRecordbatch.java | 8 +- .../impl/aggregate/StreamingAggBatch.java | 6 +- .../aggregate/StreamingAggBatchCreator.java | 9 +- .../impl/aggregate/StreamingAggregator.java | 1 - .../broadcastsender/BroadcastSenderCreator.java | 4 +- .../BroadcastSenderRootExec.java | 4 +- .../physical/impl/common/ChainedHashTable.java | 3 +- .../exec/physical/impl/common/HashTable.java | 48 +- .../physical/impl/common/HashTableTemplate.java | 7 +- .../impl/filter/FilterBatchCreator.java | 10 +- 
.../physical/impl/filter/FilterRecordBatch.java | 17 +- .../physical/impl/filter/FilterSignature.java | 4 +- .../physical/impl/filter/FilterTemplate2.java | 4 +- .../physical/impl/filter/FilterTemplate4.java | 2 - .../exec/physical/impl/filter/Filterer.java | 10 +- .../impl/flatten/FlattenBatchCreator.java | 9 +- .../impl/flatten/FlattenRecordBatch.java | 2 - .../exec/physical/impl/flatten/Flattener.java | 20 +- .../exec/physical/impl/join/HashJoinBatch.java | 7 +- .../impl/join/HashJoinBatchCreator.java | 4 +- .../exec/physical/impl/join/HashJoinProbe.java | 18 +- .../exec/physical/impl/join/JoinWorker.java | 11 +- .../exec/physical/impl/join/MergeJoinBatch.java | 7 +- .../physical/impl/join/MergeJoinCreator.java | 7 +- .../impl/join/NestedLoopJoinBatchCreator.java | 4 +- .../physical/impl/limit/LimitBatchCreator.java | 4 +- .../impl/mergereceiver/MergingRecordBatch.java | 42 +- .../OrderedPartitionRecordBatch.java | 16 +- .../OrderedPartitionSenderCreator.java | 4 +- .../partitionsender/PartitionSenderCreator.java | 4 +- .../PartitionSenderRootExec.java | 17 +- .../impl/partitionsender/Partitioner.java | 31 +- .../partitionsender/PartitionerDecorator.java | 4 +- .../partitionsender/PartitionerTemplate.java | 3 +- .../impl/producer/ProducerConsumerBatch.java | 4 +- .../producer/ProducerConsumerBatchCreator.java | 4 +- .../impl/project/ComplexToJsonBatchCreator.java | 7 +- .../impl/project/ProjectBatchCreator.java | 7 +- .../impl/project/ProjectRecordBatch.java | 2 +- .../exec/physical/impl/sort/SortBatch.java | 4 +- .../physical/impl/sort/SortBatchCreator.java | 8 +- .../impl/sort/SortRecordBatchBuilder.java | 5 - .../impl/svremover/RemovingRecordBatch.java | 6 +- .../impl/svremover/SVRemoverCreator.java | 10 +- .../physical/impl/trace/TraceBatchCreator.java | 8 +- .../impl/union/UnionAllBatchCreator.java | 10 +- .../impl/union/UnionAllRecordBatch.java | 4 +- .../UnorderedReceiverBatch.java | 20 +- .../UnorderedReceiverCreator.java | 6 +- 
.../impl/validate/IteratorValidatorCreator.java | 6 +- .../impl/values/ValuesBatchCreator.java | 6 +- .../impl/window/WindowFrameBatchCreator.java | 5 +- .../impl/window/WindowFrameRecordBatch.java | 2 +- .../physical/impl/xsort/ExternalSortBatch.java | 19 +- .../impl/xsort/ExternalSortBatchCreator.java | 6 +- .../exec/physical/impl/xsort/MSortTemplate.java | 2 +- .../physical/impl/xsort/SingleBatchSorter.java | 4 +- .../impl/xsort/SingleBatchSorterTemplate.java | 6 +- .../impl/xsort/managed/ExternalSortBatch.java | 8 +- .../impl/xsort/managed/MSortTemplate.java | 10 +- .../physical/impl/xsort/managed/MSorter.java | 4 +- .../drill/exec/record/AbstractRecordBatch.java | 6 +- .../exec/record/AbstractSingleRecordBatch.java | 2 +- .../apache/drill/exec/rpc/data/DataTunnel.java | 43 -- .../drill/exec/server/BootStrapContext.java | 8 + .../server/options/FragmentOptionManager.java | 3 +- .../apache/drill/exec/store/ColumnExplorer.java | 10 - .../exec/store/dfs/easy/EasyFormatPlugin.java | 4 +- .../store/dfs/easy/EasyReaderBatchCreator.java | 8 +- .../store/dfs/easy/EasyWriterBatchCreator.java | 6 +- .../exec/store/direct/DirectBatchCreator.java | 6 +- .../exec/store/easy/json/JSONRecordReader.java | 2 +- .../exec/store/easy/text/TextFormatPlugin.java | 2 +- .../compliant/CompliantTextRecordReader.java | 3 +- .../store/ischema/InfoSchemaBatchCreator.java | 8 +- .../store/mock/ExtendedMockRecordReader.java | 3 +- .../drill/exec/store/mock/MockRecordReader.java | 6 +- .../exec/store/mock/MockScanBatchCreator.java | 8 +- .../store/parquet/ParquetRGFilterEvaluator.java | 4 +- .../store/parquet/ParquetScanBatchCreator.java | 9 +- .../parquet/ParquetWriterBatchCreator.java | 4 +- .../ParquetToDrillTypeConverter.java | 5 +- .../stat/ParquetFooterStatCollector.java | 3 - .../store/sys/BitToUserConnectionIterator.java | 13 +- .../drill/exec/store/sys/DrillbitIterator.java | 8 +- .../drill/exec/store/sys/MemoryIterator.java | 10 +- .../exec/store/sys/ProfileInfoIterator.java | 4 +- 
.../drill/exec/store/sys/ProfileIterator.java | 10 +- .../exec/store/sys/ProfileJsonIterator.java | 4 +- .../drill/exec/store/sys/SystemTable.java | 30 +- .../exec/store/sys/SystemTableBatchCreator.java | 6 +- .../drill/exec/store/sys/ThreadsIterator.java | 8 +- .../drill/exec/testing/ControlsInjector.java | 15 +- .../drill/exec/testing/ExecutionControls.java | 6 + .../exec/testing/ExecutionControlsInjector.java | 14 +- .../exec/testing/NoOpControlsInjector.java | 6 +- .../exec/work/batch/AbstractDataCollector.java | 7 +- .../exec/work/batch/BaseRawBatchBuffer.java | 28 +- .../exec/work/batch/ControlMessageHandler.java | 4 +- .../exec/work/batch/SpoolingRawBatchBuffer.java | 6 +- .../work/batch/UnlimitedRawBatchBuffer.java | 4 +- .../exec/work/foreman/FragmentsRunner.java | 6 +- .../work/fragment/AbstractFragmentManager.java | 5 +- .../exec/work/fragment/FragmentExecutor.java | 25 +- .../work/fragment/FragmentStatusReporter.java | 16 +- .../java/org/apache/drill/PlanningBase.java | 87 +--- .../org/apache/drill/TestDynamicUDFSupport.java | 27 +- .../java/org/apache/drill/exec/ExecTest.java | 36 +- .../java/org/apache/drill/exec/RunRootExec.java | 4 +- .../apache/drill/exec/client/DumpCatTest.java | 19 +- .../apache/drill/exec/expr/ExpressionTest.java | 69 ++- .../exec/expr/fn/impl/TestStringFunctions.java | 3 - .../drill/exec/fn/impl/TestMathFunctions.java | 18 +- .../drill/exec/fn/impl/TestMultiInputAdd.java | 60 +-- .../exec/fn/impl/TestNewMathFunctions.java | 42 +- .../exec/fn/impl/TestRepeatedFunction.java | 21 +- .../fn/interp/ExpressionInterpreterTest.java | 8 +- .../impersonation/TestInboundImpersonation.java | 3 +- .../drill/exec/memory/TestAllocators.java | 8 +- .../exec/physical/impl/SimpleRootExec.java | 14 +- .../exec/physical/impl/TestCastFunctions.java | 120 +++-- .../physical/impl/TestComparisonFunctions.java | 115 ++--- .../physical/impl/TestConvertFunctions.java | 18 +- .../impl/TestImplicitCastFunctions.java | 38 +- 
.../exec/physical/impl/TestOptiqPlans.java | 322 ------------ .../physical/impl/TestReverseImplicitCast.java | 7 +- .../exec/physical/impl/TestSimpleFunctions.java | 92 ++-- .../exec/physical/impl/TestStringFunctions.java | 111 ++-- .../drill/exec/physical/impl/agg/TestAgg.java | 41 +- .../physical/impl/filter/TestSimpleFilter.java | 32 +- .../exec/physical/impl/join/TestHashJoin.java | 47 +- .../exec/physical/impl/join/TestMergeJoin.java | 75 ++- .../physical/impl/limit/TestSimpleLimit.java | 38 +- .../partitionsender/TestPartitionSender.java | 8 +- .../impl/project/TestSimpleProjection.java | 21 +- .../exec/physical/impl/sort/TestSimpleSort.java | 32 +- .../impl/trace/TestTraceMultiRecordBatch.java | 18 +- .../impl/trace/TestTraceOutputDump.java | 18 +- .../physical/impl/union/TestSimpleUnion.java | 18 +- .../impl/xsort/managed/TestSortImpl.java | 4 +- .../physical/unit/MiniPlanUnitTestBase.java | 21 +- .../physical/unit/PhysicalOpUnitTestBase.java | 285 +++++++---- .../TestHardAffinityFragmentParallelizer.java | 16 +- .../record/ExpressionTreeMaterializerTest.java | 72 +-- .../drill/exec/record/TestRecordIterator.java | 22 +- .../drill/exec/record/TestVectorContainer.java | 128 +++++ .../drill/exec/rpc/data/TestBitBitKerberos.java | 166 +++--- .../apache/drill/exec/rpc/data/TestBitRpc.java | 129 +++-- .../security/TestCustomUserAuthenticator.java | 3 +- .../rpc/user/security/TestUserBitKerberos.java | 3 +- .../security/TestUserBitKerberosEncryption.java | 36 +- .../exec/rpc/user/security/TestUserBitSSL.java | 11 +- .../rpc/user/security/TestUserBitSSLServer.java | 22 +- .../security/TestUserBitSaslCompatibility.java | 34 +- .../exec/server/TestOptionsAuthEnabled.java | 3 +- .../server/rest/TestMainLoginPageModel.java | 8 +- .../spnego/TestDrillSpnegoAuthenticator.java | 3 +- .../rest/spnego/TestSpnegoAuthentication.java | 18 +- .../server/rest/spnego/TestSpnegoConfig.java | 10 +- .../exec/store/TestAffinityCalculator.java | 162 +----- 
.../store/parquet/ParquetRecordReaderTest.java | 21 +- .../parquet/TestParquetFilterPushDown.java | 6 +- .../store/parquet/TestParquetMetadataCache.java | 37 +- .../testing/TestCountDownLatchInjection.java | 3 +- .../drill/exec/testing/TestPauseInjection.java | 3 +- .../drill/exec/util/TestQueryMemoryAlloc.java | 51 +- .../fragment/FragmentStatusReporterTest.java | 26 +- .../org/apache/drill/test/ClusterFixture.java | 1 - .../org/apache/drill/test/ConfigBuilder.java | 49 +- .../org/apache/drill/test/OperatorFixture.java | 281 +++++++--- .../common/expression/LogicalExpression.java | 27 +- 224 files changed, 2732 insertions(+), 2941 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/common/src/main/java/org/apache/drill/common/config/DrillConfig.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java index 0d3ce9a..73c899d 100644 --- a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java +++ b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java @@ -45,13 +45,11 @@ public class DrillConfig extends NestedConfig { private final ImmutableList<String> startupArguments; - public static final boolean ON_OSX = System.getProperty("os.name").contains("OS X"); - @SuppressWarnings("restriction") private static final long MAX_DIRECT_MEMORY = sun.misc.VM.maxDirectMemory(); @VisibleForTesting - public DrillConfig(Config config, boolean enableServerConfigs) { + public DrillConfig(Config config) { super(config); logger.debug("Setting up DrillConfig object."); logger.trace("Given Config object is:\n{}", @@ -138,7 +136,7 @@ public class DrillConfig extends NestedConfig { * @param overrideFileResourcePathname * the classpath resource pathname of the file to use for * configuration override 
purposes; {@code null} specifies to use the - * default pathname ({@link CommonConstants.CONFIG_OVERRIDE}) (does + * default pathname ({@link CommonConstants#CONFIG_OVERRIDE_RESOURCE_PATHNAME}) (does * <strong>not</strong> specify to suppress trying to load an * overrides file) * @return A merged Config object. @@ -169,7 +167,7 @@ public class DrillConfig extends NestedConfig { * @return {@link DrillConfig} instance */ public static DrillConfig create(Config config) { - return new DrillConfig(config.resolve(), true); + return new DrillConfig(config.resolve()); } /** @@ -252,7 +250,7 @@ public class DrillConfig extends NestedConfig { logger.info("Configuration and plugin file(s) identified in {}ms.\n{}", watch.elapsed(TimeUnit.MILLISECONDS), logString); - return new DrillConfig(effectiveConfig.resolve(), enableServerConfigs); + return new DrillConfig(effectiveConfig.resolve()); } public <T> Class<T> getClassAt(String location, Class<T> clazz) throws DrillConfigurationException { http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/common/src/main/java/org/apache/drill/common/config/NestedConfig.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/drill/common/config/NestedConfig.java b/common/src/main/java/org/apache/drill/common/config/NestedConfig.java index 60fe013..a1533a5 100644 --- a/common/src/main/java/org/apache/drill/common/config/NestedConfig.java +++ b/common/src/main/java/org/apache/drill/common/config/NestedConfig.java @@ -30,8 +30,6 @@ import com.typesafe.config.ConfigResolveOptions; import com.typesafe.config.ConfigValue; abstract class NestedConfig implements Config { - // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedConfig.class); - private final Config c; NestedConfig(Config c) { 
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java index e770c96..d9c8ce7 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.store.mapr.db; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.ScanBatch; import org.apache.drill.exec.record.RecordBatch; @@ -33,11 +33,9 @@ import org.apache.drill.exec.store.mapr.db.json.MaprDBJsonRecordReader; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan>{ - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBScanBatchCreator.class); - +public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan> { @Override - public ScanBatch getBatch(FragmentContext context, MapRDBSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException { + public ScanBatch getBatch(ExecutorFragmentContext context, MapRDBSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.isEmpty()); List<RecordReader> readers = Lists.newArrayList(); for(MapRDBSubScanSpec scanSpec : 
subScan.getRegionScanSpecList()){ @@ -46,8 +44,7 @@ public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan>{ readers.add(new HBaseRecordReader( subScan.getFormatPlugin().getConnection(), getHBaseSubScanSpec(scanSpec), - subScan.getColumns(), - context)); + subScan.getColumns())); } else { readers.add(new MaprDBJsonRecordReader(scanSpec, subScan.getFormatPluginConfig(), subScan.getColumns(), context)); } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java index 1327541..9f93e18 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java @@ -72,7 +72,6 @@ import io.netty.buffer.DrillBuf; public class MaprDBJsonRecordReader extends AbstractRecordReader { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MaprDBJsonRecordReader.class); - public static final SchemaPath ID_PATH = SchemaPath.getSimplePath(ID_KEY); private final long MILLISECONDS_IN_A_DAY = (long)1000 * 60 * 60 * 24; private Table table; @@ -117,7 +116,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { disableCountOptimization = formatPluginConfig.disableCountOptimization(); setColumns(projectedColumns); - unionEnabled = context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE); + unionEnabled = context.getOptions().getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY); readNumbersAsDouble = formatPluginConfig.isReadAllNumbersAsDouble(); allTextMode = 
formatPluginConfig.isAllTextMode(); ignoreSchemaChange = formatPluginConfig.isIgnoreSchemaChange(); @@ -518,5 +517,4 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { table.close(); } } - } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java index 631c44d..cb555e5 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java @@ -33,7 +33,6 @@ import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.PathSegment.NameSegment; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.impl.OutputMutator; @@ -84,8 +83,7 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas private final Connection connection; - public HBaseRecordReader(Connection connection, HBaseSubScan.HBaseSubScanSpec subScanSpec, - List<SchemaPath> projectedColumns, FragmentContext context) { + public HBaseRecordReader(Connection connection, HBaseSubScan.HBaseSubScanSpec subScanSpec, List<SchemaPath> projectedColumns) { this.connection = connection; hbaseTableName = TableName.valueOf( Preconditions.checkNotNull(subScanSpec, "HBase reader needs a sub-scan spec").getTableName()); 
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java index 8e815b9..ff9a4e4 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java @@ -21,7 +21,7 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.ScanBatch; @@ -31,11 +31,9 @@ import org.apache.drill.exec.store.RecordReader; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseScanBatchCreator.class); - +public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan> { @Override - public ScanBatch getBatch(FragmentContext context, HBaseSubScan subScan, List<RecordBatch> children) + public ScanBatch getBatch(ExecutorFragmentContext context, HBaseSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.isEmpty()); List<RecordReader> readers = Lists.newArrayList(); @@ -45,7 +43,7 @@ public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan>{ if ((columns = 
subScan.getColumns())==null) { columns = GroupScan.ALL_COLUMNS; } - readers.add(new HBaseRecordReader(subScan.getStorageEngine().getConnection(), scanSpec, columns, context)); + readers.add(new HBaseRecordReader(subScan.getStorageEngine().getConnection(), scanSpec, columns)); } catch (Exception e1) { throw new ExecutionSetupException(e1); } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java b/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java index b4b2039..4a75ed3 100644 --- a/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java +++ b/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java @@ -40,9 +40,9 @@ import java.util.Properties; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.store.hive.HivePartition; import org.apache.drill.exec.store.hive.HiveTableWithColumnCache; +import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.ValueVector; import org.apache.hadoop.io.Writable; http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java index 
60a01a5..243d781 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java @@ -28,7 +28,7 @@ import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.ScanBatch; @@ -57,9 +57,10 @@ import com.google.common.collect.Maps; @SuppressWarnings("unused") public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNativeParquetSubScan> { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveDrillNativeScanBatchCreator.class); @Override - public ScanBatch getBatch(FragmentContext context, HiveDrillNativeParquetSubScan config, List<RecordBatch> children) + public ScanBatch getBatch(ExecutorFragmentContext context, HiveDrillNativeParquetSubScan config, List<RecordBatch> children) throws ExecutionSetupException { final HiveTableWithColumnCache table = config.getTable(); final List<List<InputSplit>> splits = config.getInputSplits(); @@ -176,7 +177,7 @@ public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNa ImpersonationUtil.createProxyUgi(config.getUserName(), context.getQueryUserName()))); } - return new ScanBatch(config, context, oContext, readers, implicitColumns); + return new ScanBatch(context, oContext, readers, implicitColumns); } /** http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java 
---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java index 3df8fd9..9f1e29e 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.store.hive; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.ScanBatch; import org.apache.drill.exec.record.RecordBatch; @@ -29,12 +29,12 @@ import org.apache.drill.exec.store.hive.readers.initilializers.ReadersInitialize @SuppressWarnings("unused") public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveScanBatchCreator.class); @Override - public ScanBatch getBatch(FragmentContext context, HiveSubScan config, List<RecordBatch> children) + public ScanBatch getBatch(ExecutorFragmentContext context, HiveSubScan config, List<RecordBatch> children) throws ExecutionSetupException { AbstractReadersInitializer readersInitializer = ReadersInitializer.getInitializer(context, config); return new ScanBatch(config, context, readersInitializer.init()); } - } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java ---------------------------------------------------------------------- diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java index 8a842b2..925ff49 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java @@ -32,6 +32,7 @@ import org.apache.drill.common.JSONOptions; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.OptimizerRulesContext; import org.apache.drill.exec.planner.sql.logical.ConvertHiveParquetScanToDrillParquetScan; http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java index 4164649..b101f49 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java @@ -32,7 +32,7 @@ import org.apache.drill.exec.expr.holders.Decimal28SparseHolder; import org.apache.drill.exec.expr.holders.Decimal38SparseHolder; import org.apache.drill.exec.expr.holders.Decimal9Holder; import org.apache.drill.exec.planner.physical.PlannerSettings; -import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.server.options.OptionSet; import 
org.apache.drill.exec.util.DecimalUtility; import org.apache.drill.exec.vector.NullableBigIntVector; import org.apache.drill.exec.vector.NullableBitVector; @@ -288,7 +288,7 @@ public class HiveUtilities { } } - public static MajorType getMajorTypeFromHiveTypeInfo(final TypeInfo typeInfo, final OptionManager options) { + public static MajorType getMajorTypeFromHiveTypeInfo(final TypeInfo typeInfo, final OptionSet options) { switch (typeInfo.getCategory()) { case PRIMITIVE: { PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo; @@ -325,7 +325,7 @@ public class HiveUtilities { } public static TypeProtos.MinorType getMinorTypeFromHivePrimitiveTypeInfo(PrimitiveTypeInfo primitiveTypeInfo, - OptionManager options) { + OptionSet options) { switch(primitiveTypeInfo.getPrimitiveCategory()) { case BINARY: return TypeProtos.MinorType.VARBINARY; http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java index c07c9d8..64f3b5b 100644 --- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.hive; -import mockit.integration.junit4.JMockit; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import org.apache.drill.categories.HiveStorageTest; @@ -33,7 +32,6 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; import java.math.BigDecimal; import java.sql.Date; @@ -48,7 +46,6 @@ import static 
org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; -@RunWith(JMockit.class) @Category({SlowTest.class, HiveStorageTest.class}) public class TestHiveStorage extends HiveTestBase { @BeforeClass http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java ---------------------------------------------------------------------- diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java index 1782e1a..d37bfad 100755 --- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java +++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java @@ -21,10 +21,9 @@ import java.util.Collections; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.ScanBatch; -//import org.apache.drill.exec.record.CloseableRecordBatch; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.store.RecordReader; @@ -32,11 +31,11 @@ import com.google.common.base.Preconditions; public class JdbcBatchCreator implements BatchCreator<JdbcSubScan> { @Override - public ScanBatch getBatch(FragmentContext context, JdbcSubScan config, + public ScanBatch getBatch(ExecutorFragmentContext context, JdbcSubScan config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.isEmpty()); JdbcStoragePlugin plugin = config.getPlugin(); - RecordReader reader = new JdbcRecordReader(context, plugin.getSource(), config.getSql(), plugin.getName()); + RecordReader 
reader = new JdbcRecordReader(plugin.getSource(), config.getSql(), plugin.getName()); return new ScanBatch(config, context, Collections.singletonList(reader)); } } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java index 531f343..364b8a8 100755 --- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java +++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java @@ -40,7 +40,6 @@ import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.Types; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; @@ -70,17 +69,13 @@ class JdbcRecordReader extends AbstractRecordReader { private final DataSource source; private ResultSet resultSet; private final String storagePluginName; - private FragmentContext fragmentContext; private Connection connection; private Statement statement; private final String sql; private ImmutableList<ValueVector> vectors; private ImmutableList<Copier<?>> copiers; - private OperatorContext operatorContext; - - public JdbcRecordReader(FragmentContext fragmentContext, DataSource source, String sql, String storagePluginName) { - this.fragmentContext = fragmentContext; + public JdbcRecordReader(DataSource source, String sql, String storagePluginName) { this.source = source; this.sql = sql; this.storagePluginName = 
storagePluginName; @@ -170,8 +165,6 @@ class JdbcRecordReader extends AbstractRecordReader { @Override public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException { try { - - this.operatorContext = operatorContext; connection = source.getConnection(); statement = connection.createStatement(); resultSet = statement.executeQuery(sql); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java index c08c86e..801618c 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java @@ -66,13 +66,12 @@ public class KafkaRecordReader extends AbstractRecordReader { public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, List<SchemaPath> projectedColumns, FragmentContext context, KafkaStoragePlugin plugin) { setColumns(projectedColumns); - this.enableAllTextMode = context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val; - this.readNumbersAsDouble = context.getOptions() - .getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val; - OptionManager options = context.getOptions(); - this.unionEnabled = options.getOption(ExecConstants.ENABLE_UNION_TYPE); - this.kafkaMsgReader = options.getOption(ExecConstants.KAFKA_RECORD_READER).string_val; - this.kafkaPollTimeOut = options.getOption(ExecConstants.KAFKA_POLL_TIMEOUT).num_val; + final OptionManager optionManager = context.getOptions(); + this.enableAllTextMode = optionManager.getBoolean(ExecConstants.KAFKA_ALL_TEXT_MODE); + this.readNumbersAsDouble 
= optionManager.getBoolean(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE); + this.unionEnabled = optionManager.getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY); + this.kafkaMsgReader = optionManager.getString(ExecConstants.KAFKA_RECORD_READER); + this.kafkaPollTimeOut = optionManager.getLong(ExecConstants.KAFKA_POLL_TIMEOUT); this.plugin = plugin; this.subScanSpec = subScanSpec; } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java index 026e341..55fc28f 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java @@ -21,7 +21,7 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.ScanBatch; @@ -38,7 +38,7 @@ public class KafkaScanBatchCreator implements BatchCreator<KafkaSubScan> { static final Logger logger = LoggerFactory.getLogger(KafkaScanBatchCreator.class); @Override - public CloseableRecordBatch getBatch(FragmentContext context, KafkaSubScan subScan, List<RecordBatch> children) + public CloseableRecordBatch getBatch(ExecutorFragmentContext context, KafkaSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.isEmpty()); 
List<SchemaPath> columns = subScan.getColumns() != null ? subScan.getColumns() : GroupScan.ALL_COLUMNS; http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java index b9a7bf9..e180dd9 100644 --- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java @@ -29,7 +29,6 @@ import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.Types; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; @@ -87,8 +86,7 @@ public class KuduRecordReader extends AbstractRecordReader { private ImmutableList<ProjectedColumnInfo> projectedCols; - public KuduRecordReader(KuduClient client, KuduSubScan.KuduSubScanSpec subScanSpec, - List<SchemaPath> projectedColumns, FragmentContext context) { + public KuduRecordReader(KuduClient client, KuduSubScan.KuduSubScanSpec subScanSpec, List<SchemaPath> projectedColumns) { setColumns(projectedColumns); this.client = client; scanSpec = subScanSpec; http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java ---------------------------------------------------------------------- diff --git 
a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java index fc1db5d..5e9d86c 100644 --- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java @@ -21,7 +21,7 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.ScanBatch; @@ -35,7 +35,7 @@ public class KuduScanBatchCreator implements BatchCreator<KuduSubScan>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduScanBatchCreator.class); @Override - public ScanBatch getBatch(FragmentContext context, KuduSubScan subScan, List<RecordBatch> children) + public ScanBatch getBatch(ExecutorFragmentContext context, KuduSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.isEmpty()); List<RecordReader> readers = Lists.newArrayList(); @@ -46,7 +46,7 @@ public class KuduScanBatchCreator implements BatchCreator<KuduSubScan>{ if ((columns = subScan.getColumns())==null) { columns = GroupScan.ALL_COLUMNS; } - readers.add(new KuduRecordReader(subScan.getStorageEngine().getClient(), scanSpec, columns, context)); + readers.add(new KuduRecordReader(subScan.getStorageEngine().getClient(), scanSpec, columns)); } catch (Exception e1) { throw new ExecutionSetupException(e1); } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriterBatchCreator.java 
---------------------------------------------------------------------- diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriterBatchCreator.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriterBatchCreator.java index c200c17..962a1d0 100644 --- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriterBatchCreator.java +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriterBatchCreator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.store.kudu; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.WriterRecordBatch; import org.apache.drill.exec.record.CloseableRecordBatch; @@ -30,7 +30,7 @@ public class KuduWriterBatchCreator implements BatchCreator<KuduWriter> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduWriterBatchCreator.class); @Override - public CloseableRecordBatch getBatch(FragmentContext context, KuduWriter config, List<RecordBatch> children) + public CloseableRecordBatch getBatch(ExecutorFragmentContext context, KuduWriter config, List<RecordBatch> children) throws ExecutionSetupException { assert children != null && children.size() == 1; http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java index da516dd..19f9e07 100644 --- 
a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java @@ -25,7 +25,6 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.TimeUnit; -import com.google.common.collect.ImmutableList; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; @@ -68,7 +67,6 @@ public class MongoRecordReader extends AbstractRecordReader { private final Document fields; private final FragmentContext fragmentContext; - private OperatorContext operatorContext; private final MongoStoragePlugin plugin; @@ -146,13 +144,12 @@ public class MongoRecordReader extends AbstractRecordReader { } MongoClient client = plugin.getClient(addresses); MongoDatabase db = client.getDatabase(subScanSpec.getDbName()); - this.unionEnabled = fragmentContext.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE); + this.unionEnabled = fragmentContext.getOptions().getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY); collection = db.getCollection(subScanSpec.getCollectionName(), BsonDocument.class); } @Override public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException { - this.operatorContext = context; this.writer = new VectorContainerWriter(output, unionEnabled); // Default is BsonReader and all text mode will not be honored in // BsonRecordReader http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java index 
9935184..42717fc 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java @@ -21,7 +21,7 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.ScanBatch; @@ -32,15 +32,13 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.mongodb.MongoClientOptions; -import com.mongodb.MongoCredential; public class MongoScanBatchCreator implements BatchCreator<MongoSubScan> { static final Logger logger = LoggerFactory .getLogger(MongoScanBatchCreator.class); @Override - public ScanBatch getBatch(FragmentContext context, MongoSubScan subScan, + public ScanBatch getBatch(ExecutorFragmentContext context, MongoSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.isEmpty()); List<RecordReader> readers = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java ---------------------------------------------------------------------- diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java index 935aaa5..fce2d7a 100644 --- a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java +++ 
b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.store.openTSDB; import com.google.common.collect.Lists; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.ScanBatch; @@ -33,7 +33,7 @@ import java.util.List; public class OpenTSDBBatchCreator implements BatchCreator<OpenTSDBSubScan> { @Override - public CloseableRecordBatch getBatch(FragmentContext context, OpenTSDBSubScan subScan, + public CloseableRecordBatch getBatch(ExecutorFragmentContext context, OpenTSDBSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException { List<RecordReader> readers = Lists.newArrayList(); List<SchemaPath> columns; http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionGenerationHelper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionGenerationHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionGenerationHelper.java index 6c186db..435b451 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionGenerationHelper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionGenerationHelper.java @@ -98,7 +98,7 @@ public class FunctionGenerationHelper { public static LogicalExpression getOrderingComparatorNullsHigh( HoldingContainer left, HoldingContainer right, - FunctionImplementationRegistry registry) { + FunctionLookupContext registry) { return getOrderingComparator(true, left, right, registry); } 
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java index a39213c..f81d4c9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java @@ -20,7 +20,6 @@ package org.apache.drill.exec.ops; import java.io.IOException; import java.util.List; -import org.apache.drill.exec.compile.CodeCompiler; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.expr.ClassGenerator; import org.apache.drill.exec.expr.CodeGenerator; @@ -32,8 +31,7 @@ import io.netty.buffer.DrillBuf; * Common implementation for both the test and production versions * of the fragment context. 
*/ - -public abstract class BaseFragmentContext implements FragmentContextInterface { +public abstract class BaseFragmentContext implements FragmentContext { private final FunctionImplementationRegistry funcRegistry; @@ -46,8 +44,6 @@ public abstract class BaseFragmentContext implements FragmentContextInterface { return funcRegistry; } - protected abstract CodeCompiler getCompiler(); - @Override public <T> T getImplementationClass(final ClassGenerator<T> cg) throws ClassTransformationException, IOException { http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java index 7c87570..211bd65 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java @@ -39,18 +39,17 @@ import io.netty.buffer.DrillBuf; * version of the operator context and the full production-time context * that includes network services. 
*/ - public abstract class BaseOperatorContext implements OperatorContext { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseOperatorContext.class); - protected final FragmentContextInterface context; + protected final FragmentContext context; protected final BufferAllocator allocator; protected final PhysicalOperator popConfig; protected final BufferManager manager; private DrillFileSystem fs; private ControlsInjector injector; - public BaseOperatorContext(FragmentContextInterface context, BufferAllocator allocator, + public BaseOperatorContext(FragmentContext context, BufferAllocator allocator, PhysicalOperator popConfig) { this.context = context; this.allocator = allocator; @@ -59,7 +58,7 @@ public abstract class BaseOperatorContext implements OperatorContext { } @Override - public FragmentContextInterface getFragmentContext() { + public FragmentContext getFragmentContext() { return context; } @@ -104,17 +103,17 @@ public abstract class BaseOperatorContext implements OperatorContext { // Allow an operator to use the thread pool @Override public ExecutorService getExecutor() { - return context.getDrillbitContext().getExecutor(); + return context.getExecutor(); } @Override public ExecutorService getScanExecutor() { - return context.getDrillbitContext().getScanExecutor(); + return context.getScanExecutor(); } @Override public ExecutorService getScanDecodeExecutor() { - return context.getDrillbitContext().getScanDecodeExecutor(); + return context.getScanDecodeExecutor(); } @Override @@ -165,12 +164,9 @@ public abstract class BaseOperatorContext implements OperatorContext { fs = null; } } catch (IOException e) { - if (ex == null) { - ex = UserException - .resourceError(e) - .addContext("Failed to close the Drill file system for " + getName()) - .build(logger); - } + throw UserException.resourceError(e) + .addContext("Failed to close the Drill file system for " + getName()) + .build(logger); } if (ex != null) { throw ex; 
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ExchangeFragmentContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ExchangeFragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ExchangeFragmentContext.java new file mode 100644 index 0000000..2fb0745 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ExchangeFragmentContext.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.ops; + +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.rpc.control.Controller; +import org.apache.drill.exec.work.batch.IncomingBuffers; + +public interface ExchangeFragmentContext extends FragmentContext { + void waitForSendComplete(); + + AccountingDataTunnel getDataTunnel(final CoordinationProtos.DrillbitEndpoint endpoint); + + AccountingUserConnection getUserDataTunnel(); + + Controller getController(); + + IncomingBuffers getBuffers(); +} http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ExecutorFragmentContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ExecutorFragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ExecutorFragmentContext.java new file mode 100644 index 0000000..82bb886 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ExecutorFragmentContext.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.ops; + +import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry; +import org.apache.drill.exec.planner.PhysicalPlanReader; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.rpc.control.WorkEventBus; +import org.apache.drill.exec.rpc.user.UserServer; +import org.apache.drill.exec.server.QueryProfileStoreContext; +import org.apache.drill.exec.work.batch.IncomingBuffers; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +public interface ExecutorFragmentContext extends RootFragmentContext { + + PhysicalPlanReader getPlanReader(); + + ClusterCoordinator getClusterCoordinator(); + + CoordinationProtos.DrillbitEndpoint getForemanEndpoint(); + + CoordinationProtos.DrillbitEndpoint getEndpoint(); + + Collection<CoordinationProtos.DrillbitEndpoint> getBits(); + + OperatorCreatorRegistry getOperatorCreatorRegistry(); + + void setBuffers(final IncomingBuffers buffers); + + QueryProfileStoreContext getProfileStoreContext(); + + WorkEventBus getWorkEventbus(); + + Set<Map.Entry<UserServer.BitToUserConnection, UserServer.BitToUserConnectionConfig>> getUserConnections(); + + boolean isUserAuthenticationEnabled(); +} http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java index d77d0b8..ceda8b8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java @@ -17,474 +17,163 @@ */ package org.apache.drill.exec.ops; +import java.io.IOException; import java.util.List; -import java.util.Map; -import 
java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import com.google.common.annotations.VisibleForTesting; import org.apache.calcite.schema.SchemaPlus; import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.exceptions.UserException; -import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.compile.CodeCompiler; -import org.apache.drill.exec.exception.OutOfMemoryException; -import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; -import org.apache.drill.exec.expr.holders.ValueHolder; +import org.apache.drill.exec.exception.ClassTransformationException; +import org.apache.drill.exec.expr.ClassGenerator; +import org.apache.drill.exec.expr.CodeGenerator; +import org.apache.drill.exec.expr.fn.FunctionLookupContext; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.planner.physical.PlannerSettings; -import org.apache.drill.exec.proto.BitControl.PlanFragment; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; -import org.apache.drill.exec.proto.helper.QueryIdHelper; -import org.apache.drill.exec.rpc.RpcException; -import org.apache.drill.exec.rpc.RpcOutcomeListener; -import org.apache.drill.exec.rpc.UserClientConnection; -import org.apache.drill.exec.rpc.control.ControlTunnel; -import org.apache.drill.exec.rpc.control.WorkEventBus; -import org.apache.drill.exec.server.DrillbitContext; -import org.apache.drill.exec.server.options.FragmentOptionManager; -import org.apache.drill.exec.server.options.OptionList; +import org.apache.drill.exec.proto.ExecProtos; import org.apache.drill.exec.server.options.OptionManager; -import 
org.apache.drill.exec.store.PartitionExplorer; -import org.apache.drill.exec.store.SchemaConfig; import org.apache.drill.exec.testing.ExecutionControls; -import org.apache.drill.exec.util.ImpersonationUtil; -import org.apache.drill.exec.work.batch.IncomingBuffers; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import io.netty.buffer.DrillBuf; /** - * Contextual objects required for execution of a particular fragment. - * This is the implementation; use <tt>FragmentContextInterface</tt> - * in code to allow tests to use test-time implementations. + * Fragment context interface: separates implementation from definition. + * Allows unit testing by mocking or reimplementing services with + * test-time versions. This interface previously had an awkward name, + * chosen to avoid renaming the implementation class, which is used in + * many places in legacy code. The names have since been swapped: the + * implementation class is now {@link FragmentContextImpl} and this + * interface is <tt>FragmentContext</tt>. New code should use this + * interface. 
*/ -public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class); - - private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap(); - private final List<OperatorContextImpl> contexts = Lists.newLinkedList(); - - private final DrillbitContext context; - private final UserClientConnection connection; // is null if this context is for non-root fragment - private final QueryContext queryContext; // is null if this context is for non-root fragment - private final FragmentStats stats; - private final BufferAllocator allocator; - private final PlanFragment fragment; - private final ContextInformation contextInformation; - private IncomingBuffers buffers; - private final OptionManager fragmentOptions; - private final BufferManager bufferManager; - private ExecutorState executorState; - private final ExecutionControls executionControls; - - private final SendingAccountor sendingAccountor = new SendingAccountor(); - private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() { - @Override - public void accept(final RpcException e) { - fail(e); - } - - @Override - public void interrupt(final InterruptedException e) { - if (shouldContinue()) { - logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e); - fail(e); - } - } - }; - - private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor); - private final AccountingUserConnection accountingUserConnection; - /** Stores constants and their holders by type */ - private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache; - +public interface FragmentContext extends UdfUtilities, AutoCloseable { /** - * Create a FragmentContext instance for non-root fragment. - * - * @param dbContext DrillbitContext. 
- * @param fragment Fragment implementation. - * @param funcRegistry FunctionImplementationRegistry. - * @throws ExecutionSetupException + * Returns the UDF registry. + * @return the UDF registry */ - public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, - final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException { - this(dbContext, fragment, null, null, funcRegistry); - } + FunctionLookupContext getFunctionRegistry(); /** - * Create a FragmentContext instance for root fragment. - * - * @param dbContext DrillbitContext. - * @param fragment Fragment implementation. - * @param queryContext QueryContext. - * @param connection UserClientConnection. - * @param funcRegistry FunctionImplementationRegistry. - * @throws ExecutionSetupException + * Returns a read-only version of the session options. + * @return the session options */ - public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext, - final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry) - throws ExecutionSetupException { - super(funcRegistry); - this.context = dbContext; - this.queryContext = queryContext; - this.connection = connection; - this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler); - this.fragment = fragment; - contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext()); - - logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial()); - logger.debug("Fragment max allocation: {}", fragment.getMemMax()); - - final OptionList list; - if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) { - list = new OptionList(); - } else { - try { - list = dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), OptionList.class); - } catch (final Exception e) { - throw new ExecutionSetupException("Failure while reading plan 
options.", e); - } - } - fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list); - - executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint()); - - // Add the fragment context to the root allocator. - // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments - try { - allocator = context.getAllocator().newChildAllocator( - "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()), - fragment.getMemInitial(), - fragment.getMemMax()); - Preconditions.checkNotNull(allocator, "Unable to acuqire allocator"); - } catch (final OutOfMemoryException e) { - throw UserException.memoryError(e) - .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId()) - .build(logger); - } catch(final Throwable e) { - throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e); - } - - stats = new FragmentStats(allocator, fragment.getAssignment()); - bufferManager = new BufferManagerImpl(this.allocator); - constantValueHolderCache = Maps.newHashMap(); - } + OptionManager getOptions(); + + boolean isImpersonationEnabled(); /** - * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying - * the long list of test files. + * Generates code for a class given a {@link ClassGenerator}, + * and returns a single instance of the generated class. (Note + * that the name is a misnomer, it would be better called + * <tt>getImplementationInstance</tt>.) 
+ * + * @param cg the class generator + * @return an instance of the generated class */ - public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection, - FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException { - this(dbContext, fragment, null, connection, funcRegistry); - } - - @Override - public OptionManager getOptions() { - return fragmentOptions; - } - - public void setBuffers(final IncomingBuffers buffers) { - Preconditions.checkArgument(this.buffers == null, "Can only set buffers once."); - this.buffers = buffers; - } - - public void setExecutorState(final ExecutorState executorState) { - Preconditions.checkArgument(this.executorState == null, "ExecutorState can only be set once."); - this.executorState = executorState; - } - - public void fail(final Throwable cause) { - executorState.fail(cause); - } + <T> T getImplementationClass(final ClassGenerator<T> cg) + throws ClassTransformationException, IOException; /** - * Tells individual operations whether they should continue. In some cases, an external event (typically cancellation) - * will mean that the fragment should prematurely exit execution. Long running operations should check this every so - * often so that Drill is responsive to cancellation operations. + * Generates code for a class given a {@link CodeGenerator}, + * and returns a single instance of the generated class. (Note + * that the name is a misnomer, it would be better called + * <tt>getImplementationInstance</tt>.) * - * @return false if the action should terminate immediately, true if everything is okay. 
+ * @param cg the code generator + * @return an instance of the generated class */ - @Override - public boolean shouldContinue() { - return executorState.shouldContinue(); - } - - @Override - public DrillbitContext getDrillbitContext() { - return context; - } + <T> T getImplementationClass(final CodeGenerator<T> cg) + throws ClassTransformationException, IOException; /** - * This method is only used to construt InfoSchemaReader, it is for the reader to get full schema, so here we - * are going to return a fully initialized schema tree. - * @return root schema's plus + * Generates code for a class given a {@link ClassGenerator}, and returns the + * specified number of instances of the generated class. (Note that the name + * is a misnomer, it would be better called + * <tt>getImplementationInstances</tt>.) + * + * @param cg the class generator + * @return list of instances of the generated class */ - public SchemaPlus getFullRootSchema() { - if (queryContext == null) { - fail(new UnsupportedOperationException("Schema tree can only be created in root fragment. " + - "This is a non-root fragment.")); - return null; - } - - final boolean isImpersonationEnabled = isImpersonationEnabled(); - // If impersonation is enabled, we want to view the schema as query user and suppress authorization errors. As for - // InfoSchema purpose we want to show tables the user has permissions to list or query. If impersonation is - // disabled view the schema as Drillbit process user and throw authorization errors to client. - SchemaConfig schemaConfig = SchemaConfig - .newBuilder( - isImpersonationEnabled ? queryContext.getQueryUserName() : ImpersonationUtil.getProcessUserName(), - queryContext) - .setIgnoreAuthErrors(isImpersonationEnabled) - .build(); - - return queryContext.getFullRootSchema(schemaConfig); - } + <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount) + throws ClassTransformationException, IOException; /** - * Get this node's identity. 
- * @return A DrillbitEndpoint object. + * Generates code for a class given a {@link CodeGenerator}, and returns the + * specified number of instances of the generated class. (Note that the name + * is a misnomer, it would be better called + * <tt>getImplementationInstances</tt>.) + * + * @param cg the code generator + * @return list of instances of the generated class */ - public DrillbitEndpoint getIdentity() { - return context.getEndpoint(); - } - - public FragmentStats getStats() { - return stats; - } - - @Override - public ContextInformation getContextInformation() { - return contextInformation; - } - - public DrillbitEndpoint getForemanEndpoint() { - return fragment.getForeman(); - } + <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount) + throws ClassTransformationException, IOException; /** - * The FragmentHandle for this Fragment - * @return FragmentHandle + * Return the set of execution controls used to inject faults into running + * code for testing. + * + * @return the execution controls */ - public FragmentHandle getHandle() { - return fragment.getHandle(); - } - - public String getFragIdString() { - final FragmentHandle handle = getHandle(); - final String frag = handle != null ? handle.getMajorFragmentId() + ":" + handle.getMinorFragmentId() : "0:0"; - return frag; - } + ExecutionControls getExecutionControls(); /** - * Get this fragment's allocator. - * @return the allocator + * Returns the Drill configuration for this run. Note that the config is + * global and immutable. 
+ * + * @return the Drill configuration */ - @Deprecated - public BufferAllocator getAllocator() { - if (allocator == null) { - logger.debug("Fragment: " + getFragIdString() + " Allocator is NULL"); - } - return allocator; - } - - public BufferAllocator getNewChildAllocator(final String operatorName, - final int operatorId, - final long initialReservation, - final long maximumReservation) throws OutOfMemoryException { - return allocator.newChildAllocator( - "op:" + QueryIdHelper.getFragmentId(fragment.getHandle()) + ":" + operatorId + ":" + operatorName, - initialReservation, - maximumReservation - ); - } - - public boolean isOverMemoryLimit() { - return allocator.isOverLimit(); - } - - @Override - protected CodeCompiler getCompiler() { - return context.getCompiler(); - } + DrillConfig getConfig(); - public AccountingUserConnection getUserDataTunnel() { - Preconditions.checkState(connection != null, "Only Root fragment can get UserDataTunnel"); - return accountingUserConnection; - } + CodeCompiler getCompiler(); - public ControlTunnel getControlTunnel(final DrillbitEndpoint endpoint) { - return context.getController().getTunnel(endpoint); - } + ExecutorService getScanDecodeExecutor(); - public AccountingDataTunnel getDataTunnel(final DrillbitEndpoint endpoint) { - AccountingDataTunnel tunnel = tunnels.get(endpoint); - if (tunnel == null) { - tunnel = new AccountingDataTunnel(context.getDataConnectionsPool().getTunnel(endpoint), sendingAccountor, statusHandler); - tunnels.put(endpoint, tunnel); - } - return tunnel; - } + ExecutorService getScanExecutor(); - public IncomingBuffers getBuffers() { - return buffers; - } + ExecutorService getExecutor(); - @Override - public OperatorContext newOperatorContext(PhysicalOperator popConfig, OperatorStats stats) - throws OutOfMemoryException { - OperatorContextImpl context = new OperatorContextImpl(popConfig, this, stats); - contexts.add(context); - return context; - } + ExecutorState getExecutorState(); - @Override - public 
OperatorContext newOperatorContext(PhysicalOperator popConfig) - throws OutOfMemoryException { - OperatorContextImpl context = new OperatorContextImpl(popConfig, this); - contexts.add(context); - return context; - } + BufferAllocator getNewChildAllocator(final String operatorName, + final int operatorId, + final long initialReservation, + final long maximumReservation); - @VisibleForTesting - @Deprecated - public Throwable getFailureCause() { - return executorState.getFailureCause(); - } + ExecProtos.FragmentHandle getHandle(); - @VisibleForTesting - @Deprecated - public boolean isFailed() { - return executorState.isFailed(); - } + BufferAllocator getAllocator(); - @Override - public DrillConfig getConfig() { - return context.getConfig(); - } + OperatorContext newOperatorContext(PhysicalOperator popConfig); - public void setFragmentLimit(final long limit) { - allocator.setLimit(limit); - } + OperatorContext newOperatorContext(PhysicalOperator popConfig, OperatorStats stats); - @Override - public ExecutionControls getExecutionControls() { - return executionControls; - } + SchemaPlus getFullRootSchema(); - @Override - public String getQueryUserName() { - return fragment.getCredentials().getUserName(); - } + String getQueryUserName(); - public boolean isImpersonationEnabled() { - // TODO(DRILL-2097): Until SimpleRootExec tests are removed, we need to consider impersonation disabled if there is - // no config - if (getConfig() == null) { - return false; - } + String getFragIdString(); - return getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED); - } + DrillBuf replace(DrillBuf old, int newSize); - public boolean isUserAuthenticationEnabled() { - // TODO(DRILL-2097): Until SimpleRootExec tests are removed, we need to consider impersonation disabled if there is - // no config - if (getConfig() == null) { - return false; - } + DrillBuf getManagedBuffer(); - return getConfig().getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED); - } + DrillBuf 
getManagedBuffer(int size); @Override - public void close() { - waitForSendComplete(); - - // close operator context - for (OperatorContextImpl opContext : contexts) { - suppressingClose(opContext); - } - - suppressingClose(bufferManager); - suppressingClose(buffers); - suppressingClose(allocator); - } + void close(); - private void suppressingClose(final AutoCloseable closeable) { - try { - if (closeable != null) { - closeable.close(); - } - } catch (final Exception e) { - fail(e); - } - } - - @Override - public PartitionExplorer getPartitionExplorer() { - throw new UnsupportedOperationException(String.format("The partition explorer interface can only be used " + - "in functions that can be evaluated at planning time. Make sure that the %s configuration " + - "option is set to true.", PlannerSettings.CONSTANT_FOLDING.getOptionName())); - } - - @Override - public ValueHolder getConstantValueHolder(String value, MinorType type, Function<DrillBuf, ValueHolder> holderInitializer) { - if (!constantValueHolderCache.containsKey(value)) { - constantValueHolderCache.put(value, Maps.<MinorType, ValueHolder>newHashMap()); - } - - Map<MinorType, ValueHolder> holdersByType = constantValueHolderCache.get(value); - ValueHolder valueHolder = holdersByType.get(type); - if (valueHolder == null) { - valueHolder = holderInitializer.apply(getManagedBuffer()); - holdersByType.put(type, valueHolder); - } - return valueHolder; - } - - public Executor getExecutor(){ - return context.getExecutor(); - } - - /** - * Wait for ack that all outgoing batches have been sent - */ - public void waitForSendComplete() { - sendingAccountor.waitForSendComplete(); - } - - public WorkEventBus getWorkEventbus() { - return context.getWorkBus(); - } - - public boolean isBuffersDone() { - Preconditions.checkState(this.buffers != null, "Incoming Buffers is not set in this fragment context"); - return buffers.isDone(); - } - - @Override - protected BufferManager getBufferManager() { - return bufferManager; - } 
- - public interface ExecutorState { + interface ExecutorState { /** - * Whether execution should continue. + * Tells individual operations whether they should continue. In some cases, an external event (typically cancellation) + * will mean that the fragment should prematurely exit execution. Long running operations should check this every so + * often so that Drill is responsive to cancellation operations. * - * @return false if execution should stop. + * @return False if the action should terminate immediately, true if everything is okay. */ - public boolean shouldContinue(); + boolean shouldContinue(); /** * Inform the executor if a exception occurs and fragment should be failed. @@ -492,16 +181,14 @@ public class FragmentContext extends BaseFragmentContext implements AutoCloseabl * @param t * The exception that occurred. */ - public void fail(final Throwable t); + void fail(final Throwable t); @VisibleForTesting @Deprecated - public boolean isFailed(); + boolean isFailed(); @VisibleForTesting @Deprecated - public Throwable getFailureCause(); - + Throwable getFailureCause(); } - }