DRILL-6130: Fix NPE during physical plan submission for various storage plugins
1. Fixed ser / de issues for Hive, Kafka, Hbase plugins. 2. Added physical plan submission unit test for all storage plugins in contrib module. 3. Refactoring. closes #1108 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/58e4cec9 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/58e4cec9 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/58e4cec9 Branch: refs/heads/master Commit: 58e4cec9a913e381ef0e96b072c31e34085277b3 Parents: 58f3b10 Author: Arina Ielchiieva <arina.yelchiy...@gmail.com> Authored: Thu Feb 1 17:44:43 2018 +0000 Committer: Vitalii Diravka <vitalii.dira...@gmail.com> Committed: Fri Feb 16 20:25:25 2018 +0000 ---------------------------------------------------------------------- .../drill/exec/store/hbase/HBaseGroupScan.java | 5 +- .../drill/exec/store/hbase/HBaseSubScan.java | 38 +++--- .../apache/drill/hbase/TestHBaseQueries.java | 6 + .../planner/sql/HivePartitionDescriptor.java | 14 +- ...onvertHiveParquetScanToDrillParquetScan.java | 14 +- .../store/hive/HiveDrillNativeParquetScan.java | 22 ++-- .../hive/HiveDrillNativeParquetSubScan.java | 4 +- .../apache/drill/exec/store/hive/HiveScan.java | 68 ++++++---- .../drill/exec/store/hive/HiveSubScan.java | 130 ++++++++++--------- .../apache/drill/exec/hive/TestHiveStorage.java | 7 +- .../store/jdbc/TestJdbcPluginWithMySQLIT.java | 5 + .../drill/exec/store/kafka/KafkaGroupScan.java | 40 +++--- .../drill/exec/store/kafka/KafkaSubScan.java | 56 ++++---- .../exec/store/kafka/KafkaQueriesTest.java | 21 +-- .../exec/store/kafka/MessageIteratorTest.java | 2 +- .../drill/exec/store/kafka/QueryConstants.java | 40 ------ .../drill/exec/store/kafka/TestKafkaSuit.java | 6 +- .../exec/store/kafka/TestQueryConstants.java | 40 ++++++ .../kafka/cluster/EmbeddedKafkaCluster.java | 4 +- .../drill/exec/store/kudu/KuduGroupScan.java | 39 +++--- .../drill/exec/store/kudu/KuduSubScan.java | 39 +++--- .../apache/drill/store/kudu/TestKuduPlugin.java | 8 +- .../exec/store/mongo/TestMongoQueries.java | 7 + .../store/openTSDB/TestOpenTSDBPlugin.java | 7 +- .../apache/drill/exec/proto/UserBitShared.java | 21 ++- .../exec/proto/beans/CoreOperatorType.java | 4 +- protocol/src/main/protobuf/UserBitShared.proto | 1 + 27 files changed, 353 insertions(+), 295 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java index 69c2725..1178298 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java @@ -160,7 +160,7 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst statsCalculator = new TableStatsCalculator(conn, hbaseScanSpec, storagePlugin.getContext().getConfig(), storagePluginConfig); boolean foundStartRegion = false; - regionsToScan = new TreeMap<HRegionInfo, ServerName>(); + regionsToScan = new TreeMap<>(); for (HRegionLocation regionLocation : regionLocations) { HRegionInfo regionInfo = regionLocation.getRegionInfo(); if (!foundStartRegion && hbaseScanSpec.getStartRow() != null && hbaseScanSpec.getStartRow().length != 0 && !regionInfo.containsRow(hbaseScanSpec.getStartRow())) { @@ -338,8 +338,7 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst assert minorFragmentId < endpointFragmentMapping.size() : String.format( "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(), minorFragmentId); - return new HBaseSubScan(getUserName(), storagePlugin, storagePluginConfig, - endpointFragmentMapping.get(minorFragmentId), columns); + return new HBaseSubScan(getUserName(), storagePlugin, endpointFragmentMapping.get(minorFragmentId), columns); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java index 0527391..bd179fb 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java @@ -24,7 +24,6 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.physical.base.AbstractBase; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; @@ -49,9 +48,6 @@ import com.google.common.base.Preconditions; public class HBaseSubScan extends AbstractBase implements SubScan { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseSubScan.class); - @JsonProperty - public final HBaseStoragePluginConfig storage; - @JsonIgnore private final HBaseStoragePlugin hbaseStoragePlugin; private final List<HBaseSubScanSpec> regionScanSpecList; private final List<SchemaPath> columns; @@ -59,34 +55,36 @@ public class HBaseSubScan extends AbstractBase implements SubScan { @JsonCreator public HBaseSubScan(@JacksonInject StoragePluginRegistry registry, @JsonProperty("userName") String userName, - @JsonProperty("storage") StoragePluginConfig storage, + @JsonProperty("hbaseStoragePluginConfig") HBaseStoragePluginConfig hbaseStoragePluginConfig, @JsonProperty("regionScanSpecList") LinkedList<HBaseSubScanSpec> regionScanSpecList, @JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException { - super(userName); - hbaseStoragePlugin = (HBaseStoragePlugin) registry.getPlugin(storage); - this.regionScanSpecList = regionScanSpecList; - this.storage = (HBaseStoragePluginConfig) storage; - this.columns = columns; + this(userName, + (HBaseStoragePlugin) registry.getPlugin(hbaseStoragePluginConfig), + regionScanSpecList, + columns); } - public HBaseSubScan(String userName, HBaseStoragePlugin plugin, HBaseStoragePluginConfig config, - List<HBaseSubScanSpec> regionInfoList, List<SchemaPath> columns) { + public HBaseSubScan(String userName, + HBaseStoragePlugin hbaseStoragePlugin, + List<HBaseSubScanSpec> regionInfoList, + List<SchemaPath> columns) { super(userName); - hbaseStoragePlugin = plugin; - storage = config; + this.hbaseStoragePlugin = hbaseStoragePlugin; this.regionScanSpecList = regionInfoList; this.columns = columns; } - public List<HBaseSubScanSpec> getRegionScanSpecList() { - return regionScanSpecList; + @JsonProperty + public HBaseStoragePluginConfig getHbaseStoragePluginConfig() { + return hbaseStoragePlugin.getConfig(); } - @JsonIgnore - public HBaseStoragePluginConfig getStorageConfig() { - return storage; + @JsonProperty + public List<HBaseSubScanSpec> getRegionScanSpecList() { + return regionScanSpecList; } + @JsonProperty public List<SchemaPath> getColumns() { return columns; } @@ -109,7 +107,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan { @Override public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { Preconditions.checkArgument(children.isEmpty()); - return new HBaseSubScan(getUserName(), hbaseStoragePlugin, storage, regionScanSpecList, columns); + return new HBaseSubScan(getUserName(), hbaseStoragePlugin, regionScanSpecList, columns); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseQueries.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseQueries.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseQueries.java index ee839c5..0d53499 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseQueries.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseQueries.java @@ -20,6 +20,7 @@ package org.apache.drill.hbase; import java.util.Arrays; import java.util.List; +import org.apache.drill.PlanTestBase; import org.apache.drill.categories.HbaseStorageTest; import org.apache.drill.categories.SlowTest; import org.apache.drill.exec.rpc.user.QueryDataBatch; @@ -109,4 +110,9 @@ public class TestHBaseQueries extends BaseHBaseTest { runHBaseSQLVerifyCount("SELECT row_key\n" + " FROM hbase.TestTableNullStr t WHERE row_key='a1'", 1); } + + @Test + public void testPhysicalPlanSubmission() throws Exception { + PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hbase.TestTableNullStr"); + } } http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java index b22c14d..2d2bb6c 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -64,7 +64,7 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor { this.scanRel = scanRel; this.managedBuffer = managedBuffer.reallocIfNeeded(256); this.defaultPartitionValue = defaultPartitionValue; - for (HiveTableWrapper.FieldSchemaWrapper wrapper : ((HiveScan) scanRel.getGroupScan()).hiveReadEntry.table.partitionKeys) { + for (HiveTableWrapper.FieldSchemaWrapper wrapper : ((HiveScan) scanRel.getGroupScan()).getHiveReadEntry().table.partitionKeys) { partitionMap.put(wrapper.name, i); i++; } @@ -88,7 +88,7 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor { @Override public String getBaseTableLocation() { - HiveReadEntry origEntry = ((HiveScan) scanRel.getGroupScan()).hiveReadEntry; + HiveReadEntry origEntry = ((HiveScan) scanRel.getGroupScan()).getHiveReadEntry(); return origEntry.table.getTable().getSd().getLocation(); } @@ -97,7 +97,7 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor { BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap) { int record = 0; final HiveScan hiveScan = (HiveScan) scanRel.getGroupScan(); - final Map<String, String> partitionNameTypeMap = hiveScan.hiveReadEntry.table.getPartitionNameTypeMap(); + final Map<String, String> partitionNameTypeMap = hiveScan.getHiveReadEntry().table.getPartitionNameTypeMap(); for(PartitionLocation partitionLocation: partitions) { for(int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)){ final String hiveType = partitionNameTypeMap.get(fieldNameMap.get(partitionColumnIndex)); @@ -126,7 +126,7 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor { public TypeProtos.MajorType getVectorType(SchemaPath column, PlannerSettings plannerSettings) { HiveScan hiveScan = (HiveScan) scanRel.getGroupScan(); String partitionName = column.getAsNamePart().getName(); - Map<String, String> partitionNameTypeMap = hiveScan.hiveReadEntry.table.getPartitionNameTypeMap(); + Map<String, String> partitionNameTypeMap = hiveScan.getHiveReadEntry().table.getPartitionNameTypeMap(); String hiveType = partitionNameTypeMap.get(partitionName); PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) TypeInfoUtils.getTypeInfoFromTypeString(hiveType); @@ -143,7 +143,7 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor { @Override protected void createPartitionSublists() { List<PartitionLocation> locations = new LinkedList<>(); - HiveReadEntry origEntry = ((HiveScan) scanRel.getGroupScan()).hiveReadEntry; + HiveReadEntry origEntry = ((HiveScan) scanRel.getGroupScan()).getHiveReadEntry(); for (Partition partition: origEntry.getPartitions()) { locations.add(new HivePartitionLocation(partition.getValues(), partition.getSd().getLocation())); } @@ -165,7 +165,7 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor { private GroupScan createNewGroupScan(List<PartitionLocation> newPartitionLocations) throws ExecutionSetupException { HiveScan hiveScan = (HiveScan) scanRel.getGroupScan(); - HiveReadEntry origReadEntry = hiveScan.hiveReadEntry; + HiveReadEntry origReadEntry = hiveScan.getHiveReadEntry(); List<HiveTableWrapper.HivePartitionWrapper> oldPartitions = origReadEntry.partitions; List<HiveTableWrapper.HivePartitionWrapper> newPartitions = Lists.newLinkedList(); http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java index b1b966a..a732245 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java @@ -97,16 +97,16 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim final HiveScan hiveScan = (HiveScan) scanRel.getGroupScan(); final HiveConf hiveConf = hiveScan.getHiveConf(); - final HiveTableWithColumnCache hiveTable = hiveScan.hiveReadEntry.getTable(); + final HiveTableWithColumnCache hiveTable = hiveScan.getHiveReadEntry().getTable(); final Class<? extends InputFormat<?,?>> tableInputFormat = - getInputFormatFromSD(HiveUtilities.getTableMetadata(hiveTable), hiveScan.hiveReadEntry, hiveTable.getSd(), + getInputFormatFromSD(HiveUtilities.getTableMetadata(hiveTable), hiveScan.getHiveReadEntry(), hiveTable.getSd(), hiveConf); if (tableInputFormat == null || !tableInputFormat.equals(MapredParquetInputFormat.class)) { return false; } - final List<HivePartitionWrapper> partitions = hiveScan.hiveReadEntry.getHivePartitionWrappers(); + final List<HivePartitionWrapper> partitions = hiveScan.getHiveReadEntry().getHivePartitionWrappers(); if (partitions == null) { return true; } @@ -116,7 +116,7 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim for (HivePartitionWrapper partition : partitions) { final StorageDescriptor partitionSD = partition.getPartition().getSd(); Class<? extends InputFormat<?, ?>> inputFormat = getInputFormatFromSD( - HiveUtilities.getPartitionMetadata(partition.getPartition(), hiveTable), hiveScan.hiveReadEntry, partitionSD, + HiveUtilities.getPartitionMetadata(partition.getPartition(), hiveTable), hiveScan.getHiveReadEntry(), partitionSD, hiveConf); if (inputFormat == null || !inputFormat.equals(tableInputFormat)) { return false; @@ -172,7 +172,7 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner()); final String partitionColumnLabel = settings.getFsPartitionColumnLabel(); - final Table hiveTable = hiveScan.hiveReadEntry.getTable(); + final Table hiveTable = hiveScan.getHiveReadEntry().getTable(); checkForUnsupportedDataTypes(hiveTable); final Map<String, String> partitionColMapping = @@ -245,8 +245,8 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim final HiveDrillNativeParquetScan nativeHiveScan = new HiveDrillNativeParquetScan( hiveScan.getUserName(), - hiveScan.hiveReadEntry, - hiveScan.storagePlugin, + hiveScan.getHiveReadEntry(), + hiveScan.getStoragePlugin(), nativeScanCols, null); http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java index ccec61a..202bd43 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -41,16 +41,16 @@ public class HiveDrillNativeParquetScan extends HiveScan { @JsonCreator public HiveDrillNativeParquetScan(@JsonProperty("userName") String userName, - @JsonProperty("hive-table") HiveReadEntry hiveReadEntry, - @JsonProperty("storage-plugin") String storagePluginName, + @JsonProperty("hiveReadEntry") HiveReadEntry hiveReadEntry, + @JsonProperty("hiveStoragePluginConfig") HiveStoragePluginConfig hiveStoragePluginConfig, @JsonProperty("columns") List<SchemaPath> columns, @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException { - super(userName, hiveReadEntry, storagePluginName, columns, pluginRegistry); + super(userName, hiveReadEntry, hiveStoragePluginConfig, columns, pluginRegistry); } - public HiveDrillNativeParquetScan(String userName, HiveReadEntry hiveReadEntry, HiveStoragePlugin storagePlugin, + public HiveDrillNativeParquetScan(String userName, HiveReadEntry hiveReadEntry, HiveStoragePlugin hiveStoragePlugin, List<SchemaPath> columns, HiveMetadataProvider metadataProvider) throws ExecutionSetupException { - super(userName, hiveReadEntry, storagePlugin, columns, metadataProvider); + super(userName, hiveReadEntry, hiveStoragePlugin, columns, metadataProvider); } public HiveDrillNativeParquetScan(final HiveScan hiveScan) { @@ -91,7 +91,7 @@ public class HiveDrillNativeParquetScan extends HiveScan { @Override public HiveScan clone(HiveReadEntry hiveReadEntry) throws ExecutionSetupException { - return new HiveDrillNativeParquetScan(getUserName(), hiveReadEntry, storagePlugin, columns, metadataProvider); + return new HiveDrillNativeParquetScan(getUserName(), hiveReadEntry, getStoragePlugin(), getColumns(), getMetadataProvider()); } @Override @@ -103,12 +103,12 @@ public class HiveDrillNativeParquetScan extends HiveScan { @Override public String toString() { - final List<HivePartitionWrapper> partitions = hiveReadEntry.getHivePartitionWrappers(); + final List<HivePartitionWrapper> partitions = getHiveReadEntry().getHivePartitionWrappers(); int numPartitions = partitions == null ? 0 : partitions.size(); - return "HiveDrillNativeParquetScan [table=" + hiveReadEntry.getHiveTableWrapper() - + ", columns=" + columns + return "HiveDrillNativeParquetScan [table=" + getHiveReadEntry().getHiveTableWrapper() + + ", columns=" + getColumns() + ", numPartitions=" + numPartitions + ", partitions= " + partitions - + ", inputDirectories=" + metadataProvider.getInputDirectories(hiveReadEntry) + "]"; + + ", inputDirectories=" + getMetadataProvider().getInputDirectories(getHiveReadEntry()) + "]"; } } http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java index 43cf98e..2129ed4 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java @@ -42,9 +42,9 @@ public class HiveDrillNativeParquetSubScan extends HiveSubScan { @JsonProperty("hiveReadEntry") HiveReadEntry hiveReadEntry, @JsonProperty("splitClasses") List<String> splitClasses, @JsonProperty("columns") List<SchemaPath> columns, - @JsonProperty("storagePluginName") String pluginName) + @JsonProperty("hiveStoragePluginConfig") HiveStoragePluginConfig hiveStoragePluginConfig) throws IOException, ExecutionSetupException, ReflectiveOperationException { - super(registry, userName, splits, hiveReadEntry, splitClasses, columns, pluginName); + super(registry, userName, splits, hiveReadEntry, splitClasses, columns, hiveStoragePluginConfig); } public HiveDrillNativeParquetSubScan(final HiveSubScan subScan) http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java index cf8a671..11d47f3 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java @@ -61,41 +61,36 @@ public class HiveScan extends AbstractGroupScan { private static int HIVE_SERDE_SCAN_OVERHEAD_FACTOR_PER_COLUMN = 20; - @JsonProperty("hive-table") - public HiveReadEntry hiveReadEntry; + private final HiveStoragePlugin hiveStoragePlugin; + private final HiveReadEntry hiveReadEntry; + private final HiveMetadataProvider metadataProvider; - @JsonIgnore - public HiveStoragePlugin storagePlugin; - - @JsonProperty("columns") - public List<SchemaPath> columns; - - @JsonIgnore - protected final HiveMetadataProvider metadataProvider; - - @JsonIgnore private List<List<LogicalInputSplit>> mappings; + private List<LogicalInputSplit> inputSplits; - @JsonIgnore - protected List<LogicalInputSplit> inputSplits; + protected List<SchemaPath> columns; @JsonCreator public HiveScan(@JsonProperty("userName") final String userName, - @JsonProperty("hive-table") final HiveReadEntry hiveReadEntry, - @JsonProperty("storage-plugin") final String storagePluginName, + @JsonProperty("hiveReadEntry") final HiveReadEntry hiveReadEntry, + @JsonProperty("hiveStoragePluginConfig") final HiveStoragePluginConfig hiveStoragePluginConfig, @JsonProperty("columns") final List<SchemaPath> columns, @JacksonInject final StoragePluginRegistry pluginRegistry) throws ExecutionSetupException { - this(userName, hiveReadEntry, (HiveStoragePlugin) pluginRegistry.getPlugin(storagePluginName), columns, null); + this(userName, + hiveReadEntry, + (HiveStoragePlugin) pluginRegistry.getPlugin(hiveStoragePluginConfig), + columns, + null); } - public HiveScan(final String userName, final HiveReadEntry hiveReadEntry, final HiveStoragePlugin storagePlugin, + public HiveScan(final String userName, final HiveReadEntry hiveReadEntry, final HiveStoragePlugin hiveStoragePlugin, final List<SchemaPath> columns, final HiveMetadataProvider metadataProvider) throws ExecutionSetupException { super(userName); this.hiveReadEntry = hiveReadEntry; this.columns = columns; - this.storagePlugin = storagePlugin; + this.hiveStoragePlugin = hiveStoragePlugin; if (metadataProvider == null) { - this.metadataProvider = new HiveMetadataProvider(userName, hiveReadEntry, storagePlugin.getHiveConf()); + this.metadataProvider = new HiveMetadataProvider(userName, hiveReadEntry, hiveStoragePlugin.getHiveConf()); } else { this.metadataProvider = metadataProvider; } @@ -105,19 +100,39 @@ public class HiveScan extends AbstractGroupScan { super(that); this.columns = that.columns; this.hiveReadEntry = that.hiveReadEntry; - this.storagePlugin = that.storagePlugin; + this.hiveStoragePlugin = that.hiveStoragePlugin; this.metadataProvider = that.metadataProvider; } public HiveScan clone(final HiveReadEntry hiveReadEntry) throws ExecutionSetupException { - return new HiveScan(getUserName(), hiveReadEntry, storagePlugin, columns, metadataProvider); + return new HiveScan(getUserName(), hiveReadEntry, hiveStoragePlugin, columns, metadataProvider); + } + + @JsonProperty + public HiveReadEntry getHiveReadEntry() { + return hiveReadEntry; } + @JsonProperty + public HiveStoragePluginConfig getHiveStoragePluginConfig() { + return hiveStoragePlugin.getConfig(); + } + + @JsonProperty public List<SchemaPath> getColumns() { return columns; } - protected List<LogicalInputSplit> getInputSplits() { + @JsonIgnore + public HiveStoragePlugin getStoragePlugin() { + return hiveStoragePlugin; + } + + protected HiveMetadataProvider getMetadataProvider() { + return metadataProvider; + } + + private List<LogicalInputSplit> getInputSplits() { if (inputSplits == null) { inputSplits = metadataProvider.getInputSplits(hiveReadEntry); } @@ -125,6 +140,7 @@ public class HiveScan extends AbstractGroupScan { return inputSplits; } + @Override public void applyAssignments(final List<CoordinationProtos.DrillbitEndpoint> endpoints) { mappings = new ArrayList<>(); @@ -160,7 +176,7 @@ public class HiveScan extends AbstractGroupScan { } final HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.getTableWrapper(), parts); - return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns, storagePlugin); + return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns, hiveStoragePlugin); } catch (IOException | ReflectiveOperationException e) { throw new ExecutionSetupException(e); } @@ -174,7 +190,7 @@ public class HiveScan extends AbstractGroupScan { @Override public List<EndpointAffinity> getOperatorAffinity() { final Map<String, DrillbitEndpoint> endpointMap = new HashMap<>(); - for (final DrillbitEndpoint endpoint : storagePlugin.getContext().getBits()) { + for (final DrillbitEndpoint endpoint : hiveStoragePlugin.getContext().getBits()) { endpointMap.put(endpoint.getAddress(), endpoint); logger.debug("endpoing address: {}", endpoint.getAddress()); } @@ -285,7 +301,7 @@ public class HiveScan extends AbstractGroupScan { @JsonIgnore public HiveConf getHiveConf() { - return storagePlugin.getHiveConf(); + return hiveStoragePlugin.getHiveConf(); } @JsonIgnore http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java index a1990a0..8ca8647 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import com.fasterxml.jackson.annotation.JacksonInject; +import com.google.common.collect.ImmutableSet; import org.apache.commons.codec.binary.Base64; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; @@ -40,26 +41,20 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; -import com.google.common.collect.Iterators; import com.google.common.io.ByteArrayDataInput; import com.google.common.io.ByteStreams; @JsonTypeName("hive-sub-scan") public class HiveSubScan extends AbstractBase implements SubScan { - protected HiveReadEntry hiveReadEntry; - @JsonIgnore - protected List<List<InputSplit>> inputSplits = new ArrayList<>(); - @JsonIgnore - protected HiveTableWithColumnCache table; - @JsonIgnore - protected List<HivePartition> partitions; - @JsonIgnore - protected HiveStoragePlugin storagePlugin; - - private List<List<String>> splits; - private List<String> splitClasses; - protected List<SchemaPath> columns; + private final HiveReadEntry hiveReadEntry; + private final List<List<InputSplit>> inputSplits = new ArrayList<>(); + private final HiveStoragePlugin hiveStoragePlugin; + private final List<List<String>> splits; + private final List<String> splitClasses; + private final HiveTableWithColumnCache table; + private final List<HivePartition> partitions; + private final List<SchemaPath> columns; @JsonCreator public HiveSubScan(@JacksonInject StoragePluginRegistry registry, @@ -68,13 +63,22 @@ public class HiveSubScan extends AbstractBase implements SubScan { @JsonProperty("hiveReadEntry") HiveReadEntry hiveReadEntry, @JsonProperty("splitClasses") List<String> splitClasses, @JsonProperty("columns") List<SchemaPath> columns, - @JsonProperty("storagePluginName") String pluginName) + @JsonProperty("hiveStoragePluginConfig") HiveStoragePluginConfig hiveStoragePluginConfig) throws IOException, ExecutionSetupException, ReflectiveOperationException { - this(userName, splits, hiveReadEntry, splitClasses, columns, (HiveStoragePlugin)registry.getPlugin(pluginName)); - } - - public HiveSubScan(final String userName, final List<List<String>> splits, final HiveReadEntry hiveReadEntry, - final List<String> splitClasses, final List<SchemaPath> columns, final HiveStoragePlugin plugin) + this(userName, + splits, + hiveReadEntry, + splitClasses, + columns, + (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig)); + } + + public HiveSubScan(final String userName, + final List<List<String>> splits, + final HiveReadEntry hiveReadEntry, + final List<String> splitClasses, + final List<SchemaPath> columns, + final HiveStoragePlugin hiveStoragePlugin) throws IOException, ReflectiveOperationException { super(userName); this.hiveReadEntry = hiveReadEntry; @@ -83,66 +87,61 @@ public class HiveSubScan extends AbstractBase implements SubScan { this.splits = splits; this.splitClasses = splitClasses; this.columns = columns; - this.storagePlugin = plugin; + this.hiveStoragePlugin = hiveStoragePlugin; for (int i = 0; i < splits.size(); i++) { inputSplits.add(deserializeInputSplit(splits.get(i), splitClasses.get(i))); } } - @JsonProperty("storagePluginName") - @SuppressWarnings("unused") - public String getStoragePluginName() { - return storagePlugin.getName(); - } - - @JsonIgnore - public HiveStoragePlugin getStoragePlugin() { - return storagePlugin; - } - + @JsonProperty public List<List<String>> getSplits() { return splits; } - public HiveTableWithColumnCache getTable() { - return table; - } - - public List<HivePartition> getPartitions() { - return partitions; + @JsonProperty + public HiveReadEntry getHiveReadEntry() { + return hiveReadEntry; } + @JsonProperty public List<String> getSplitClasses() { return splitClasses; } + @JsonProperty public List<SchemaPath> getColumns() { return columns; } + @JsonProperty + public HiveStoragePluginConfig getHiveStoragePluginConfig() { + return hiveStoragePlugin.getConfig(); + } + + @JsonIgnore + public HiveTableWithColumnCache getTable() { + return table; + } + + @JsonIgnore + public List<HivePartition> getPartitions() { + return partitions; + } + + @JsonIgnore public List<List<InputSplit>> getInputSplits() { return inputSplits; } - public HiveReadEntry getHiveReadEntry() { - return hiveReadEntry; + @JsonIgnore + public HiveStoragePlugin getStoragePlugin() { + return hiveStoragePlugin; } - public static List<InputSplit> deserializeInputSplit(List<String> base64, String className) throws IOException, ReflectiveOperationException{ - Constructor<?> constructor = Class.forName(className).getDeclaredConstructor(); - if (constructor == null) { - throw new ReflectiveOperationException("Class " + className + " does not implement a default constructor."); - } - constructor.setAccessible(true); - List<InputSplit> splits = new ArrayList<>(); - for (String str : base64) { - InputSplit split = (InputSplit) constructor.newInstance(); - ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(Base64.decodeBase64(str)); - split.readFields(byteArrayDataInput); - splits.add(split); - } - return splits; + @JsonIgnore + public HiveConf getHiveConf() { + return hiveStoragePlugin.getHiveConf(); } @Override @@ -153,7 +152,7 @@ public class HiveSubScan extends AbstractBase implements SubScan { @Override public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException { try { - return new HiveSubScan(getUserName(), splits, hiveReadEntry, splitClasses, columns, storagePlugin); + return new HiveSubScan(getUserName(), splits, hiveReadEntry, splitClasses, columns, hiveStoragePlugin); } catch (IOException | ReflectiveOperationException e) { throw new ExecutionSetupException(e); } @@ -161,7 +160,7 @@ public class HiveSubScan extends AbstractBase implements SubScan { @Override public Iterator<PhysicalOperator> iterator() { - return Iterators.emptyIterator(); + return ImmutableSet.<PhysicalOperator>of().iterator(); } @Override @@ -169,8 +168,21 @@ public class HiveSubScan extends AbstractBase implements SubScan { return CoreOperatorType.HIVE_SUB_SCAN_VALUE; } - @JsonIgnore - public HiveConf getHiveConf() { - return storagePlugin.getHiveConf(); + private static List<InputSplit> deserializeInputSplit(List<String> base64, String className) + throws IOException, ReflectiveOperationException{ + Constructor<?> constructor = Class.forName(className).getDeclaredConstructor(); + if (constructor == null) { + throw new ReflectiveOperationException("Class " + className + " does not implement a default constructor."); + } + constructor.setAccessible(true); + List<InputSplit> splits = new ArrayList<>(); + for (String str : base64) { + InputSplit split = (InputSplit) constructor.newInstance(); + ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(Base64.decodeBase64(str)); + split.readFields(byteArrayDataInput); + splits.add(split); + } + return splits; } + } http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java index 64f3b5b..c2412ad 100644 --- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.hive; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import org.apache.drill.PlanTestBase; import org.apache.drill.categories.HiveStorageTest; import org.apache.drill.categories.SlowTest; import org.apache.drill.common.exceptions.UserRemoteException; @@ -100,7 +101,6 @@ public class TestHiveStorage extends HiveTestBase { /** * Test to ensure Drill reads the all supported types correctly both normal fields (converted to Nullable types) and * partition fields (converted to Required types). - * @throws Exception */ @Test public void readAllSupportedHiveDataTypes() throws Exception { @@ -558,6 +558,11 @@ public class TestHiveStorage extends HiveTestBase { .go(); } + @Test + public void testPhysicalPlanSubmission() throws Exception { + PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.kv"); + } + private void verifyColumnsMetadata(List<UserProtos.ResultColumnMetadata> columnsList, Map<String, Integer> expectedResult) { for (UserProtos.ResultColumnMetadata columnMetadata : columnsList) { assertTrue("Column should be present in result set", expectedResult.containsKey(columnMetadata.getColumnName())); http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java ---------------------------------------------------------------------- diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java index 00db46b..7b8c21a 100644 --- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java +++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java @@ -127,4 +127,9 @@ public class TestJdbcPluginWithMySQLIT extends PlanTestBase { testPlanMatchingPatterns(query, new String[] {}, new String[] { "Join", "Filter" }); } + @Test + public void testPhysicalPlanSubmission() throws Exception { + testPhysicalPlanExecutionBasedOnQuery("select * from mysql.`drill_mysql_test`.person"); + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java index e08c7d7..9cf575b 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java @@ -69,47 +69,49 @@ public class KafkaGroupScan extends AbstractGroupScan { private static final long MSG_SIZE = 1024; private final KafkaStoragePlugin kafkaStoragePlugin; - private final KafkaStoragePluginConfig kafkaStoragePluginConfig; - private List<SchemaPath> columns; private final KafkaScanSpec kafkaScanSpec; + private List<SchemaPath> columns; private List<PartitionScanWork> partitionWorkList; private ListMultimap<Integer, PartitionScanWork> assignments; private List<EndpointAffinity> affinities; @JsonCreator public KafkaGroupScan(@JsonProperty("userName") String userName, - @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig kafkaStoragePluginConfig, - @JsonProperty("columns") List<SchemaPath> columns, @JsonProperty("scanSpec") KafkaScanSpec scanSpec, - @JacksonInject StoragePluginRegistry pluginRegistry) { - this(userName, kafkaStoragePluginConfig, columns, scanSpec, (KafkaStoragePlugin) pluginRegistry); + @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig kafkaStoragePluginConfig, + @JsonProperty("columns") List<SchemaPath> columns, + @JsonProperty("kafkaScanSpec") KafkaScanSpec scanSpec, + @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException { + this(userName, + (KafkaStoragePlugin) pluginRegistry.getPlugin(kafkaStoragePluginConfig), + columns, + scanSpec); } public KafkaGroupScan(KafkaStoragePlugin kafkaStoragePlugin, KafkaScanSpec kafkaScanSpec, List<SchemaPath> columns) { super(StringUtils.EMPTY); this.kafkaStoragePlugin = kafkaStoragePlugin; - this.kafkaStoragePluginConfig = (KafkaStoragePluginConfig) kafkaStoragePlugin.getConfig(); this.columns = columns; this.kafkaScanSpec = kafkaScanSpec; init(); } - public KafkaGroupScan(String userName, KafkaStoragePluginConfig kafkaStoragePluginConfig, List<SchemaPath> columns, - KafkaScanSpec kafkaScanSpec, KafkaStoragePlugin pluginRegistry) { + public KafkaGroupScan(String userName, + KafkaStoragePlugin kafkaStoragePlugin, + List<SchemaPath> columns, + KafkaScanSpec kafkaScanSpec) { super(userName); - this.kafkaStoragePluginConfig = kafkaStoragePluginConfig; + this.kafkaStoragePlugin = kafkaStoragePlugin; this.columns = columns; this.kafkaScanSpec = kafkaScanSpec; - this.kafkaStoragePlugin = pluginRegistry; init(); } public KafkaGroupScan(KafkaGroupScan that) { super(that); - this.kafkaStoragePluginConfig = that.kafkaStoragePluginConfig; + this.kafkaStoragePlugin = that.kafkaStoragePlugin; this.columns = that.columns; this.kafkaScanSpec = that.kafkaScanSpec; - this.kafkaStoragePlugin = that.kafkaStoragePlugin; this.partitionWorkList = that.partitionWorkList; this.assignments = that.assignments; } @@ -242,7 +244,7 @@ public class KafkaGroupScan extends AbstractGroupScan { work.getBeginOffset(), work.getLatestOffset())); } - return new KafkaSubScan(getUserName(), kafkaStoragePlugin, kafkaStoragePluginConfig, columns, scanSpecList); + return new KafkaSubScan(getUserName(), kafkaStoragePlugin, columns, scanSpecList); } @Override @@ -291,9 +293,9 @@ public class KafkaGroupScan extends AbstractGroupScan { return clone; } - @JsonProperty("kafkaStoragePluginConfig") - public KafkaStoragePluginConfig getStorageConfig() { - return this.kafkaStoragePluginConfig; + @JsonProperty + public KafkaStoragePluginConfig getKafkaStoragePluginConfig() { + return kafkaStoragePlugin.getConfig(); } @JsonProperty @@ -301,8 +303,8 @@ public class KafkaGroupScan extends AbstractGroupScan { return columns; } - @JsonProperty("kafkaScanSpec") - public KafkaScanSpec getScanSpec() { + @JsonProperty + public KafkaScanSpec getKafkaScanSpec() { return kafkaScanSpec; } http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java index fc110b5..468f766 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java @@ -41,34 +41,31 @@ import com.google.common.base.Preconditions; @JsonTypeName("kafka-partition-scan") public class KafkaSubScan extends AbstractBase implements SubScan { - @JsonProperty - private final KafkaStoragePluginConfig KafkaStoragePluginConfig; - - @JsonIgnore private final KafkaStoragePlugin kafkaStoragePlugin; private final List<SchemaPath> columns; - private final List<KafkaSubScanSpec> partitions; + private final List<KafkaSubScanSpec> partitionSubScanSpecList; @JsonCreator - public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, @JsonProperty("userName") String userName, - @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig kafkaStoragePluginConfig, - @JsonProperty("columns") List<SchemaPath> columns, - @JsonProperty("partitionSubScanSpecList") LinkedList<KafkaSubScanSpec> partitions) + public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, + @JsonProperty("userName") String userName, + @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig kafkaStoragePluginConfig, + @JsonProperty("columns") List<SchemaPath> columns, + @JsonProperty("partitionSubScanSpecList") LinkedList<KafkaSubScanSpec> partitionSubScanSpecList) throws ExecutionSetupException { - super(userName); - this.KafkaStoragePluginConfig = kafkaStoragePluginConfig; - this.columns = columns; - this.partitions = partitions; - this.kafkaStoragePlugin = (KafkaStoragePlugin) registry.getPlugin(kafkaStoragePluginConfig); + this(userName, + (KafkaStoragePlugin) registry.getPlugin(kafkaStoragePluginConfig), + columns, + partitionSubScanSpecList); } - public KafkaSubScan(String userName, KafkaStoragePlugin plugin, KafkaStoragePluginConfig kafkStoragePluginConfig, - List<SchemaPath> columns, List<KafkaSubScanSpec> partitionSubScanSpecList) { + public KafkaSubScan(String userName, + KafkaStoragePlugin kafkaStoragePlugin, + List<SchemaPath> columns, + List<KafkaSubScanSpec> partitionSubScanSpecList) { super(userName); + this.kafkaStoragePlugin = kafkaStoragePlugin; this.columns = columns; - this.KafkaStoragePluginConfig = kafkStoragePluginConfig; - this.kafkaStoragePlugin = plugin; - this.partitions = partitionSubScanSpecList; + this.partitionSubScanSpecList = partitionSubScanSpecList; } @Override @@ -79,8 +76,7 @@ public class KafkaSubScan extends AbstractBase implements SubScan { @Override public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException { Preconditions.checkArgument(children.isEmpty()); - return new KafkaSubScan(getUserName(), kafkaStoragePlugin, KafkaStoragePluginConfig, columns, - partitions); + return new KafkaSubScan(getUserName(), kafkaStoragePlugin, columns, partitionSubScanSpecList); } @Override @@ -88,22 +84,24 @@ public class KafkaSubScan extends AbstractBase implements SubScan { return Collections.emptyIterator(); } - @JsonIgnore + @JsonProperty public KafkaStoragePluginConfig getKafkaStoragePluginConfig() { - return KafkaStoragePluginConfig; - } - - @JsonIgnore - public KafkaStoragePlugin getKafkaStoragePlugin() { - return kafkaStoragePlugin; + return kafkaStoragePlugin.getConfig(); } + @JsonProperty public List<SchemaPath> getColumns() { return columns; } + @JsonProperty public List<KafkaSubScanSpec> getPartitionSubScanSpecList() { - return partitions; + return partitionSubScanSpecList; + } + + @JsonIgnore + public KafkaStoragePlugin getKafkaStoragePlugin() { + return kafkaStoragePlugin; } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java index 1f5a1c0..ce9eb99 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.store.kafka; -import java.util.Arrays; import java.util.Collections; import java.util.Map; import java.util.Set; @@ -39,7 +38,7 @@ public class KafkaQueriesTest extends KafkaTestBase { @Test public void testSqlQueryOnInvalidTopic() throws Exception { - String queryString = String.format(QueryConstants.MSG_SELECT_QUERY, QueryConstants.INVALID_TOPIC); + String queryString = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.INVALID_TOPIC); try { testBuilder().sqlQuery(queryString).unOrdered().baselineRecords(Collections.<Map<String, Object>> emptyList()) .build().run(); @@ -51,7 +50,7 @@ public class KafkaQueriesTest extends KafkaTestBase { @Test public void testResultCount() throws Exception { - String queryString = String.format(QueryConstants.MSG_SELECT_QUERY, QueryConstants.JSON_TOPIC); + String queryString = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.JSON_TOPIC); runKafkaSQLVerifyCount(queryString, TestKafkaSuit.NUM_JSON_MSG); } @@ -60,9 +59,9 @@ public class KafkaQueriesTest extends KafkaTestBase { // following kafka.tools.GetOffsetShell for earliest as -2 Map<TopicPartition, Long> startOffsetsMap = fetchOffsets(-2); - String queryString = String.format(QueryConstants.MIN_OFFSET_QUERY, QueryConstants.JSON_TOPIC); + String queryString = String.format(TestQueryConstants.MIN_OFFSET_QUERY, TestQueryConstants.JSON_TOPIC); testBuilder().sqlQuery(queryString).unOrdered().baselineColumns("minOffset") - .baselineValues(startOffsetsMap.get(new TopicPartition(QueryConstants.JSON_TOPIC, 0))).go(); + .baselineValues(startOffsetsMap.get(new TopicPartition(TestQueryConstants.JSON_TOPIC, 0))).go(); } @Test @@ -70,9 +69,9 @@ public class KafkaQueriesTest extends KafkaTestBase { // following kafka.tools.GetOffsetShell for latest as -1 Map<TopicPartition, Long> endOffsetsMap = fetchOffsets(-1); - String queryString = String.format(QueryConstants.MAX_OFFSET_QUERY, QueryConstants.JSON_TOPIC); + String queryString = String.format(TestQueryConstants.MAX_OFFSET_QUERY, TestQueryConstants.JSON_TOPIC); testBuilder().sqlQuery(queryString).unOrdered().baselineColumns("maxOffset") - .baselineValues(endOffsetsMap.get(new TopicPartition(QueryConstants.JSON_TOPIC, 0))-1).go(); + .baselineValues(endOffsetsMap.get(new TopicPartition(TestQueryConstants.JSON_TOPIC, 0))-1).go(); } private Map<TopicPartition, Long> fetchOffsets(int flag) { @@ -80,7 +79,7 @@ public class KafkaQueriesTest extends KafkaTestBase { new ByteArrayDeserializer(), new ByteArrayDeserializer()); Map<TopicPartition, Long> offsetsMap = Maps.newHashMap(); - kafkaConsumer.subscribe(Arrays.asList(QueryConstants.JSON_TOPIC)); + kafkaConsumer.subscribe(Collections.singletonList(TestQueryConstants.JSON_TOPIC)); // based on KafkaConsumer JavaDoc, seekToBeginning/seekToEnd functions // evaluates lazily, seeking to the // first/last offset in all partitions only when poll(long) or @@ -110,4 +109,10 @@ public class KafkaQueriesTest extends KafkaTestBase { return offsetsMap; } + @Test + public void testPhysicalPlanSubmission() throws Exception { + String query = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.JSON_TOPIC); + testPhysicalPlanExecutionBasedOnQuery(query); + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java index 0b31562..4a15596 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java @@ -49,7 +49,7 @@ public class MessageIteratorTest extends KafkaTestBase { consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "4"); kafkaConsumer = new KafkaConsumer<>(consumerProps); - subScanSpec = new KafkaSubScanSpec(QueryConstants.JSON_TOPIC, 0, 0, TestKafkaSuit.NUM_JSON_MSG); + subScanSpec = new KafkaSubScanSpec(TestQueryConstants.JSON_TOPIC, 0, 0, TestKafkaSuit.NUM_JSON_MSG); } @After http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/QueryConstants.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/QueryConstants.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/QueryConstants.java deleted file mode 100644 index ff58f7e..0000000 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/QueryConstants.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.kafka; - -public interface QueryConstants { - - // Kafka Server Prop Constants - public static final String BROKER_DELIM = ","; - public final String LOCAL_HOST = "127.0.0.1"; - - // ZK - public final static String ZK_TMP = "zk_tmp"; - public final static int TICK_TIME = 500; - public final static int MAX_CLIENT_CONNECTIONS = 100; - - public static final String JSON_TOPIC = "drill-json-topic"; - public static final String AVRO_TOPIC = "drill-avro-topic"; - public static final String INVALID_TOPIC = "invalid-topic"; - - // Queries - public static final String MSG_COUNT_QUERY = "select count(*) from kafka.`%s`"; - public static final String MSG_SELECT_QUERY = "select * from kafka.`%s`"; - public static final String MIN_OFFSET_QUERY = "select MIN(kafkaMsgOffset) as minOffset from kafka.`%s`"; - public static final String MAX_OFFSET_QUERY = "select MAX(kafkaMsgOffset) as maxOffset from kafka.`%s`"; -} http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java index 34677cb..ed01747 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java @@ -72,15 +72,15 @@ public class TestKafkaSuit { Properties topicProps = new Properties(); zkClient = new ZkClient(embeddedKafkaCluster.getZkServer().getConnectionString(), SESSION_TIMEOUT, CONN_TIMEOUT, ZKStringSerializer$.MODULE$); ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(embeddedKafkaCluster.getZkServer().getConnectionString()), false); - AdminUtils.createTopic(zkUtils, QueryConstants.JSON_TOPIC, 1, 1, topicProps, RackAwareMode.Disabled$.MODULE$); + AdminUtils.createTopic(zkUtils, TestQueryConstants.JSON_TOPIC, 1, 1, topicProps, RackAwareMode.Disabled$.MODULE$); org.apache.kafka.common.requests.MetadataResponse.TopicMetadata fetchTopicMetadataFromZk = AdminUtils - .fetchTopicMetadataFromZk(QueryConstants.JSON_TOPIC, zkUtils); + .fetchTopicMetadataFromZk(TestQueryConstants.JSON_TOPIC, zkUtils); logger.info("Topic Metadata: " + fetchTopicMetadataFromZk); KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class); - generator.populateJsonMsgIntoKafka(QueryConstants.JSON_TOPIC, NUM_JSON_MSG); + generator.populateJsonMsgIntoKafka(TestQueryConstants.JSON_TOPIC, NUM_JSON_MSG); } initCount.incrementAndGet(); runningSuite = true; http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java new file mode 100644 index 0000000..057af7e --- /dev/null +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.kafka; + +public interface TestQueryConstants { + + // Kafka Server Prop Constants + String BROKER_DELIM = ","; + String LOCAL_HOST = "127.0.0.1"; + + // ZK + String ZK_TMP = "zk_tmp"; + int TICK_TIME = 500; + int MAX_CLIENT_CONNECTIONS = 100; + + String JSON_TOPIC = "drill-json-topic"; + String AVRO_TOPIC = "drill-avro-topic"; + String INVALID_TOPIC = "invalid-topic"; + + // Queries + String MSG_COUNT_QUERY = "select count(*) from kafka.`%s`"; + String MSG_SELECT_QUERY = "select * from kafka.`%s`"; + String MIN_OFFSET_QUERY = "select MIN(kafkaMsgOffset) as minOffset from kafka.`%s`"; + String MAX_OFFSET_QUERY = "select MAX(kafkaMsgOffset) as maxOffset from kafka.`%s`"; +} http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java index 319c66c..663e0e4 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java @@ -26,7 +26,7 @@ import java.util.Properties; import org.apache.drill.exec.ZookeeperHelper; import org.apache.drill.exec.store.kafka.KafkaStoragePluginConfig; -import org.apache.drill.exec.store.kafka.QueryConstants; +import org.apache.drill.exec.store.kafka.TestQueryConstants; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.slf4j.Logger; @@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory; import kafka.server.KafkaConfig; import kafka.server.KafkaServerStartable; -public class EmbeddedKafkaCluster implements QueryConstants { +public class EmbeddedKafkaCluster implements TestQueryConstants { private static final Logger logger = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); private List<KafkaServerStartable> brokers; private final ZookeeperHelper zkHelper; http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java index dfc3c44..7bddf18 100644 --- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,7 +19,6 @@ package org.apache.drill.exec.store.kudu; import java.io.IOException; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; @@ -45,7 +44,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import org.apache.drill.exec.store.schedule.AffinityCreator; import org.apache.drill.exec.store.schedule.AssignmentCreator; import org.apache.drill.exec.store.schedule.CompleteWork; @@ -59,10 +57,10 @@ public class KuduGroupScan extends AbstractGroupScan { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduGroupScan.class); private static final long DEFAULT_TABLET_SIZE = 1000; - private KuduStoragePluginConfig storagePluginConfig; + private KuduStoragePlugin kuduStoragePlugin; private List<SchemaPath> columns; private KuduScanSpec kuduScanSpec; - private KuduStoragePlugin storagePlugin; + private boolean filterPushedDown = false; private List<KuduWork> kuduWorkList = Lists.newArrayList(); private ListMultimap<Integer,KuduWork> assignments; @@ -71,31 +69,31 @@ public class KuduGroupScan extends AbstractGroupScan { @JsonCreator public KuduGroupScan(@JsonProperty("kuduScanSpec") KuduScanSpec kuduScanSpec, - @JsonProperty("storage") KuduStoragePluginConfig storagePluginConfig, + @JsonProperty("kuduStoragePluginConfig") KuduStoragePluginConfig kuduStoragePluginConfig, @JsonProperty("columns") List<SchemaPath> columns, @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException { - this((KuduStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), kuduScanSpec, columns); + this((KuduStoragePlugin) pluginRegistry.getPlugin(kuduStoragePluginConfig), kuduScanSpec, columns); } - public KuduGroupScan(KuduStoragePlugin storagePlugin, KuduScanSpec scanSpec, - List<SchemaPath> columns) { + public KuduGroupScan(KuduStoragePlugin kuduStoragePlugin, + KuduScanSpec kuduScanSpec, + List<SchemaPath> columns) { super((String) null); - this.storagePlugin = storagePlugin; - this.storagePluginConfig = storagePlugin.getConfig(); - this.kuduScanSpec = scanSpec; + this.kuduStoragePlugin = kuduStoragePlugin; + this.kuduScanSpec = kuduScanSpec; this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : columns; init(); } private void init() { String tableName = kuduScanSpec.getTableName(); - Collection<DrillbitEndpoint> endpoints = storagePlugin.getContext().getBits(); + Collection<DrillbitEndpoint> endpoints = kuduStoragePlugin.getContext().getBits(); Map<String,DrillbitEndpoint> endpointMap = Maps.newHashMap(); for (DrillbitEndpoint endpoint : endpoints) { endpointMap.put(endpoint.getAddress(), endpoint); } try { - List<LocatedTablet> locations = storagePlugin.getClient().openTable(tableName).getTabletsLocations(10000); + List<LocatedTablet> locations = kuduStoragePlugin.getClient().openTable(tableName).getTabletsLocations(10000); for (LocatedTablet tablet : locations) { KuduWork work = new KuduWork(tablet.getPartition().getPartitionKeyStart(), tablet.getPartition().getPartitionKeyEnd()); for (Replica replica : tablet.getReplicas()) { @@ -153,10 +151,9 @@ public class KuduGroupScan extends AbstractGroupScan { */ private KuduGroupScan(KuduGroupScan that) { super(that); + this.kuduStoragePlugin = that.kuduStoragePlugin; this.columns = that.columns; this.kuduScanSpec = that.kuduScanSpec; - this.storagePlugin = that.storagePlugin; - this.storagePluginConfig = that.storagePluginConfig; this.filterPushedDown = that.filterPushedDown; this.kuduWorkList = that.kuduWorkList; this.assignments = that.assignments; @@ -204,7 +201,7 @@ public class KuduGroupScan extends AbstractGroupScan { scanSpecList.add(new KuduSubScanSpec(getTableName(), work.getPartitionKeyStart(), work.getPartitionKeyEnd())); } - return new KuduSubScan(storagePlugin, storagePluginConfig, scanSpecList, this.columns); + return new KuduSubScan(kuduStoragePlugin, scanSpecList, this.columns); } // KuduStoragePlugin plugin, KuduStoragePluginConfig config, @@ -224,7 +221,7 @@ public class KuduGroupScan extends AbstractGroupScan { @JsonIgnore public KuduStoragePlugin getStoragePlugin() { - return storagePlugin; + return kuduStoragePlugin; } @JsonIgnore @@ -244,9 +241,9 @@ public class KuduGroupScan extends AbstractGroupScan { + columns + "]"; } - @JsonProperty("storage") - public KuduStoragePluginConfig getStorageConfig() { - return this.storagePluginConfig; + @JsonProperty + public KuduStoragePluginConfig getKuduStoragePluginConfig() { + return kuduStoragePlugin.getConfig(); } @JsonProperty http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java index 9025db7..ca577e7 100644 --- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,9 +21,9 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import com.google.common.collect.ImmutableSet; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.physical.base.AbstractBase; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; @@ -37,49 +37,40 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; -import com.google.common.collect.Iterators; // Class containing information for reading a single Kudu tablet -@JsonTypeName("kudu-tablet-scan") +@JsonTypeName("kudu-sub-scan") public class KuduSubScan extends AbstractBase implements SubScan { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduSubScan.class); - @JsonProperty - public final KuduStoragePluginConfig storage; - - private final KuduStoragePlugin kuduStoragePlugin; private final List<KuduSubScanSpec> tabletScanSpecList; private final List<SchemaPath> columns; @JsonCreator public KuduSubScan(@JacksonInject StoragePluginRegistry registry, - @JsonProperty("storage") StoragePluginConfig storage, - @JsonProperty("tabletScanSpecList") LinkedList<KuduSubScanSpec> tabletScanSpecList, + @JsonProperty("kuduStoragePluginConfig") KuduStoragePluginConfig kuduStoragePluginConfig, + @JsonProperty("tabletScanSpecList") LinkedList<KuduSubScanSpec> tabletScanSpecList, @JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException { super((String) null); - kuduStoragePlugin = (KuduStoragePlugin) registry.getPlugin(storage); + kuduStoragePlugin = (KuduStoragePlugin) registry.getPlugin(kuduStoragePluginConfig); this.tabletScanSpecList = tabletScanSpecList; - this.storage = (KuduStoragePluginConfig) storage; this.columns = columns; } - public KuduSubScan(KuduStoragePlugin plugin, KuduStoragePluginConfig config, - List<KuduSubScanSpec> tabletInfoList, List<SchemaPath> columns) { + public KuduSubScan(KuduStoragePlugin plugin, List<KuduSubScanSpec> tabletInfoList, List<SchemaPath> columns) { super((String) null); - kuduStoragePlugin = plugin; - storage = config; + this.kuduStoragePlugin = plugin; this.tabletScanSpecList = tabletInfoList; this.columns = columns; } - public List<KuduSubScanSpec> getTabletScanSpecList() { - return tabletScanSpecList; + public KuduStoragePluginConfig getKuduStoragePluginConfig() { + return kuduStoragePlugin.getConfig(); } - @JsonIgnore - public KuduStoragePluginConfig getStorageConfig() { - return storage; + public List<KuduSubScanSpec> getTabletScanSpecList() { + return tabletScanSpecList; } public List<SchemaPath> getColumns() { @@ -104,12 +95,12 @@ public class KuduSubScan extends AbstractBase implements SubScan { @Override public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { Preconditions.checkArgument(children.isEmpty()); - return new KuduSubScan(kuduStoragePlugin, storage, tabletScanSpecList, columns); + return new KuduSubScan(kuduStoragePlugin, tabletScanSpecList, columns); } @Override public Iterator<PhysicalOperator> iterator() { - return Iterators.emptyIterator(); + return ImmutableSet.<PhysicalOperator>of().iterator(); } public static class KuduSubScanSpec { @@ -143,7 +134,7 @@ public class KuduSubScan extends AbstractBase implements SubScan { @Override public int getOperatorType() { - return CoreOperatorType.HBASE_SUB_SCAN_VALUE; + return CoreOperatorType.KUDU_SUB_SCAN_VALUE; } } http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java index b94033a..4e1c7fd 100644 --- a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java +++ b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,6 +17,7 @@ */ package org.apache.drill.store.kudu; +import org.apache.drill.PlanTestBase; import org.apache.drill.test.BaseTestQuery; import org.apache.drill.categories.KuduStorageTest; import org.junit.Ignore; @@ -44,6 +45,11 @@ public class TestKuduPlugin extends BaseTestQuery { test("create table kudu.regions as select 1, * from sys.options limit 1"); test("select * from kudu.regions"); test("drop table kudu.regions"); + } + @Test + public void testPhysicalPlanSubmission() throws Exception { + PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from kudu.demo"); } + } http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java index d8043fd..8d0064f 100644 --- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java +++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java @@ -82,4 +82,11 @@ public class TestMongoQueries extends MongoTestBase { DONUTS_DB, DONUTS_COLLECTION); runMongoSQLVerifyCount(queryString, 5); } + + @Test + public void testPhysicalPlanSubmission() throws Exception { + String query = String.format(TEST_BOOLEAN_FILTER_QUERY_TEMPLATE1, + EMPLOYEE_DB, EMPINFO_COLLECTION); + testPhysicalPlanExecutionBasedOnQuery(query); + } } http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java ---------------------------------------------------------------------- diff --git a/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java b/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java index 2d6c506..27ca09c 100644 --- a/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java +++ b/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java @@ -174,10 +174,9 @@ public class TestOpenTSDBPlugin extends PlanTestBase { } @Test - public void testPhysicalPlanExecutionBasedOnQuery() throws Exception { - String query = "EXPLAIN PLAN for select * from openTSDB.`(metric=warp.speed.test, start=47y-ago, aggregator=sum)`"; - String plan = getPlanInString(query, JSON_FORMAT); - Assert.assertEquals(18, testPhysical(plan)); + public void testPhysicalPlanSubmission() throws Exception { + String query = "select * from openTSDB.`(metric=warp.speed.test, start=47y-ago, aggregator=sum)`"; + testPhysicalPlanExecutionBasedOnQuery(query); } @Test http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java ---------------------------------------------------------------------- diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java index 9ef1f8d..6ae9deb 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java @@ -521,6 +521,10 @@ public final class UserBitShared { * <code>KAFKA_SUB_SCAN = 38;</code> */ KAFKA_SUB_SCAN(38, 38), + /** + * <code>KUDU_SUB_SCAN = 39;</code> + */ + KUDU_SUB_SCAN(39, 39), ; /** @@ -679,6 +683,10 @@ public final class UserBitShared { * <code>KAFKA_SUB_SCAN = 38;</code> */ public static final int KAFKA_SUB_SCAN_VALUE = 38; + /** + * <code>KUDU_SUB_SCAN = 39;</code> + */ + public static final int KUDU_SUB_SCAN_VALUE = 39; public final int getNumber() { return value; } @@ -724,6 +732,7 @@ public final class UserBitShared { case 36: return AVRO_SUB_SCAN; case 37: return PCAP_SUB_SCAN; case 38: return KAFKA_SUB_SCAN; + case 39: return KUDU_SUB_SCAN; default: return null; } } @@ -24104,7 +24113,7 @@ public final class UserBitShared { "agmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_ALL" + "OCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\t" + "CANCELLED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCELLATION_" + - "REQUESTED\020\006*\204\006\n\020CoreOperatorType\022\021\n\rSING" + + "REQUESTED\020\006*\227\006\n\020CoreOperatorType\022\021\n\rSING" + "LE_SENDER\020\000\022\024\n\020BROADCAST_SENDER\020\001\022\n\n\006FIL" + "TER\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHASH_JOIN\020\004" + "\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITION_SENDE" + @@ -24123,11 +24132,11 @@ public final class UserBitShared { "X_TO_JSON\020\037\022\025\n\021PRODUCER_CONSUMER\020 \022\022\n\016HB", "ASE_SUB_SCAN\020!\022\n\n\006WINDOW\020\"\022\024\n\020NESTED_LOO" + "P_JOIN\020#\022\021\n\rAVRO_SUB_SCAN\020$\022\021\n\rPCAP_SUB_" + - "SCAN\020%\022\022\n\016KAFKA_SUB_SCAN\020&*g\n\nSaslStatus" + - "\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SA" + - "SL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SA" + - "SL_FAILED\020\004B.\n\033org.apache.drill.exec.pro" + - "toB\rUserBitSharedH\001" + "SCAN\020%\022\022\n\016KAFKA_SUB_SCAN\020&\022\021\n\rKUDU_SUB_S" + + "CAN\020\'*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n" + + "\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014S" + + "ASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org.ap" + + "ache.drill.exec.protoB\rUserBitSharedH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java ---------------------------------------------------------------------- diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java index 8ad38a5..71595f7 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java @@ -60,7 +60,8 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO NESTED_LOOP_JOIN(35), AVRO_SUB_SCAN(36), PCAP_SUB_SCAN(37), - KAFKA_SUB_SCAN(38); + KAFKA_SUB_SCAN(38), + KUDU_SUB_SCAN(39); public final int number; @@ -117,6 +118,7 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO case 36: return AVRO_SUB_SCAN; case 37: return PCAP_SUB_SCAN; case 38: return KAFKA_SUB_SCAN; + case 39: return KUDU_SUB_SCAN; default: return null; } } http://git-wip-us.apache.org/repos/asf/drill/blob/58e4cec9/protocol/src/main/protobuf/UserBitShared.proto ---------------------------------------------------------------------- diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto index dc8bdb6..b13f059 100644 --- a/protocol/src/main/protobuf/UserBitShared.proto +++ b/protocol/src/main/protobuf/UserBitShared.proto @@ -326,6 +326,7 @@ enum CoreOperatorType { AVRO_SUB_SCAN = 36; PCAP_SUB_SCAN = 37; KAFKA_SUB_SCAN = 38; + KUDU_SUB_SCAN = 39; } /* Registry that contains list of jars, each jar contains its name and list of function signatures.