http://git-wip-us.apache.org/repos/asf/hive/blob/aed21d0b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index 6167f48..3a179a3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.optimizer.physical; import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM; import java.io.Serializable; -import java.lang.annotation.Annotation; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -34,7 +33,6 @@ import java.util.Properties; import java.util.Set; import java.util.Stack; import java.util.regex.Pattern; -import org.apache.commons.lang.ArrayUtils; import org.apache.calcite.util.Pair; import org.apache.commons.lang.ArrayUtils; @@ -45,8 +43,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.*; import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; -import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey; import org.apache.hadoop.hive.ql.exec.spark.SparkTask; import org.apache.hadoop.hive.ql.exec.tez.TezTask; @@ -66,11 +62,7 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterStringOpe import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkLongOperator; import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkMultiKeyOperator; import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkStringOperator; -import org.apache.hadoop.hive.ql.exec.vector.udf.VectorUDFAdaptor; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type; -import org.apache.hadoop.hive.ql.exec.vector.VectorColumnOutputMapping; -import org.apache.hadoop.hive.ql.exec.vector.VectorColumnSourceMapping; -import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOuterFilteredOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator; @@ -81,7 +73,6 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.IdentityExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; @@ -100,36 +91,18 @@ import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc; import org.apache.hadoop.hive.ql.plan.AggregationDesc; -import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc; import org.apache.hadoop.hive.ql.plan.BaseWork; -import org.apache.hadoop.hive.ql.plan.Explain; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; -import org.apache.hadoop.hive.ql.plan.FetchWork; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; -import org.apache.hadoop.hive.ql.plan.FilterDesc; import org.apache.hadoop.hive.ql.plan.GroupByDesc; -import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc; import org.apache.hadoop.hive.ql.plan.JoinDesc; -import org.apache.hadoop.hive.ql.plan.LimitDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.MapWork; -import org.apache.hadoop.hive.ql.plan.MapredLocalWork; -import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; -import org.apache.hadoop.hive.ql.plan.SelectDesc; -import org.apache.hadoop.hive.ql.plan.VectorAppMasterEventDesc; -import org.apache.hadoop.hive.ql.plan.VectorFileSinkDesc; -import org.apache.hadoop.hive.ql.plan.VectorFilterDesc; -import org.apache.hadoop.hive.ql.plan.VectorTableScanDesc; -import org.apache.hadoop.hive.ql.plan.VectorizationCondition; import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc.ProcessingMode; -import org.apache.hadoop.hive.ql.plan.VectorSparkHashTableSinkDesc; -import org.apache.hadoop.hive.ql.plan.VectorLimitDesc; -import org.apache.hadoop.hive.ql.plan.VectorMapJoinInfo; import org.apache.hadoop.hive.ql.plan.VectorPartitionConversion; -import org.apache.hadoop.hive.ql.plan.VectorSMBJoinDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; @@ -144,13 +117,10 @@ import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc; import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType; import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType; import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind; -import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.OperatorVariation; import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc.VectorDeserializeType; -import org.apache.hadoop.hive.ql.plan.VectorMapJoinInfo; import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo; import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc; -import org.apache.hadoop.hive.ql.plan.VectorSelectDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.ql.udf.UDFAcos; import org.apache.hadoop.hive.ql.udf.UDFAsin; @@ -200,9 +170,6 @@ import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.NullStructSerDe; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe; -import org.apache.hadoop.hive.serde2.SerDe; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; @@ -215,9 +182,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.mapred.SequenceFileInputFormat; import org.apache.hadoop.mapred.TextInputFormat; -import org.apache.hive.common.util.AnnotationUtils; -import org.apache.hive.common.util.HiveStringUtils; -import org.apache.hive.common.util.ReflectionUtil; import com.google.common.base.Preconditions; @@ -270,39 +234,12 @@ public class Vectorizer implements PhysicalPlanResolver { private boolean isSpark; - private boolean useVectorizedInputFileFormat; - private boolean useVectorDeserialize; - private boolean useRowDeserialize; - private boolean isReduceVectorizationEnabled; + boolean useVectorizedInputFileFormat; + boolean useVectorDeserialize; + boolean useRowDeserialize; boolean isSchemaEvolution; - private BaseWork currentBaseWork; - private Operator<? extends OperatorDesc> currentOperator; - - public void testSetCurrentBaseWork(BaseWork testBaseWork) { - currentBaseWork = testBaseWork; - } - - private void setNodeIssue(String issue) { - currentBaseWork.setNotVectorizedReason( - VectorizerReason.createNodeIssue(issue)); - } - - private void setOperatorIssue(String issue) { - currentBaseWork.setNotVectorizedReason( - VectorizerReason.createOperatorIssue(currentOperator, issue)); - } - - private void setExpressionIssue(String expressionTitle, String issue) { - currentBaseWork.setNotVectorizedReason( - VectorizerReason.createExpressionIssue(currentOperator, expressionTitle, issue)); - } - - private void clearNotVectorizedReason() { - currentBaseWork.setNotVectorizedReason(null); - } - public Vectorizer() { supportedGenericUDFs.add(GenericUDFOPPlus.class); @@ -432,10 +369,6 @@ public class Vectorizer implements PhysicalPlanResolver { int partitionColumnCount; boolean useVectorizedInputFileFormat; - boolean groupByVectorOutput; - boolean allNative; - boolean usesVectorUDFAdaptor; - String[] scratchTypeNameArray; Set<Operator<? extends OperatorDesc>> nonVectorizedOps; @@ -446,12 +379,6 @@ public class Vectorizer implements PhysicalPlanResolver { partitionColumnCount = 0; } - public void assume() { - groupByVectorOutput = true; - allNative = true; - usesVectorUDFAdaptor = false; - } - public void setAllColumnNames(List<String> allColumnNames) { this.allColumnNames = allColumnNames; } @@ -467,19 +394,9 @@ public class Vectorizer implements PhysicalPlanResolver { public void setScratchTypeNameArray(String[] scratchTypeNameArray) { this.scratchTypeNameArray = scratchTypeNameArray; } - public void setGroupByVectorOutput(boolean groupByVectorOutput) { - this.groupByVectorOutput = groupByVectorOutput; - } - public void setAllNative(boolean allNative) { - this.allNative = allNative; - } - public void setUsesVectorUDFAdaptor(boolean usesVectorUDFAdaptor) { - this.usesVectorUDFAdaptor = usesVectorUDFAdaptor; - } public void setUseVectorizedInputFileFormat(boolean useVectorizedInputFileFormat) { this.useVectorizedInputFileFormat = useVectorizedInputFileFormat; } - public void setNonVectorizedOps(Set<Operator<? extends OperatorDesc>> nonVectorizedOps) { this.nonVectorizedOps = nonVectorizedOps; } @@ -511,14 +428,7 @@ public class Vectorizer implements PhysicalPlanResolver { scratchTypeNameArray); baseWork.setVectorizedRowBatchCtx(vectorizedRowBatchCtx); - if (baseWork instanceof MapWork) { - MapWork mapWork = (MapWork) baseWork; - mapWork.setUseVectorizedInputFileFormat(useVectorizedInputFileFormat); - } - - baseWork.setAllNative(allNative); - baseWork.setGroupByVectorOutput(groupByVectorOutput); - baseWork.setUsesVectorUDFAdaptor(usesVectorUDFAdaptor); + baseWork.setUseVectorizedInputFileFormat(useVectorizedInputFileFormat); } } @@ -535,29 +445,17 @@ public class Vectorizer implements PhysicalPlanResolver { throws SemanticException { Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd; if (currTask instanceof MapRedTask) { - MapredWork mapredWork = ((MapRedTask) currTask).getWork(); - convertMapWork(mapredWork.getMapWork(), false); - ReduceWork reduceWork = mapredWork.getReduceWork(); - if (reduceWork != null) { - // Always set the EXPLAIN conditions. - setReduceWorkExplainConditions(reduceWork); - - // We do not vectorize MR Reduce. - } + convertMapWork(((MapRedTask) currTask).getWork().getMapWork(), false); } else if (currTask instanceof TezTask) { TezWork work = ((TezTask) currTask).getWork(); - for (BaseWork baseWork: work.getAllWork()) { - if (baseWork instanceof MapWork) { - convertMapWork((MapWork) baseWork, true); - } else if (baseWork instanceof ReduceWork) { - ReduceWork reduceWork = (ReduceWork) baseWork; - - // Always set the EXPLAIN conditions. - setReduceWorkExplainConditions(reduceWork); - - // We are only vectorizing Reduce under Tez/Spark. - if (isReduceVectorizationEnabled) { - convertReduceWork(reduceWork); + for (BaseWork w: work.getAllWork()) { + if (w instanceof MapWork) { + convertMapWork((MapWork) w, true); + } else if (w instanceof ReduceWork) { + // We are only vectorizing Reduce under Tez. + if (HiveConf.getBoolVar(hiveConf, + HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED)) { + convertReduceWork((ReduceWork) w, true); } } } @@ -565,43 +463,22 @@ public class Vectorizer implements PhysicalPlanResolver { SparkWork sparkWork = (SparkWork) currTask.getWork(); for (BaseWork baseWork : sparkWork.getAllWork()) { if (baseWork instanceof MapWork) { - convertMapWork((MapWork) baseWork, true); - } else if (baseWork instanceof ReduceWork) { - ReduceWork reduceWork = (ReduceWork) baseWork; - - // Always set the EXPLAIN conditions. - setReduceWorkExplainConditions(reduceWork); - - if (isReduceVectorizationEnabled) { - convertReduceWork(reduceWork); - } + convertMapWork((MapWork) baseWork, false); + } else if (baseWork instanceof ReduceWork + && HiveConf.getBoolVar(hiveConf, + HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED)) { + convertReduceWork((ReduceWork) baseWork, false); } } } - return null; } - private void convertMapWork(MapWork mapWork, boolean isTezOrSpark) throws SemanticException { - - mapWork.setVectorizationExamined(true); - - // Global used when setting errors, etc. - currentBaseWork = mapWork; - + private void convertMapWork(MapWork mapWork, boolean isTez) throws SemanticException { VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo(); - vectorTaskColumnInfo.assume(); - - boolean ret = validateMapWork(mapWork, vectorTaskColumnInfo, isTezOrSpark); + boolean ret = validateMapWork(mapWork, vectorTaskColumnInfo, isTez); if (ret) { - vectorizeMapWork(mapWork, vectorTaskColumnInfo, isTezOrSpark); - } else if (currentBaseWork.getVectorizationEnabled()) { - VectorizerReason notVectorizedReason = currentBaseWork.getNotVectorizedReason(); - if (notVectorizedReason == null) { - LOG.info("Cannot vectorize: unknown"); - } else { - LOG.info("Cannot vectorize: " + notVectorizedReason.toString()); - } + vectorizeMapWork(mapWork, vectorTaskColumnInfo, isTez); } } @@ -622,7 +499,6 @@ public class Vectorizer implements PhysicalPlanResolver { LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork(); if ((aliasToWork == null) || (aliasToWork.size() == 0)) { - setNodeIssue("Vectorized map work requires work"); return null; } int tableScanCount = 0; @@ -631,7 +507,7 @@ public class Vectorizer implements PhysicalPlanResolver { for (Entry<String, Operator<? extends OperatorDesc>> entry : aliasToWork.entrySet()) { Operator<?> op = entry.getValue(); if (op == null) { - setNodeIssue("Vectorized map work requires a valid alias"); + LOG.warn("Map work has invalid aliases to work with. Fail validation!"); return null; } if (op instanceof TableScanOperator) { @@ -641,7 +517,7 @@ public class Vectorizer implements PhysicalPlanResolver { } } if (tableScanCount > 1) { - setNodeIssue("Vectorized map work only works with 1 TableScanOperator"); + LOG.warn("Map work has more than 1 TableScanOperator. Fail validation!"); return null; } return new ImmutablePair(alias, tableScanOperator); @@ -682,6 +558,22 @@ public class Vectorizer implements PhysicalPlanResolver { } } + private String getHiveOptionsString() { + StringBuilder sb = new StringBuilder(); + sb.append(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname); + sb.append("="); + sb.append(useVectorizedInputFileFormat); + sb.append(", "); + sb.append(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTOR_DESERIALIZE.varname); + sb.append("="); + sb.append(useVectorDeserialize); + sb.append(", and "); + sb.append(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_ROW_DESERIALIZE.varname); + sb.append("="); + sb.append(useRowDeserialize); + return sb.toString(); + } + /* * There are 3 modes of reading for vectorization: * @@ -696,58 +588,44 @@ public class Vectorizer implements PhysicalPlanResolver { * the row object into the VectorizedRowBatch with VectorAssignRow. * This picks up Input File Format not supported by the other two. */ - private boolean verifyAndSetVectorPartDesc(PartitionDesc pd, boolean isAcidTable, - HashSet<String> inputFileFormatClassNameSet, HashSet<String> enabledConditionsMetSet, - ArrayList<String> enabledConditionsNotMetList) { + private boolean verifyAndSetVectorPartDesc(PartitionDesc pd, boolean isAcidTable) { String inputFileFormatClassName = pd.getInputFileFormatClassName(); - // Always collect input file formats. - inputFileFormatClassNameSet.add(inputFileFormatClassName); - - boolean isInputFileFormatVectorized = Utilities.isInputFileFormatVectorized(pd); - - if (isAcidTable) { - - // Today, ACID tables are only ORC and that format is vectorizable. Verify these - // assumptions. - Preconditions.checkState(isInputFileFormatVectorized); - Preconditions.checkState(inputFileFormatClassName.equals(OrcInputFormat.class.getName())); - - if (!useVectorizedInputFileFormat) { - enabledConditionsNotMetList.add( - "Vectorizing ACID tables requires " + HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname); - return false; - } - - pd.setVectorPartitionDesc( - VectorPartitionDesc.createVectorizedInputFileFormat( - inputFileFormatClassName, Utilities.isInputFileFormatSelfDescribing(pd))); - - enabledConditionsMetSet.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname); - return true; - } - // Look for Pass-Thru case where InputFileFormat has VectorizedInputFormatInterface // and reads VectorizedRowBatch as a "row". - if (useVectorizedInputFileFormat) { + if (isAcidTable || useVectorizedInputFileFormat) { - if (isInputFileFormatVectorized) { + if (Utilities.isInputFileFormatVectorized(pd)) { + + if (!useVectorizedInputFileFormat) { + LOG.info("ACID tables con only be vectorized for the input file format -- " + + "i.e. when Hive Configuration option " + + HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname + + "=true"); + return false; + } pd.setVectorPartitionDesc( VectorPartitionDesc.createVectorizedInputFileFormat( inputFileFormatClassName, Utilities.isInputFileFormatSelfDescribing(pd))); - enabledConditionsMetSet.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname); return true; } - // Fall through and look for other options... + + // Today, ACID tables are only ORC and that format is vectorizable. Verify this + // assumption. + Preconditions.checkState(!isAcidTable); } - if (!isSchemaEvolution) { - enabledConditionsNotMetList.add( - "Vectorizing tables without Schema Evolution requires " + HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname); + if (!(isSchemaEvolution || isAcidTable) && + (useVectorDeserialize || useRowDeserialize)) { + LOG.info("Input format: " + inputFileFormatClassName + " cannot be vectorized" + + " when both " + HiveConf.ConfVars.HIVE_SCHEMA_EVOLUTION.varname + "=false and " + + " ACID table is " + isAcidTable + " and " + + " given the Hive Configuration options " + getHiveOptionsString()); + return false; } String deserializerClassName = pd.getDeserializerClassName(); @@ -757,12 +635,6 @@ public class Vectorizer implements PhysicalPlanResolver { // // Do the "vectorized" row-by-row deserialization into a VectorizedRowBatch in the // VectorMapOperator. - boolean isTextFormat = inputFileFormatClassName.equals(TextInputFormat.class.getName()) && - deserializerClassName.equals(LazySimpleSerDe.class.getName()); - boolean isSequenceFormat = - inputFileFormatClassName.equals(SequenceFileInputFormat.class.getName()) && - deserializerClassName.equals(LazyBinarySerDe.class.getName()); - boolean isVectorDeserializeEligable = isTextFormat || isSequenceFormat; if (useVectorDeserialize) { @@ -776,7 +648,8 @@ public class Vectorizer implements PhysicalPlanResolver { // org.apache.hadoop.mapred.SequenceFileInputFormat // org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe - if (isTextFormat) { + if (inputFileFormatClassName.equals(TextInputFormat.class.getName()) && + deserializerClassName.equals(LazySimpleSerDe.class.getName())) { Properties properties = pd.getTableDesc().getProperties(); String lastColumnTakesRestString = @@ -786,11 +659,10 @@ public class Vectorizer implements PhysicalPlanResolver { lastColumnTakesRestString.equalsIgnoreCase("true")); if (lastColumnTakesRest) { - // If row mode will not catch this input file format, then not enabled. + // If row mode will not catch this, then inform. if (useRowDeserialize) { - enabledConditionsNotMetList.add( - inputFileFormatClassName + " " + - serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST + " must be disabled "); + LOG.info("Input format: " + inputFileFormatClassName + " cannot be vectorized" + + " when " + serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST + "is true"); return false; } } else { @@ -798,19 +670,17 @@ public class Vectorizer implements PhysicalPlanResolver { VectorPartitionDesc.createVectorDeserialize( inputFileFormatClassName, VectorDeserializeType.LAZY_SIMPLE)); - enabledConditionsMetSet.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTOR_DESERIALIZE.varname); return true; } - } else if (isSequenceFormat) { + } else if (inputFileFormatClassName.equals(SequenceFileInputFormat.class.getName()) && + deserializerClassName.equals(LazyBinarySerDe.class.getName())) { pd.setVectorPartitionDesc( VectorPartitionDesc.createVectorDeserialize( inputFileFormatClassName, VectorDeserializeType.LAZY_BINARY)); - enabledConditionsMetSet.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTOR_DESERIALIZE.varname); return true; } - // Fall through and look for other options... } // Otherwise, if enabled, deserialize rows using regular Serde and add the object @@ -824,29 +694,17 @@ public class Vectorizer implements PhysicalPlanResolver { Utilities.isInputFileFormatSelfDescribing(pd), deserializerClassName)); - enabledConditionsMetSet.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_ROW_DESERIALIZE.varname); return true; } - if (isInputFileFormatVectorized) { - Preconditions.checkState(!useVectorizedInputFileFormat); - enabledConditionsNotMetList.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname); - } else { - // Only offer these when the input file format is not the fast vectorized formats. - if (isVectorDeserializeEligable) { - Preconditions.checkState(!useVectorDeserialize); - enabledConditionsNotMetList.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTOR_DESERIALIZE.varname); - } else { - // Since row mode takes everyone. - enabledConditionsNotMetList.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_ROW_DESERIALIZE.varname); - } - } - + LOG.info("Input format: " + inputFileFormatClassName + " cannot be vectorized" + + " given the Hive Configuration options " + getHiveOptionsString()); + return false; } - private ImmutablePair<Boolean, Boolean> validateInputFormatAndSchemaEvolution(MapWork mapWork, String alias, + private boolean validateInputFormatAndSchemaEvolution(MapWork mapWork, String alias, TableScanOperator tableScanOperator, VectorTaskColumnInfo vectorTaskColumnInfo) throws SemanticException { @@ -874,39 +732,27 @@ public class Vectorizer implements PhysicalPlanResolver { LinkedHashMap<Path, ArrayList<String>> pathToAliases = mapWork.getPathToAliases(); LinkedHashMap<Path, PartitionDesc> pathToPartitionInfo = mapWork.getPathToPartitionInfo(); - - // Remember the input file formats we validated and why. - HashSet<String> inputFileFormatClassNameSet = new HashSet<String>(); - HashSet<String> enabledConditionsMetSet = new HashSet<String>(); - ArrayList<String> enabledConditionsNotMetList = new ArrayList<String>(); - for (Entry<Path, ArrayList<String>> entry: pathToAliases.entrySet()) { Path path = entry.getKey(); List<String> aliases = entry.getValue(); boolean isPresent = (aliases != null && aliases.indexOf(alias) != -1); if (!isPresent) { - setOperatorIssue("Alias " + alias + " not present in aliases " + aliases); - return new ImmutablePair<Boolean,Boolean>(false, false); + LOG.info("Alias " + alias + " not present in aliases " + aliases); + return false; } PartitionDesc partDesc = pathToPartitionInfo.get(path); if (partDesc.getVectorPartitionDesc() != null) { // We seen this already. continue; } - if (!verifyAndSetVectorPartDesc(partDesc, isAcidTable, inputFileFormatClassNameSet, - enabledConditionsMetSet, enabledConditionsNotMetList)) { - - // Always set these so EXPLAIN can see. - mapWork.setVectorizationInputFileFormatClassNameSet(inputFileFormatClassNameSet); - mapWork.setVectorizationEnabledConditionsMet(new ArrayList(enabledConditionsMetSet)); - mapWork.setVectorizationEnabledConditionsNotMet(enabledConditionsNotMetList); - - // We consider this an enable issue, not a not vectorized issue. - LOG.info("Cannot enable vectorization because input file format(s) " + inputFileFormatClassNameSet + - " do not met conditions " + VectorizationCondition.addBooleans(enabledConditionsNotMetList, false)); - return new ImmutablePair<Boolean,Boolean>(false, true); + if (!verifyAndSetVectorPartDesc(partDesc, isAcidTable)) { + return false; } VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc(); + if (LOG.isInfoEnabled()) { + LOG.info("Vectorizer path: " + path + ", " + vectorPartDesc.toString() + + ", aliases " + aliases); + } if (isFirst) { @@ -950,13 +796,13 @@ public class Vectorizer implements PhysicalPlanResolver { * implicitly defaulted to null. */ if (nextDataColumnList.size() > tableDataColumnList.size()) { - setOperatorIssue( + LOG.info( String.format( "Could not vectorize partition %s " + "(deserializer " + deserializer.getClass().getName() + ")" + "The partition column names %d is greater than the number of table columns %d", path, nextDataColumnList.size(), tableDataColumnList.size())); - return new ImmutablePair<Boolean,Boolean>(false, false); + return false; } if (!(deserializer instanceof NullStructSerDe)) { @@ -965,13 +811,13 @@ public class Vectorizer implements PhysicalPlanResolver { String nextColumnName = nextDataColumnList.get(i); String tableColumnName = tableDataColumnList.get(i); if (!nextColumnName.equals(tableColumnName)) { - setOperatorIssue( + LOG.info( String.format( "Could not vectorize partition %s " + "(deserializer " + deserializer.getClass().getName() + ")" + "The partition column name %s is does not match table column name %s", path, nextColumnName, tableColumnName)); - return new ImmutablePair<Boolean,Boolean>(false, false); + return false; } } } @@ -1006,50 +852,29 @@ public class Vectorizer implements PhysicalPlanResolver { // Helps to keep this for debugging. vectorTaskColumnInfo.setTableScanOperator(tableScanOperator); - // Always set these so EXPLAIN can see. - mapWork.setVectorizationInputFileFormatClassNameSet(inputFileFormatClassNameSet); - mapWork.setVectorizationEnabledConditionsMet(new ArrayList(enabledConditionsMetSet)); - mapWork.setVectorizationEnabledConditionsNotMet(enabledConditionsNotMetList); - - return new ImmutablePair<Boolean,Boolean>(true, false); + return true; } - private boolean validateMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTezOrSpark) + private boolean validateMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez) throws SemanticException { LOG.info("Validating MapWork..."); - ImmutablePair<String,TableScanOperator> onlyOneTableScanPair = verifyOnlyOneTableScanOperator(mapWork); - if (onlyOneTableScanPair == null) { - VectorizerReason notVectorizedReason = currentBaseWork.getNotVectorizedReason(); - Preconditions.checkState(notVectorizedReason != null); - mapWork.setVectorizationEnabledConditionsNotMet(Arrays.asList(new String[] {notVectorizedReason.toString()})); + ImmutablePair<String,TableScanOperator> pair = verifyOnlyOneTableScanOperator(mapWork); + if (pair == null) { return false; } - String alias = onlyOneTableScanPair.left; - TableScanOperator tableScanOperator = onlyOneTableScanPair.right; + String alias = pair.left; + TableScanOperator tableScanOperator = pair.right; // This call fills in the column names, types, and partition column count in // vectorTaskColumnInfo. - currentOperator = tableScanOperator; - ImmutablePair<Boolean, Boolean> validateInputFormatAndSchemaEvolutionPair = - validateInputFormatAndSchemaEvolution(mapWork, alias, tableScanOperator, vectorTaskColumnInfo); - if (!validateInputFormatAndSchemaEvolutionPair.left) { - // Have we already set the enabled conditions not met? - if (!validateInputFormatAndSchemaEvolutionPair.right) { - VectorizerReason notVectorizedReason = currentBaseWork.getNotVectorizedReason(); - Preconditions.checkState(notVectorizedReason != null); - mapWork.setVectorizationEnabledConditionsNotMet(Arrays.asList(new String[] {notVectorizedReason.toString()})); - } + if (!validateInputFormatAndSchemaEvolution(mapWork, alias, tableScanOperator, vectorTaskColumnInfo)) { return false; } - // Now we are enabled and any issues found from here on out are considered - // not vectorized issues. - mapWork.setVectorizationEnabled(true); - Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>(); - MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(mapWork, isTezOrSpark); + MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(mapWork, isTez); addMapWorkRules(opRules, vnp); Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null); GraphWalker ogw = new DefaultGraphWalker(disp); @@ -1071,13 +896,13 @@ public class Vectorizer implements PhysicalPlanResolver { } private void vectorizeMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo, - boolean isTezOrSpark) throws SemanticException { + boolean isTez) throws SemanticException { LOG.info("Vectorizing MapWork..."); mapWork.setVectorMode(true); Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>(); MapWorkVectorizationNodeProcessor vnp = - new MapWorkVectorizationNodeProcessor(mapWork, isTezOrSpark, vectorTaskColumnInfo); + new MapWorkVectorizationNodeProcessor(mapWork, isTez, vectorTaskColumnInfo); addMapWorkRules(opRules, vnp); Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null); GraphWalker ogw = new PreOrderOnceWalker(disp); @@ -1098,34 +923,11 @@ public class Vectorizer implements PhysicalPlanResolver { return; } - private void setReduceWorkExplainConditions(ReduceWork reduceWork) { - - reduceWork.setVectorizationExamined(true); - - reduceWork.setReduceVectorizationEnabled(isReduceVectorizationEnabled); - reduceWork.setVectorReduceEngine( - HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)); - } - - private void convertReduceWork(ReduceWork reduceWork) throws SemanticException { - - // Global used when setting errors, etc. - currentBaseWork = reduceWork; - currentBaseWork.setVectorizationEnabled(true); - + private void convertReduceWork(ReduceWork reduceWork, boolean isTez) throws SemanticException { VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo(); - vectorTaskColumnInfo.assume(); - - boolean ret = validateReduceWork(reduceWork, vectorTaskColumnInfo); + boolean ret = validateReduceWork(reduceWork, vectorTaskColumnInfo, isTez); if (ret) { - vectorizeReduceWork(reduceWork, vectorTaskColumnInfo); - } else if (currentBaseWork.getVectorizationEnabled()) { - VectorizerReason notVectorizedReason = currentBaseWork.getNotVectorizedReason(); - if (notVectorizedReason == null) { - LOG.info("Cannot vectorize: unknown"); - } else { - LOG.info("Cannot vectorize: " + notVectorizedReason.toString()); - } + vectorizeReduceWork(reduceWork, vectorTaskColumnInfo, isTez); } } @@ -1139,14 +941,13 @@ public class Vectorizer implements PhysicalPlanResolver { // Check key ObjectInspector. ObjectInspector keyObjectInspector = reduceWork.getKeyObjectInspector(); if (keyObjectInspector == null || !(keyObjectInspector instanceof StructObjectInspector)) { - setNodeIssue("Key object inspector missing or not StructObjectInspector"); return false; } StructObjectInspector keyStructObjectInspector = (StructObjectInspector)keyObjectInspector; List<? extends StructField> keyFields = keyStructObjectInspector.getAllStructFieldRefs(); + // Tez doesn't use tagging... if (reduceWork.getNeedsTagging()) { - setNodeIssue("Tez doesn't use tagging"); return false; } @@ -1154,7 +955,6 @@ public class Vectorizer implements PhysicalPlanResolver { ObjectInspector valueObjectInspector = reduceWork.getValueObjectInspector(); if (valueObjectInspector == null || !(valueObjectInspector instanceof StructObjectInspector)) { - setNodeIssue("Value object inspector missing or not StructObjectInspector"); return false; } StructObjectInspector valueStructObjectInspector = (StructObjectInspector)valueObjectInspector; @@ -1184,7 +984,7 @@ public class Vectorizer implements PhysicalPlanResolver { } private boolean validateReduceWork(ReduceWork reduceWork, - VectorTaskColumnInfo vectorTaskColumnInfo) throws SemanticException { + VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez) throws SemanticException { LOG.info("Validating ReduceWork..."); @@ -1215,7 +1015,7 @@ public class Vectorizer implements PhysicalPlanResolver { } private void vectorizeReduceWork(ReduceWork reduceWork, - VectorTaskColumnInfo vectorTaskColumnInfo) throws SemanticException { + VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez) throws SemanticException { LOG.info("Vectorizing ReduceWork..."); reduceWork.setVectorMode(true); @@ -1225,7 +1025,7 @@ public class Vectorizer implements PhysicalPlanResolver { // VectorizationContext... Do we use PreOrderWalker instead of DefaultGraphWalker. Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>(); ReduceWorkVectorizationNodeProcessor vnp = - new ReduceWorkVectorizationNodeProcessor(vectorTaskColumnInfo); + new ReduceWorkVectorizationNodeProcessor(vectorTaskColumnInfo, isTez); addReduceWorkRules(opRules, vnp); Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null); GraphWalker ogw = new PreOrderWalker(disp); @@ -1253,7 +1053,7 @@ public class Vectorizer implements PhysicalPlanResolver { class MapWorkValidationNodeProcessor implements NodeProcessor { private final MapWork mapWork; - private final boolean isTezOrSpark; + private final boolean isTez; // Children of Vectorized GROUPBY that outputs rows instead of vectorized row batchs. protected final Set<Operator<? extends OperatorDesc>> nonVectorizedOps = @@ -1263,9 +1063,9 @@ public class Vectorizer implements PhysicalPlanResolver { return nonVectorizedOps; } - public MapWorkValidationNodeProcessor(MapWork mapWork, boolean isTezOrSpark) { + public MapWorkValidationNodeProcessor(MapWork mapWork, boolean isTez) { this.mapWork = mapWork; - this.isTezOrSpark = isTezOrSpark; + this.isTez = isTez; } @Override @@ -1277,13 +1077,13 @@ public class Vectorizer implements PhysicalPlanResolver { return new Boolean(true); } boolean ret; - currentOperator = op; try { - ret = validateMapWorkOperator(op, mapWork, isTezOrSpark); + ret = validateMapWorkOperator(op, mapWork, isTez); } catch (Exception e) { throw new SemanticException(e); } if (!ret) { + LOG.info("MapWork Operator: " + op.getName() + " could not be vectorized."); return new Boolean(false); } // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't @@ -1319,9 +1119,9 @@ public class Vectorizer implements PhysicalPlanResolver { if (nonVectorizedOps.contains(op)) { return new Boolean(true); } - currentOperator = op; boolean ret = validateReduceWorkOperator(op); if (!ret) { + LOG.info("ReduceWork Operator: " + op.getName() + " could not be vectorized."); return new Boolean(false); } // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't @@ -1342,12 +1142,9 @@ public class Vectorizer implements PhysicalPlanResolver { // The vectorization context for the Map or Reduce task. protected VectorizationContext taskVectorizationContext; - protected final VectorTaskColumnInfo vectorTaskColumnInfo; protected final Set<Operator<? extends OperatorDesc>> nonVectorizedOps; - VectorizationNodeProcessor(VectorTaskColumnInfo vectorTaskColumnInfo, - Set<Operator<? extends OperatorDesc>> nonVectorizedOps) { - this.vectorTaskColumnInfo = vectorTaskColumnInfo; + VectorizationNodeProcessor(Set<Operator<? extends OperatorDesc>> nonVectorizedOps) { this.nonVectorizedOps = nonVectorizedOps; } @@ -1395,11 +1192,11 @@ public class Vectorizer implements PhysicalPlanResolver { } public Operator<? extends OperatorDesc> doVectorize(Operator<? extends OperatorDesc> op, - VectorizationContext vContext, boolean isTezOrSpark) throws SemanticException { + VectorizationContext vContext, boolean isTez) throws SemanticException { Operator<? extends OperatorDesc> vectorOp = op; try { if (!opsDone.contains(op)) { - vectorOp = vectorizeOperator(op, vContext, isTezOrSpark, vectorTaskColumnInfo); + vectorOp = vectorizeOperator(op, vContext, isTez); opsDone.add(op); if (vectorOp != op) { opToVectorOpMap.put(op, vectorOp); @@ -1423,14 +1220,14 @@ public class Vectorizer implements PhysicalPlanResolver { private final MapWork mWork; private final VectorTaskColumnInfo vectorTaskColumnInfo; - private final boolean isTezOrSpark; + private final boolean isTez; - public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTezOrSpark, + public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTez, VectorTaskColumnInfo vectorTaskColumnInfo) { - super(vectorTaskColumnInfo, vectorTaskColumnInfo.getNonVectorizedOps()); + super(vectorTaskColumnInfo.getNonVectorizedOps()); this.mWork = mWork; this.vectorTaskColumnInfo = vectorTaskColumnInfo; - this.isTezOrSpark = isTezOrSpark; + this.isTez = isTez; } @Override @@ -1444,7 +1241,6 @@ public class Vectorizer implements PhysicalPlanResolver { VectorizationContext vContext = null; - currentOperator = op; if (op instanceof TableScanOperator) { if (taskVectorizationContext == null) { taskVectorizationContext = getVectorizationContext(op.getName(), vectorTaskColumnInfo); @@ -1465,7 +1261,7 @@ public class Vectorizer implements PhysicalPlanResolver { + " using vectorization context" + vContext.toString()); } - Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext, isTezOrSpark); + Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext, isTez); if (LOG.isDebugEnabled()) { if (vectorOp instanceof VectorizationContextRegion) { @@ -1483,6 +1279,7 @@ public class Vectorizer implements PhysicalPlanResolver { private final VectorTaskColumnInfo vectorTaskColumnInfo; + private final boolean isTez; private Operator<? extends OperatorDesc> rootVectorOp; @@ -1490,11 +1287,13 @@ public class Vectorizer implements PhysicalPlanResolver { return rootVectorOp; } - public ReduceWorkVectorizationNodeProcessor(VectorTaskColumnInfo vectorTaskColumnInfo) { + public ReduceWorkVectorizationNodeProcessor(VectorTaskColumnInfo vectorTaskColumnInfo, + boolean isTez) { - super(vectorTaskColumnInfo, vectorTaskColumnInfo.getNonVectorizedOps()); + super(vectorTaskColumnInfo.getNonVectorizedOps()); this.vectorTaskColumnInfo = vectorTaskColumnInfo; rootVectorOp = null; + this.isTez = isTez; } @Override @@ -1510,7 +1309,6 @@ public class Vectorizer implements PhysicalPlanResolver { boolean saveRootVectorOp = false; - currentOperator = op; if (op.getParentOperators().size() == 0) { LOG.info("ReduceWorkVectorizationNodeProcessor process reduceColumnNames " + vectorTaskColumnInfo.allColumnNames.toString()); @@ -1535,7 +1333,7 @@ public class Vectorizer implements PhysicalPlanResolver { assert vContext != null; LOG.info("ReduceWorkVectorizationNodeProcessor process operator " + op.getName() + " using vectorization context" + vContext.toString()); - Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext, true); + Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext, isTez); if (LOG.isDebugEnabled()) { if (vectorOp instanceof VectorizationContextRegion) { @@ -1592,10 +1390,6 @@ public class Vectorizer implements PhysicalPlanResolver { HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_VECTORIZATION_USE_ROW_DESERIALIZE); - isReduceVectorizationEnabled = - HiveConf.getBoolVar(hiveConf, - HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED); - isSchemaEvolution = HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_SCHEMA_EVOLUTION); @@ -1613,32 +1407,18 @@ public class Vectorizer implements PhysicalPlanResolver { return physicalContext; } - private void setOperatorNotSupported(Operator<? extends OperatorDesc> op) { - OperatorDesc desc = op.getConf(); - Annotation note = AnnotationUtils.getAnnotation(desc.getClass(), Explain.class); - if (note != null) { - Explain explainNote = (Explain) note; - setNodeIssue(explainNote.displayName() + " (" + op.getType() + ") not supported"); - } else { - setNodeIssue("Operator " + op.getType() + " not supported"); - } - } - - boolean validateMapWorkOperator(Operator<? extends OperatorDesc> op, MapWork mWork, boolean isTezOrSpark) { - boolean ret; + boolean validateMapWorkOperator(Operator<? extends OperatorDesc> op, MapWork mWork, boolean isTez) { + boolean ret = false; switch (op.getType()) { case MAPJOIN: if (op instanceof MapJoinOperator) { ret = validateMapJoinOperator((MapJoinOperator) op); } else if (op instanceof SMBMapJoinOperator) { ret = validateSMBMapJoinOperator((SMBMapJoinOperator) op); - } else { - setOperatorNotSupported(op); - ret = false; } break; case GROUPBY: - ret = validateGroupByOperator((GroupByOperator) op, false, isTezOrSpark); + ret = validateGroupByOperator((GroupByOperator) op, false, isTez); break; case FILTER: ret = validateFilterOperator((FilterOperator) op); @@ -1663,7 +1443,6 @@ public class Vectorizer implements PhysicalPlanResolver { validateSparkHashTableSinkOperator((SparkHashTableSinkOperator) op); break; default: - setOperatorNotSupported(op); ret = false; break; } @@ -1671,7 +1450,7 @@ public class Vectorizer implements PhysicalPlanResolver { } boolean validateReduceWorkOperator(Operator<? extends OperatorDesc> op) { - boolean ret; + boolean ret = false; switch (op.getType()) { case MAPJOIN: // Does MAPJOIN actually get planned in Reduce? @@ -1679,9 +1458,6 @@ public class Vectorizer implements PhysicalPlanResolver { ret = validateMapJoinOperator((MapJoinOperator) op); } else if (op instanceof SMBMapJoinOperator) { ret = validateSMBMapJoinOperator((SMBMapJoinOperator) op); - } else { - setOperatorNotSupported(op); - ret = false; } break; case GROUPBY: @@ -1689,7 +1465,6 @@ public class Vectorizer implements PhysicalPlanResolver { HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_GROUPBY_ENABLED)) { ret = validateGroupByOperator((GroupByOperator) op, true, true); } else { - setNodeIssue("Operator " + op.getType() + " not enabled (" + HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_GROUPBY_ENABLED.name() + "=true IS false)"); ret = false; } break; @@ -1715,7 +1490,6 @@ public class Vectorizer implements PhysicalPlanResolver { validateSparkHashTableSinkOperator((SparkHashTableSinkOperator) op); break; default: - setOperatorNotSupported(op); ret = false; break; } @@ -1738,7 +1512,7 @@ public class Vectorizer implements PhysicalPlanResolver { throws SemanticException { if (op.getType().equals(OperatorType.GROUPBY)) { GroupByDesc desc = (GroupByDesc) op.getConf(); - return !((VectorGroupByDesc) desc.getVectorDesc()).isVectorOutput(); + return !desc.getVectorDesc().isVectorOutput(); } return false; } @@ -1752,7 +1526,6 @@ public class Vectorizer implements PhysicalPlanResolver { private boolean validateTableScanOperator(TableScanOperator op, MapWork mWork) { TableScanDesc desc = op.getConf(); if (desc.isGatherStats()) { - setOperatorIssue("gather stats not supported"); return false; } @@ -1767,21 +1540,25 @@ public class Vectorizer implements PhysicalPlanResolver { private boolean validateMapJoinDesc(MapJoinDesc desc) { byte posBigTable = (byte) desc.getPosBigTable(); List<ExprNodeDesc> filterExprs = desc.getFilters().get(posBigTable); - if (!validateExprNodeDesc(filterExprs, "Filter", VectorExpressionDescriptor.Mode.FILTER)) { + if (!validateExprNodeDesc(filterExprs, VectorExpressionDescriptor.Mode.FILTER)) { + LOG.info("Cannot vectorize map work filter expression"); return false; } List<ExprNodeDesc> keyExprs = desc.getKeys().get(posBigTable); - if (!validateExprNodeDesc(keyExprs, "Key")) { + if (!validateExprNodeDesc(keyExprs)) { + LOG.info("Cannot vectorize map work key expression"); return false; } List<ExprNodeDesc> valueExprs = desc.getExprs().get(posBigTable); - if (!validateExprNodeDesc(valueExprs, "Value")) { + if (!validateExprNodeDesc(valueExprs)) { + LOG.info("Cannot vectorize map work value expression"); return false; } Byte[] order = desc.getTagOrder(); Byte posSingleVectorMapJoinSmallTable = (order[0] == posBigTable ? order[1] : order[0]); List<ExprNodeDesc> smallTableExprs = desc.getExprs().get(posSingleVectorMapJoinSmallTable); - if (!validateExprNodeDesc(smallTableExprs, "Small Table")) { + if (!validateExprNodeDesc(smallTableExprs)) { + LOG.info("Cannot vectorize map work small table expression"); return false; } return true; @@ -1794,23 +1571,24 @@ public class Vectorizer implements PhysicalPlanResolver { List<ExprNodeDesc> filterExprs = desc.getFilters().get(tag); List<ExprNodeDesc> keyExprs = desc.getKeys().get(tag); List<ExprNodeDesc> valueExprs = desc.getExprs().get(tag); - return validateExprNodeDesc(filterExprs, "Filter", VectorExpressionDescriptor.Mode.FILTER) && - validateExprNodeDesc(keyExprs, "Key") && validateExprNodeDesc(valueExprs, "Value"); + return validateExprNodeDesc(filterExprs, VectorExpressionDescriptor.Mode.FILTER) && + validateExprNodeDesc(keyExprs) && validateExprNodeDesc(valueExprs); } private boolean validateReduceSinkOperator(ReduceSinkOperator op) { List<ExprNodeDesc> keyDescs = op.getConf().getKeyCols(); List<ExprNodeDesc> partitionDescs = op.getConf().getPartitionCols(); List<ExprNodeDesc> valueDesc = op.getConf().getValueCols(); - return validateExprNodeDesc(keyDescs, "Key") && validateExprNodeDesc(partitionDescs, "Partition") && - validateExprNodeDesc(valueDesc, "Value"); + return validateExprNodeDesc(keyDescs) && validateExprNodeDesc(partitionDescs) && + validateExprNodeDesc(valueDesc); } private boolean validateSelectOperator(SelectOperator op) { List<ExprNodeDesc> descList = op.getConf().getColList(); for (ExprNodeDesc desc : descList) { - boolean ret = validateExprNodeDesc(desc, "Select"); + boolean ret = validateExprNodeDesc(desc); if (!ret) { + LOG.info("Cannot vectorize select expression: " + desc.toString()); return false; } } @@ -1819,26 +1597,28 @@ public class Vectorizer implements PhysicalPlanResolver { private boolean validateFilterOperator(FilterOperator op) { ExprNodeDesc desc = op.getConf().getPredicate(); - return validateExprNodeDesc(desc, "Predicate", VectorExpressionDescriptor.Mode.FILTER); + return validateExprNodeDesc(desc, VectorExpressionDescriptor.Mode.FILTER); } - private boolean validateGroupByOperator(GroupByOperator op, boolean isReduce, boolean isTezOrSpark) { + private boolean validateGroupByOperator(GroupByOperator op, boolean isReduce, boolean isTez) { GroupByDesc desc = op.getConf(); + VectorGroupByDesc vectorDesc = desc.getVectorDesc(); if (desc.isGroupingSetsPresent()) { - setOperatorIssue("Grouping sets not supported"); + LOG.info("Grouping sets not supported in vector mode"); return false; } if (desc.pruneGroupingSetId()) { - setOperatorIssue("Pruning grouping set id not supported"); + LOG.info("Pruning grouping set id not supported in vector mode"); return false; } if (desc.getMode() != GroupByDesc.Mode.HASH && desc.isDistinct()) { - setOperatorIssue("DISTINCT not supported"); + LOG.info("DISTINCT not supported in vector mode"); return false; } - boolean ret = validateExprNodeDesc(desc.getKeys(), "Key"); + boolean ret = validateExprNodeDesc(desc.getKeys()); if (!ret) { + LOG.info("Cannot vectorize groupby key expression " + desc.getKeys().toString()); return false; } @@ -1951,9 +1731,6 @@ public class Vectorizer implements PhysicalPlanResolver { // If all the aggregation outputs are primitive, we can output VectorizedRowBatch. // Otherwise, we the rest of the operator tree will be row mode. - VectorGroupByDesc vectorDesc = new VectorGroupByDesc(); - desc.setVectorDesc(vectorDesc); - vectorDesc.setVectorOutput(retPair.right); vectorDesc.setProcessingMode(processingMode); @@ -1968,15 +1745,14 @@ public class Vectorizer implements PhysicalPlanResolver { return true; } - private boolean validateExprNodeDesc(List<ExprNodeDesc> descs, String expressionTitle) { - return validateExprNodeDesc(descs, expressionTitle, VectorExpressionDescriptor.Mode.PROJECTION); + private boolean validateExprNodeDesc(List<ExprNodeDesc> descs) { + return validateExprNodeDesc(descs, VectorExpressionDescriptor.Mode.PROJECTION); } private boolean validateExprNodeDesc(List<ExprNodeDesc> descs, - String expressionTitle, VectorExpressionDescriptor.Mode mode) { for (ExprNodeDesc d : descs) { - boolean ret = validateExprNodeDesc(d, expressionTitle, mode); + boolean ret = validateExprNodeDesc(d, mode); if (!ret) { return false; } @@ -1999,20 +1775,19 @@ public class Vectorizer implements PhysicalPlanResolver { return new Pair<Boolean, Boolean>(true, outputIsPrimitive); } - private boolean validateExprNodeDescRecursive(ExprNodeDesc desc, String expressionTitle, - VectorExpressionDescriptor.Mode mode) { + private boolean validateExprNodeDescRecursive(ExprNodeDesc desc, VectorExpressionDescriptor.Mode mode) { if (desc instanceof ExprNodeColumnDesc) { ExprNodeColumnDesc c = (ExprNodeColumnDesc) desc; // Currently, we do not support vectorized virtual columns (see HIVE-5570). if (VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(c.getColumn())) { - setExpressionIssue(expressionTitle, "Virtual columns not supported (" + c.getColumn() + ")"); + LOG.info("Cannot vectorize virtual column " + c.getColumn()); return false; } } String typeName = desc.getTypeInfo().getTypeName(); boolean ret = validateDataType(typeName, mode); if (!ret) { - setExpressionIssue(expressionTitle, "Data type " + typeName + " of " + desc.toString() + " not supported"); + LOG.info("Cannot vectorize " + desc.toString() + " of type " + typeName); return false; } boolean isInExpression = false; @@ -2020,7 +1795,7 @@ public class Vectorizer implements PhysicalPlanResolver { ExprNodeGenericFuncDesc d = (ExprNodeGenericFuncDesc) desc; boolean r = validateGenericUdf(d); if (!r) { - setExpressionIssue(expressionTitle, "UDF " + d + " not supported"); + LOG.info("Cannot vectorize UDF " + d); return false; } GenericUDF genericUDF = d.getGenericUDF(); @@ -2031,14 +1806,14 @@ public class Vectorizer implements PhysicalPlanResolver { && desc.getChildren().get(0).getTypeInfo().getCategory() == Category.STRUCT) { // Don't restrict child expressions for projection. // Always use loose FILTER mode. - if (!validateStructInExpression(desc, expressionTitle, VectorExpressionDescriptor.Mode.FILTER)) { + if (!validateStructInExpression(desc, VectorExpressionDescriptor.Mode.FILTER)) { return false; } } else { for (ExprNodeDesc d : desc.getChildren()) { // Don't restrict child expressions for projection. // Always use loose FILTER mode. - if (!validateExprNodeDescRecursive(d, expressionTitle, VectorExpressionDescriptor.Mode.FILTER)) { + if (!validateExprNodeDescRecursive(d, VectorExpressionDescriptor.Mode.FILTER)) { return false; } } @@ -2048,7 +1823,7 @@ public class Vectorizer implements PhysicalPlanResolver { } private boolean validateStructInExpression(ExprNodeDesc desc, - String expressionTitle, VectorExpressionDescriptor.Mode mode) { + VectorExpressionDescriptor.Mode mode) { for (ExprNodeDesc d : desc.getChildren()) { TypeInfo typeInfo = d.getTypeInfo(); if (typeInfo.getCategory() != Category.STRUCT) { @@ -2064,8 +1839,7 @@ public class Vectorizer implements PhysicalPlanResolver { TypeInfo fieldTypeInfo = fieldTypeInfos.get(f); Category category = fieldTypeInfo.getCategory(); if (category != Category.PRIMITIVE) { - setExpressionIssue(expressionTitle, - "Cannot vectorize struct field " + fieldNames.get(f) + LOG.info("Cannot vectorize struct field " + fieldNames.get(f) + " of type " + fieldTypeInfo.getTypeName()); return false; } @@ -2078,8 +1852,7 @@ public class Vectorizer implements PhysicalPlanResolver { if (inConstantType != InConstantType.INT_FAMILY && inConstantType != InConstantType.FLOAT_FAMILY && inConstantType != InConstantType.STRING_FAMILY) { - setExpressionIssue(expressionTitle, - "Cannot vectorize struct field " + fieldNames.get(f) + LOG.info("Cannot vectorize struct field " + fieldNames.get(f) + " of type " + fieldTypeInfo.getTypeName()); return false; } @@ -2088,28 +1861,31 @@ public class Vectorizer implements PhysicalPlanResolver { return true; } - private boolean validateExprNodeDesc(ExprNodeDesc desc, String expressionTitle) { - return validateExprNodeDesc(desc, expressionTitle, VectorExpressionDescriptor.Mode.PROJECTION); + private boolean validateExprNodeDesc(ExprNodeDesc desc) { + return validateExprNodeDesc(desc, VectorExpressionDescriptor.Mode.PROJECTION); } - boolean validateExprNodeDesc(ExprNodeDesc desc, String expressionTitle, - VectorExpressionDescriptor.Mode mode) { - if (!validateExprNodeDescRecursive(desc, expressionTitle, mode)) { + boolean validateExprNodeDesc(ExprNodeDesc desc, VectorExpressionDescriptor.Mode mode) { + if (!validateExprNodeDescRecursive(desc, mode)) { return false; } try { VectorizationContext vc = new ValidatorVectorizationContext(hiveConf); if (vc.getVectorExpression(desc, mode) == null) { // TODO: this cannot happen - VectorizationContext throws in such cases. - setExpressionIssue(expressionTitle, "getVectorExpression returned null"); + LOG.info("getVectorExpression returned null"); return false; } } catch (Exception e) { if (e instanceof HiveException) { - setExpressionIssue(expressionTitle, e.getMessage()); + LOG.info(e.getMessage()); } else { - String issue = "exception: " + VectorizationContext.getStackTraceAsSingleLine(e); - setExpressionIssue(expressionTitle, issue); + if (LOG.isDebugEnabled()) { + // Show stack trace. + LOG.debug("Failed to vectorize", e); + } else { + LOG.info("Failed to vectorize", e.getMessage()); + } } return false; } @@ -2129,9 +1905,9 @@ public class Vectorizer implements PhysicalPlanResolver { } } - public static ObjectInspector.Category aggregationOutputCategory(VectorAggregateExpression vectorAggrExpr) { + private boolean validateAggregationIsPrimitive(VectorAggregateExpression vectorAggrExpr) { ObjectInspector outputObjInspector = vectorAggrExpr.getOutputObjectInspector(); - return outputObjInspector.getCategory(); + return (outputObjInspector.getCategory() == ObjectInspector.Category.PRIMITIVE); } private Pair<Boolean,Boolean> validateAggregationDesc(AggregationDesc aggDesc, ProcessingMode processingMode, @@ -2139,10 +1915,11 @@ public class Vectorizer implements PhysicalPlanResolver { String udfName = aggDesc.getGenericUDAFName().toLowerCase(); if (!supportedAggregationUdfs.contains(udfName)) { - setExpressionIssue("Aggregation Function", "UDF " + udfName + " not supported"); + LOG.info("Cannot vectorize groupby aggregate expression: UDF " + udfName + " not supported"); return new Pair<Boolean,Boolean>(false, false); } - if (aggDesc.getParameters() != null && !validateExprNodeDesc(aggDesc.getParameters(), "Aggregation Function UDF " + udfName + " parameter")) { + if (aggDesc.getParameters() != null && !validateExprNodeDesc(aggDesc.getParameters())) { + LOG.info("Cannot vectorize groupby aggregate expression: UDF parameters not supported"); return new Pair<Boolean,Boolean>(false, false); } @@ -2156,7 +1933,6 @@ public class Vectorizer implements PhysicalPlanResolver { if (LOG.isDebugEnabled()) { LOG.debug("Vectorization of aggreation should have succeeded ", e); } - setExpressionIssue("Aggregation Function", "Vectorization of aggreation should have succeeded " + e); return new Pair<Boolean,Boolean>(false, false); } if (LOG.isDebugEnabled()) { @@ -2164,12 +1940,11 @@ public class Vectorizer implements PhysicalPlanResolver { " vector expression " + vectorAggrExpr.toString()); } - ObjectInspector.Category outputCategory = aggregationOutputCategory(vectorAggrExpr); - boolean outputIsPrimitive = (outputCategory == ObjectInspector.Category.PRIMITIVE); + boolean outputIsPrimitive = validateAggregationIsPrimitive(vectorAggrExpr); if (processingMode == ProcessingMode.MERGE_PARTIAL && hasKeys && !outputIsPrimitive) { - setOperatorIssue("Vectorized Reduce MergePartial GROUP BY keys can only handle aggregate outputs that are primitive types"); + LOG.info("Vectorized Reduce MergePartial GROUP BY keys can only handle aggregate outputs that are primitive types"); return new Pair<Boolean,Boolean>(false, false); } @@ -2237,12 +2012,12 @@ public class Vectorizer implements PhysicalPlanResolver { if (smallTableIndices[i] < 0) { // Negative numbers indicate a column to be (deserialize) read from the small table's // LazyBinary value row. - setOperatorIssue("Vectorizer isBigTableOnlyResults smallTableIndices[i] < 0 returning false"); + LOG.info("Vectorizer isBigTableOnlyResults smallTableIndices[i] < 0 returning false"); return false; } } } else if (smallTableRetainSize > 0) { - setOperatorIssue("Vectorizer isBigTableOnlyResults smallTableRetainSize > 0 returning false"); + LOG.info("Vectorizer isBigTableOnlyResults smallTableRetainSize > 0 returning false"); return false; } @@ -2251,21 +2026,20 @@ public class Vectorizer implements PhysicalPlanResolver { } Operator<? extends OperatorDesc> specializeMapJoinOperator(Operator<? extends OperatorDesc> op, - VectorizationContext vContext, MapJoinDesc desc, VectorMapJoinInfo vectorMapJoinInfo) - throws HiveException { + VectorizationContext vContext, MapJoinDesc desc) throws HiveException { Operator<? extends OperatorDesc> vectorOp = null; Class<? extends Operator<?>> opClass = null; - VectorMapJoinDesc vectorDesc = (VectorMapJoinDesc) desc.getVectorDesc(); - - HashTableImplementationType hashTableImplementationType = HashTableImplementationType.NONE; - HashTableKind hashTableKind = HashTableKind.NONE; - HashTableKeyType hashTableKeyType = HashTableKeyType.NONE; - OperatorVariation operatorVariation = OperatorVariation.NONE; + VectorMapJoinDesc.HashTableImplementationType hashTableImplementationType = HashTableImplementationType.NONE; + VectorMapJoinDesc.HashTableKind hashTableKind = HashTableKind.NONE; + VectorMapJoinDesc.HashTableKeyType hashTableKeyType = HashTableKeyType.NONE; - if (vectorDesc.getIsFastHashTableEnabled()) { + if (HiveConf.getBoolVar(hiveConf, + HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_FAST_HASHTABLE_ENABLED)) { hashTableImplementationType = HashTableImplementationType.FAST; } else { + // Restrict to using BytesBytesMultiHashMap via MapJoinBytesTableContainer or + // HybridHashTableContainer. hashTableImplementationType = HashTableImplementationType.OPTIMIZED; } @@ -2287,31 +2061,20 @@ public class Vectorizer implements PhysicalPlanResolver { Map<Byte, List<ExprNodeDesc>> keyExprs = desc.getKeys(); List<ExprNodeDesc> bigTableKeyExprs = keyExprs.get(posBigTable); if (bigTableKeyExprs.size() == 1) { - TypeInfo typeInfo = bigTableKeyExprs.get(0).getTypeInfo(); - LOG.info("Vectorizer vectorizeOperator map join typeName " + typeInfo.getTypeName()); - switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) { - case BOOLEAN: + String typeName = bigTableKeyExprs.get(0).getTypeString(); + LOG.info("Vectorizer vectorizeOperator map join typeName " + typeName); + if (typeName.equals("boolean")) { hashTableKeyType = HashTableKeyType.BOOLEAN; - break; - case BYTE: + } else if (typeName.equals("tinyint")) { hashTableKeyType = HashTableKeyType.BYTE; - break; - case SHORT: + } else if (typeName.equals("smallint")) { hashTableKeyType = HashTableKeyType.SHORT; - break; - case INT: + } else if (typeName.equals("int")) { hashTableKeyType = HashTableKeyType.INT; - break; - case LONG: + } else if (typeName.equals("bigint") || typeName.equals("long")) { hashTableKeyType = HashTableKeyType.LONG; - break; - case STRING: - case CHAR: - case VARCHAR: - case BINARY: + } else if (VectorizationContext.isStringFamily(typeName)) { hashTableKeyType = HashTableKeyType.STRING; - default: - // Stay with multi-key. } } } @@ -2319,20 +2082,16 @@ public class Vectorizer implements PhysicalPlanResolver { switch (joinType) { case JoinDesc.INNER_JOIN: if (!isInnerBigOnly) { - operatorVariation = OperatorVariation.INNER; hashTableKind = HashTableKind.HASH_MAP; } else { - operatorVariation = OperatorVariation.INNER_BIG_ONLY; hashTableKind = HashTableKind.HASH_MULTISET; } break; case JoinDesc.LEFT_OUTER_JOIN: case JoinDesc.RIGHT_OUTER_JOIN: - operatorVariation = OperatorVariation.OUTER; hashTableKind = HashTableKind.HASH_MAP; break; case JoinDesc.LEFT_SEMI_JOIN: - operatorVariation = OperatorVariation.LEFT_SEMI; hashTableKind = HashTableKind.HASH_SET; break; default: @@ -2347,84 +2106,86 @@ public class Vectorizer implements PhysicalPlanResolver { case SHORT: case INT: case LONG: - switch (operatorVariation) { - case INNER: - opClass = VectorMapJoinInnerLongOperator.class; + switch (joinType) { + case JoinDesc.INNER_JOIN: + if (!isInnerBigOnly) { + opClass = VectorMapJoinInnerLongOperator.class; + } else { + opClass = VectorMapJoinInnerBigOnlyLongOperator.class; + } break; - case INNER_BIG_ONLY: - opClass = VectorMapJoinInnerBigOnlyLongOperator.class; + case JoinDesc.LEFT_OUTER_JOIN: + case JoinDesc.RIGHT_OUTER_JOIN: + opClass = VectorMapJoinOuterLongOperator.class; break; - case LEFT_SEMI: + case JoinDesc.LEFT_SEMI_JOIN: opClass = VectorMapJoinLeftSemiLongOperator.class; break; - case OUTER: - opClass = VectorMapJoinOuterLongOperator.class; - break; default: - throw new HiveException("Unknown operator variation " + operatorVariation); + throw new HiveException("Unknown join type " + joinType); } break; case STRING: - switch (operatorVariation) { - case INNER: - opClass = VectorMapJoinInnerStringOperator.class; + switch (joinType) { + case JoinDesc.INNER_JOIN: + if (!isInnerBigOnly) { + opClass = VectorMapJoinInnerStringOperator.class; + } else { + opClass = VectorMapJoinInnerBigOnlyStringOperator.class; + } break; - case INNER_BIG_ONLY: - opClass = VectorMapJoinInnerBigOnlyStringOperator.class; + case JoinDesc.LEFT_OUTER_JOIN: + case JoinDesc.RIGHT_OUTER_JOIN: + opClass = VectorMapJoinOuterStringOperator.class; break; - case LEFT_SEMI: + case JoinDesc.LEFT_SEMI_JOIN: opClass = VectorMapJoinLeftSemiStringOperator.class; break; - case OUTER: - opClass = VectorMapJoinOuterStringOperator.class; - break; default: - throw new HiveException("Unknown operator variation " + operatorVariation); + throw new HiveException("Unknown join type " + joinType); } break; case MULTI_KEY: - switch (operatorVariation) { - case INNER: - opClass = VectorMapJoinInnerMultiKeyOperator.class; + switch (joinType) { + case JoinDesc.INNER_JOIN: + if (!isInnerBigOnly) { + opClass = VectorMapJoinInnerMultiKeyOperator.class; + } else { + opClass = VectorMapJoinInnerBigOnlyMultiKeyOperator.class; + } break; - case INNER_BIG_ONLY: - opClass = VectorMapJoinInnerBigOnlyMultiKeyOperator.class; + case JoinDesc.LEFT_OUTER_JOIN: + case JoinDesc.RIGHT_OUTER_JOIN: + opClass = VectorMapJoinOuterMultiKeyOperator.class; break; - case LEFT_SEMI: + case JoinDesc.LEFT_SEMI_JOIN: opClass = VectorMapJoinLeftSemiMultiKeyOperator.class; break; - case OUTER: - opClass = VectorMapJoinOuterMultiKeyOperator.class; - break; default: - throw new HiveException("Unknown operator variation " + operatorVariation); + throw new HiveException("Unknown join type " + joinType); } break; - default: - throw new RuntimeException("Unexpected hash table key type " + hashTableKeyType.name()); } + vectorOp = OperatorFactory.getVectorOperator( + opClass, op.getCompilationOpContext(), op.getConf(), vContext); + LOG.info("Vectorizer vectorizeOperator map join class " + vectorOp.getClass().getSimpleName()); + boolean minMaxEnabled = HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_MINMAX_ENABLED); + VectorMapJoinDesc vectorDesc = desc.getVectorDesc(); vectorDesc.setHashTableImplementationType(hashTableImplementationType); vectorDesc.setHashTableKind(hashTableKind); vectorDesc.setHashTableKeyType(hashTableKeyType); - vectorDesc.setOperatorVariation(operatorVariation); vectorDesc.setMinMaxEnabled(minMaxEnabled); - vectorDesc.setVectorMapJoinInfo(vectorMapJoinInfo); - - vectorOp = OperatorFactory.getVectorOperator( - opClass, op.getCompilationOpContext(), op.getConf(), vContext); - LOG.info("Vectorizer vectorizeOperator map join class " + vectorOp.getClass().getSimpleName()); - return vectorOp; } - public static boolean onExpressionHasNullSafes(MapJoinDesc desc) { + private boolean onExpressionHasNullSafes(MapJoinDesc desc) { boolean[] nullSafes = desc.getNullSafes(); if (nullSafes == null) { - return false; + return false; } for (boolean nullSafe : nullSafes) { if (nullSafe) { @@ -2435,372 +2196,53 @@ public class Vectorizer implements PhysicalPlanResolver { } private boolean canSpecializeMapJoin(Operator<? extends OperatorDesc> op, MapJoinDesc desc, - boolean isTezOrSpark, VectorizationContext vContext, VectorMapJoinInfo vectorMapJoinInfo) - throws HiveException { - - Preconditions.checkState(op instanceof MapJoinOperator); - - // Allocate a VectorReduceSinkDesc initially with implementation type NONE so EXPLAIN - // can report this operator was vectorized, but not native. And, the conditions. - VectorMapJoinDesc vectorDesc = new VectorMapJoinDesc(); - desc.setVectorDesc(vectorDesc); - - boolean isVectorizationMapJoinNativeEnabled = HiveConf.getBoolVar(hiveConf, - HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_ENABLED); - - String engine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE); - - boolean oneMapJoinCondition = (desc.getConds().length == 1); - - boolean hasNullSafes = onExpressionHasNullSafes(desc); - - byte posBigTable = (byte) desc.getPosBigTable(); - - // Since we want to display all the met and not met conditions in EXPLAIN, we determine all - // information first.... - - List<ExprNodeDesc> keyDesc = desc.getKeys().get(posBigTable); - VectorExpression[] allBigTableKeyExpressions = vContext.getVectorExpressions(keyDesc); - final int allBigTableKeyExpressionsLength = allBigTableKeyExpressions.length; - boolean isEmptyKey = (allBigTableKeyExpressionsLength == 0); + boolean isTez) { - boolean supportsKeyTypes = true; // Assume. - HashSet<String> notSupportedKeyTypes = new HashSet<String>(); + boolean specialize = false; - // Since a key expression can be a calculation and the key will go into a scratch column, - // we need the mapping and type information. - int[] bigTableKeyColumnMap = new int[allBigTableKeyExpressionsLength]; - String[] bigTableKeyColumnNames = new String[allBigTableKeyExpressionsLength]; - TypeInfo[] bigTableKeyTypeInfos = new TypeInfo[allBigTableKeyExpressionsLength]; - ArrayList<VectorExpression> bigTableKeyExpressionsList = new ArrayList<VectorExpression>(); - VectorExpression[] bigTableKeyExpressions; - for (int i = 0; i < allBigTableKeyExpressionsLength; i++) { - VectorExpression ve = allBigTableKeyExpressions[i]; - if (!IdentityExpression.isColumnOnly(ve)) { - bigTableKeyExpressionsList.add(ve); - } - bigTableKeyColumnMap[i] = ve.getOutputColumn(); - - ExprNodeDesc exprNode = keyDesc.get(i); - bigTableKeyColumnNames[i] = exprNode.toString(); - - TypeInfo typeInfo = exprNode.getTypeInfo(); - // Verify we handle the key column types for an optimized table. This is the effectively the - // same check used in HashTableLoader. - if (!MapJoinKey.isSupportedField(typeInfo)) { - supportsKeyTypes = false; - Category category = typeInfo.getCategory(); - notSupportedKeyTypes.add( - (category != Category.PRIMITIVE ? category.toString() : - ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory().toString())); - } - bigTableKeyTypeInfos[i] = typeInfo; - } - if (bigTableKeyExpressionsList.size() == 0) { - bigTableKeyExpressions = null; - } else { - bigTableKeyExpressions = bigTableKeyExpressionsList.toArray(new VectorExpression[0]); - } - - List<ExprNodeDesc> bigTableExprs = desc.getExprs().get(posBigTable); - VectorExpression[] allBigTableValueExpressions = vContext.getVectorExpressions(bigTableExprs); - - boolean isFastHashTableEnabled = + if (op instanceof MapJoinOperator && HiveConf.getBoolVar(hiveConf, - HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_FAST_HASHTABLE_ENABLED); - vectorDesc.setIsFastHashTableEnabled(isFastHashTableEnabled); + HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_ENABLED)) { - // Especially since LLAP is prone to turn it off in the MapJoinDesc in later - // physical optimizer stages... - boolean isHybridHashJoin = desc.isHybridHashJoin(); - vectorDesc.setIsHybridHashJoin(isHybridHashJoin); + // Currently, only under Tez and non-N-way joins. + if (isTez && desc.getConds().length == 1 && !onExpressionHasNullSafes(desc)) { - /* - * Populate vectorMapJoininfo. - */ + // Ok, all basic restrictions satisfied so far... + specialize = true; - /* - * Similarly, we need a mapping since a value expression can be a calculation and the value - * will go into a scratch column. - */ - int[] bigTableValueColumnMap = new int[allBigTableValueExpressions.length]; - String[] bigTableValueColumnNames = new String[allBigTableValueExpressions.length]; - TypeInfo[] bigTableValueTypeInfos = new TypeInfo[allBigTableValueExpressions.length]; - ArrayList<VectorExpression> bigTableValueExpressionsList = new ArrayList<VectorExpression>(); - VectorExpression[] bigTableValueExpressions; - for (int i = 0; i < bigTableValueColumnMap.length; i++) { - VectorExpression ve = allBigTableValueExpressions[i]; - if (!IdentityExpression.isColumnOnly(ve)) { - bigTableValueExpressionsList.add(ve); - } - bigTableValueColumnMap[i] = ve.getOutputColumn(); + if (!HiveConf.getBoolVar(hiveConf, + HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_FAST_HASHTABLE_ENABLED)) { - ExprNodeDesc exprNode = bigTableExprs.get(i); - bigTableValueColumnNames[i] = exprNode.toString(); - bigTableValueTypeInfos[i] = exprNode.getTypeInfo(); - } - if (bigTableValueExpressionsList.size() == 0) { - bigTableValueExpressions = null; - } else { - bigTableValueExpressions = bigTableValueExpressionsList.toArray(new VectorExpression[0]); - } - - vectorMapJoinInfo.setBigTableKeyColumnMap(bigTableKeyColumnMap); - vectorMapJoinInfo.setBigTableKeyColumnNames(bigTableKeyColumnNames); - vectorMapJoinInfo.setBigTableKeyTypeInfos(bigTableKeyTypeInfos); - vectorMapJoinInfo.setBigTableKeyExpressions(bigTableKeyExpressions); - - vectorMapJoinInfo.setBigTableValueColumnMap(bigTableValueColumnMap); - vectorMapJoinInfo.setBigTableValueColumnNames(bigTableValueColumnNames); - vectorMapJoinInfo.setBigTableValueTypeInfos(bigTableValueTypeInfos); - vectorMapJoinInfo.setBigTableValueExpressions(bigTableValueExpressions); + // We are using the optimized hash table we have further + // restrictions (using optimized and key type). - /* - * Small table information. - */ - VectorColumnOutputMapping bigTableRetainedMapping = - new VectorColumnOutputMapping("Big Table Retained Mapping"); - - VectorColumnOutputMapping bigTableOuterKeyMapping = - new VectorColumnOutputMapping("Big Table Outer Key Mapping"); - - // The order of the fields in the LazyBinary small table value must be used, so - // we use the source ordering flavor for the mapping. - VectorColumnSourceMapping smallTableMapping = - new VectorColumnSourceMapping("Small Table Mapping"); - - Byte[] order = desc.getTagOrder(); - Byte posSingleVectorMapJoinSmallTable = (order[0] == posBigTable ? order[1] : order[0]); - boolean isOuterJoin = !desc.getNoOuterJoin(); - - /* - * Gather up big and small table output result information from the MapJoinDesc. - */ - List<Integer> bigTableRetainList = desc.getRetainList().get(posBigTable); - int bigTableRetainSize = bigTableRetainList.size(); - - int[] smallTableIndices; - int smallTableIndicesSize; - List<ExprNodeDesc> smallTableExprs = desc.getExprs().get(posSingleVectorMapJoinSmallTable); - if (desc.getValueIndices() != null && desc.getValueIndices().get(posSingleVectorMapJoinSmallTable) != null) { - smallTableIndices = desc.getValueIndices().get(posSingleVectorMapJoinSmallTable); - smallTableIndicesSize = smallTableIndices.length; - } else { - smallTableIndices = null; - smallTableIndicesSize = 0; - } - - List<Integer> smallTableRetainList = desc.getRetainList().get(posSingleVectorMapJoinSmallTable); - int smallTableRetainSize = smallTableRetainList.size(); - - int smallTableResultSize = 0; - if (smallTableIndicesSize > 0) { - smallTableResultSize = smallTableIndicesSize; - } else if (smallTableRetainSize > 0) { - smallTableResultSize = smallTableRetainSize; - } - - /* - * Determine the big table retained mapping first so we can optimize out (with - * projection) copying inner join big table keys in the subsequent small table results section. - */ - - // We use a mapping object here so we can build the projection in any order and - // get the ordered by 0 to n-1 output columns at the end. - // - // Also, to avoid copying a big table key into the small table result area for inner joins, - // we reference it with the projection so there can be duplicate output columns - // in the projection. - VectorColumnSourceMapping projectionMapping = new VectorColumnSourceMapping("Projection Mapping"); - - int nextOutputColumn = (order[0] == posBigTable ? 0 : smallTableResultSize); - for (int i = 0; i < bigTableRetainSize; i++) { - - // Since bigTableValueExpressions may do a calculation and produce a scratch column, we - // need to map to the right batch column. - - int retainColumn = bigTableRetainList.get(i); - int batchColumnIndex = bigTableValueColumnMap[retainColumn]; - TypeInfo typeInfo = bigTableValueTypeInfos[i]; - - // With this map we project the big table batch to make it look like an output batch. - projectionMapping.add(nextOutputColumn, batchColumnIndex, typeInfo); - - // Collect columns we copy from the big table batch to the overflow batch. - if (!bigTableRetainedMapping.containsOutputColumn(batchColumnIndex)) { - // Tolerate repeated use of a big table column. - bigTableRetainedMapping.add(batchColumnIndex, batchColumnIndex, typeInfo); - } - - nextOutputColumn++; - } - - /* - * Now determine the small table results. - */ - boolean smallTableExprVectorizes = true; - - int firstSmallTableOutputColumn; - firstSmallTableOutputColumn = (order[0] == posBigTable ? bigTableRetainSize : 0); - int smallTableOutputCount = 0; - nextOutputColumn = firstSmallTableOutputColumn; - - // Small table indices has more information (i.e. keys) than retain, so use it if it exists... - String[] bigTableRetainedNames; - if (smallTableIndicesSize > 0) { - smallTableOutputCount = smallTableIndicesSize; - bigTableRetainedNames = new String[smallTableOutputCount]; - - for (int i = 0; i < smallTableIndicesSize; i++) { - if (smallTableIndices[i] >= 0) { - - // Zero and above numbers indicate a big table key is needed for - // small table result "area". - - int keyIndex = smallTableIndices[i]; - - // Since bigTableKeyExpressions may do a calculation and produce a scratch column, we - // need to map the right column. - int batchKeyColumn = bigTableKeyColumnMap[keyIndex]; - bigTableRetainedNames[i] = bigTableKeyColumnNames[keyIndex]; - TypeInfo typeInfo = bigTableKeyTypeInfos[keyIndex]; - - if (!isOuterJoin) { - - // Optimize inner join keys of small table results. - - // Project the big table key into the small table result "area". - projectionMapping.add(nextOutputColumn, batchKeyColumn, typeInfo); - - if (!bigTableRetainedMapping.containsOutputColumn(batchKeyColumn)) { - // If necessary, copy the big table key into the overflow batch's small table - // result "area". - bigTableRetainedMapping.add(batchKeyColumn, batchKeyColumn, typeInfo); - } + if (!HiveConf.getBoolVar(hiveConf, + HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE)) { + specialize = false; } else { - - // For outer joins, since the small table key can be null when there is no match, - // we must have a physical (scratch) column for those keys. We cannot use the - // projection optimization used by inner joins above. - - int scratchColumn = vContext.allocateScratchColumn(typeInfo.getTypeName()); - projectionMapping.add(nextOutputColumn, scratchColumn, typeInfo); - - bigTableRetainedMapping.add(batchKeyColumn, scratchColumn, typeInfo); - - bigTableOuterKeyMapping.add(batchKeyColumn, scratchColumn, typeInfo); + byte posBigTable = (byte) desc.getPosBigTable(); + Map<Byte, List<ExprNodeDesc>> keyExprs = desc.getKeys(); + List<ExprNodeDesc> bigTableKeyExprs = keyExprs.get(posBigTable); + for (ExprNodeDesc exprNodeDesc : bigTableKeyExprs) { + String typeName = exprNodeDesc.getTypeString(); + if (!MapJoinKey.isSupportedField(typeName)) { + specialize = false; + break; + } + } } } else { - // Negative numbers indicate a column to be (deserialize) read from the small table's - // LazyBinary value row. - int smallTableValueIndex = -smallTableIndices[i] - 1; + // With the fast hash table implementation, we currently do not support + // Hybrid Grace Hash Join. - ExprNodeDesc smallTableExprNode = smallTableExprs.get(i); - if (!validateExprNodeDesc(smallTableExprNode, "Small Table")) { - clearNotVectorizedReason(); - smallTableExprVectorizes = false; + if (desc.isHybridHashJoin()) { + specialize = false; } - - bigTableRetainedNames[i] = smallTableExprNode.toString(); - - TypeInfo typeInfo = smallTableExprNode.getTypeInfo(); - - // Make a new big table scratch column for the small table value. - int scratchColumn = vContext.allocateScratchColumn(typeInfo.getTypeName()); - projectionMapping.add(nextOutputColumn, scratchColumn, typeInfo); - - smallTableMapping.add(smallTableValueIndex, scratchColumn, typeInfo); - } - nextOutputColumn++; - } - } else if (smallTableRetainSize > 0) { - smallTableOutputCount = smallTableRetainSize; - bigTableRetainedNames = new String[smallTableOutputCount]; - - // Only small table values appear in join output result. - - for (int i = 0; i < smallTableRetainSize; i++) { - int smallTableValueIndex = smallTableRetainList.get(i); - - ExprNodeDesc smallTableExprNode = smallTableExprs.get(i); - if (!validateExprNodeDesc(smallTableExprNode, "Small Table")) { - clearNotVectorizedReason(); - smallTableExprVectorizes = false; } - - bigTableRetainedNames[i] = smallTableExprNode.toString(); - - // Make a new big table scratch column for the small table value. - TypeInfo typeInfo = smallTableExprNode.getTypeInfo(); - int scratchColumn = vContext.allocateScratchColumn(typeInfo.getTypeName()); - - projectionMapping.add(nextOutputColumn, scratchColumn, typeInfo); - - smallTableMapping.add(smallTableValueIndex, scratchColumn, typeInfo); - nextOutputColumn++; } - } else { - bigTableRetainedNames = new String[0]; - } - - // Remember the condition variables for EXPLAIN regardless. - vectorDesc.setIsVectorizationMapJoinNativeEnabled(isVectorizationMapJoinNativeEnabled); - vectorDesc.setEngine(engine); - vectorDesc.setOneMapJoinCondition(oneMapJoinCondition); - vectorDesc.setHasNullSafes(hasNullSafes); - vectorDesc.setSupportsKeyTypes(supportsKeyTypes); - if (!supportsKeyTypes) { - vectorDesc.setNotSupportedKeyTypes(new ArrayList(notSupportedKeyTypes)); - } - vectorDesc.setIsEmptyKey(isEmptyKey); - vectorDesc.setSmallTableExprVectorizes(smallTableExprVectorizes); - - // Currently, only under Tez and non-N-way joins. -
<TRUNCATED>