Repository: hive
Updated Branches:
  refs/heads/master fe59851c8 -> 160651e30


HIVE-17348: Remove unnecessary GenSparkUtils.java.orig file (Peter Vary, reviewed by Sahil Takiar)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/160651e3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/160651e3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/160651e3

Branch: refs/heads/master
Commit: 160651e3067edc5d66fd6ad43be56082111e1393
Parents: fe59851
Author: Peter Vary <pv...@cloudera.com>
Authored: Tue Aug 22 16:17:04 2017 +0200
Committer: Peter Vary <pv...@cloudera.com>
Committed: Tue Aug 22 16:17:04 2017 +0200

----------------------------------------------------------------------
 .../hive/ql/parse/spark/GenSparkUtils.java.orig | 682 -------------------
 1 file changed, 682 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/160651e3/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java.orig
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java.orig b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java.orig
deleted file mode 100644
index 4f826a1..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java.orig
+++ /dev/null
@@ -1,682 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.parse.spark;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
-import org.apache.hadoop.hive.ql.exec.FetchTask;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.ForwardOperator;
-import org.apache.hadoop.hive.ql.exec.GroupByOperator;
-import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
-import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.OperatorUtils;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
-import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
-import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
-import org.apache.hadoop.hive.ql.optimizer.spark.SparkSortMergeJoinFactory;
-import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.MoveWork;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.ReduceWork;
-import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
-import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-
-/**
- * GenSparkUtils is a collection of shared helper methods to produce SparkWork.
- * Cloned from GenTezUtils.
- */
-public class GenSparkUtils {
-  private static final Logger LOG = LoggerFactory.getLogger(GenSparkUtils.class.getName());
-
-  // sequence number is used to name vertices (e.g.: Map 1, Reduce 14, ...)
-  private int sequenceNumber = 0;
-
-  // singleton
-  private static GenSparkUtils utils;
-
-  public static GenSparkUtils getUtils() {
-    if (utils == null) {
-      utils = new GenSparkUtils();
-    }
-    return utils;
-  }
-
-  protected GenSparkUtils() {
-  }
-
-  public void resetSequenceNumber() {
-    sequenceNumber = 0;
-  }
-
-  public ReduceWork createReduceWork(GenSparkProcContext context, Operator<?> root,
-    SparkWork sparkWork) throws SemanticException {
-    Preconditions.checkArgument(!root.getParentOperators().isEmpty(),
-        "AssertionError: expected root.getParentOperators() to be non-empty");
-
-    ReduceWork reduceWork = new ReduceWork("Reducer " + (++sequenceNumber));
-    LOG.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root);
-    reduceWork.setReducer(root);
-    reduceWork.setNeedsTagging(GenMapRedUtils.needsTagging(reduceWork));
-
-    // Pick the maximum # reducers across all parents as the # of reduce tasks.
-    int maxExecutors = -1;
-    for (Operator<? extends OperatorDesc> parentOfRoot : root.getParentOperators()) {
-      Preconditions.checkArgument(parentOfRoot instanceof ReduceSinkOperator,
-          "AssertionError: expected parentOfRoot to be an "
-              + "instance of ReduceSinkOperator, but was "
-              + parentOfRoot.getClass().getName());
-      ReduceSinkOperator reduceSink = (ReduceSinkOperator) parentOfRoot;
-      maxExecutors = Math.max(maxExecutors, reduceSink.getConf().getNumReducers());
-    }
-    reduceWork.setNumReduceTasks(maxExecutors);
-
-    ReduceSinkOperator reduceSink = (ReduceSinkOperator) context.parentOfRoot;
-    setupReduceSink(context, reduceWork, reduceSink);
-    sparkWork.add(reduceWork);
-    SparkEdgeProperty edgeProp = getEdgeProperty(context.conf, reduceSink, reduceWork);
-
-    sparkWork.connect(context.preceedingWork, reduceWork, edgeProp);
-
-    return reduceWork;
-  }
-
-  protected void setupReduceSink(GenSparkProcContext context, ReduceWork reduceWork,
-      ReduceSinkOperator reduceSink) {
-
-    LOG.debug("Setting up reduce sink: " + reduceSink
-        + " with following reduce work: " + reduceWork.getName());
-
-    // need to fill in information about the key and value in the reducer
-    GenMapRedUtils.setKeyAndValueDesc(reduceWork, reduceSink);
-
-    // remember which parent belongs to which tag
-    reduceWork.getTagToInput().put(reduceSink.getConf().getTag(),
-        context.preceedingWork.getName());
-
-    // remember the output name of the reduce sink
-    reduceSink.getConf().setOutputName(reduceWork.getName());
-  }
-
-  public MapWork createMapWork(GenSparkProcContext context, Operator<?> root,
-    SparkWork sparkWork, PrunedPartitionList partitions) throws SemanticException {
-    return createMapWork(context, root, sparkWork, partitions, false);
-  }
-
-  public MapWork createMapWork(GenSparkProcContext context, Operator<?> root,
-      SparkWork sparkWork, PrunedPartitionList partitions, boolean deferSetup) throws SemanticException {
-    Preconditions.checkArgument(root.getParentOperators().isEmpty(),
-        "AssertionError: expected root.getParentOperators() to be empty");
-    MapWork mapWork = new MapWork("Map " + (++sequenceNumber));
-    LOG.debug("Adding map work (" + mapWork.getName() + ") for " + root);
-
-    // map work starts with table scan operators
-    Preconditions.checkArgument(root instanceof TableScanOperator,
-      "AssertionError: expected root to be an instance of TableScanOperator, 
but was "
-      + root.getClass().getName());
-    String alias_id = null;
-    if (context.parseContext != null && context.parseContext.getTopOps() != null) {
-      for (String currentAliasID : context.parseContext.getTopOps().keySet()) {
-        Operator<? extends OperatorDesc> currOp = context.parseContext.getTopOps().get(currentAliasID);
-        if (currOp == root) {
-          alias_id = currentAliasID;
-          break;
-        }
-      }
-    }
-    if (alias_id == null)
-      alias_id = ((TableScanOperator) root).getConf().getAlias();
-    if (!deferSetup) {
-      setupMapWork(mapWork, context, partitions,(TableScanOperator) root, alias_id);
-    }
-
-    // add new item to the Spark work
-    sparkWork.add(mapWork);
-
-    return mapWork;
-  }
-
-  // this method's main use is to help unit testing this class
-  protected void setupMapWork(MapWork mapWork, GenSparkProcContext context,
-      PrunedPartitionList partitions, TableScanOperator root,
-      String alias_id) throws SemanticException {
-    // All the setup is done in GenMapRedUtils
-    GenMapRedUtils.setMapWork(mapWork, context.parseContext,
-        context.inputs, partitions, root, alias_id, context.conf, false);
-  }
-
-  private void collectOperators(Operator<?> op, List<Operator<?>> opList) {
-    opList.add(op);
-    for (Object child : op.getChildOperators()) {
-      if (child != null) {
-        collectOperators((Operator<?>) child, opList);
-      }
-    }
-  }
-
-  // removes any union operator and clones the plan
-  public void removeUnionOperators(GenSparkProcContext context, BaseWork work)
-    throws SemanticException {
-
-    List<Operator<?>> roots = new ArrayList<Operator<?>>();
-
-    // For MapWork, getAllRootOperators is not suitable, since it checks
-    // getPathToAliases, and will return null if this is empty. Here we are
-    // replacing getAliasToWork, so should use that information instead.
-    if (work instanceof MapWork) {
-      roots.addAll(((MapWork) work).getAliasToWork().values());
-    } else {
-      roots.addAll(work.getAllRootOperators());
-    }
-    if (work.getDummyOps() != null) {
-      roots.addAll(work.getDummyOps());
-    }
-
-    // need to clone the plan.
-    List<Operator<?>> newRoots = SerializationUtilities.cloneOperatorTree(roots);
-
-    // Build a map from the original FileSinkOperator to the cloned FileSinkOperators.
-    // This map is used to set the stats flag for the cloned FileSinkOperators later in the process.
-    Iterator<Operator<?>> newRootsIt = newRoots.iterator();
-    for (Operator<?> root : roots) {
-      Operator<?> newRoot = newRootsIt.next();
-      List<Operator<?>> newOpQueue = new LinkedList<Operator<?>>();
-      collectOperators(newRoot, newOpQueue);
-      List<Operator<?>> opQueue = new LinkedList<Operator<?>>();
-      collectOperators(root, opQueue);
-      Iterator<Operator<?>> newOpQueueIt = newOpQueue.iterator();
-      for (Operator<?> op : opQueue) {
-        Operator<?> newOp = newOpQueueIt.next();
-
-        // We need to update rootToWorkMap in case the op is a key, since even
-        // though we clone the op tree, we're still using the same MapWork/ReduceWork.
-        if (context.rootToWorkMap.containsKey(op)) {
-          context.rootToWorkMap.put(newOp, context.rootToWorkMap.get(op));
-        }
-        // Don't remove the old entry - in SparkPartitionPruningSink it still
-        // refers to the old TS, and we need to lookup it later in
-        // processPartitionPruningSink.
-
-        if (op instanceof FileSinkOperator) {
-          List<FileSinkOperator> fileSinkList = context.fileSinkMap.get(op);
-          if (fileSinkList == null) {
-            fileSinkList = new LinkedList<FileSinkOperator>();
-          }
-          fileSinkList.add((FileSinkOperator) newOp);
-          context.fileSinkMap.put((FileSinkOperator) op, fileSinkList);
-        } else if (op instanceof SparkPartitionPruningSinkOperator) {
-          SparkPartitionPruningSinkOperator oldPruningSink = (SparkPartitionPruningSinkOperator) op;
-          SparkPartitionPruningSinkOperator newPruningSink = (SparkPartitionPruningSinkOperator) newOp;
-          newPruningSink.getConf().setTableScan(oldPruningSink.getConf().getTableScan());
-          context.pruningSinkSet.add(newPruningSink);
-          context.pruningSinkSet.remove(oldPruningSink);
-        }
-      }
-    }
-
-    // we're cloning the operator plan but we're retaining the original work. That means
-    // that root operators have to be replaced with the cloned ops. The replacement map
-    // tells you what that mapping is.
-    Map<Operator<?>, Operator<?>> replacementMap = new HashMap<Operator<?>, Operator<?>>();
-
-    // there's some special handling for dummyOps required. Mapjoins won't be properly
-    // initialized if their dummy parents aren't initialized. Since we cloned the plan
-    // we need to replace the dummy operators in the work with the cloned ones.
-    List<HashTableDummyOperator> dummyOps = new LinkedList<HashTableDummyOperator>();
-
-    Iterator<Operator<?>> it = newRoots.iterator();
-    for (Operator<?> orig: roots) {
-      Set<FileSinkOperator> fsOpSet = OperatorUtils.findOperators(orig, FileSinkOperator.class);
-      for (FileSinkOperator fsOp : fsOpSet) {
-        context.fileSinkSet.remove(fsOp);
-      }
-
-      Operator<?> newRoot = it.next();
-      if (newRoot instanceof HashTableDummyOperator) {
-        dummyOps.add((HashTableDummyOperator) newRoot);
-        it.remove();
-      } else {
-        replacementMap.put(orig, newRoot);
-      }
-    }
-
-    // now we remove all the unions. we throw away any branch that's not reachable from
-    // the current set of roots. The reason is that those branches will be handled in
-    // different tasks.
-    Deque<Operator<?>> operators = new LinkedList<Operator<?>>();
-    operators.addAll(newRoots);
-
-    Set<Operator<?>> seen = new HashSet<Operator<?>>();
-
-    while (!operators.isEmpty()) {
-      Operator<?> current = operators.pop();
-      seen.add(current);
-
-      if (current instanceof FileSinkOperator) {
-        FileSinkOperator fileSink = (FileSinkOperator)current;
-
-        // remember it for additional processing later
-        context.fileSinkSet.add(fileSink);
-
-        FileSinkDesc desc = fileSink.getConf();
-        Path path = desc.getDirName();
-        List<FileSinkDesc> linked;
-
-        if (!context.linkedFileSinks.containsKey(path)) {
-          linked = new ArrayList<FileSinkDesc>();
-          context.linkedFileSinks.put(path, linked);
-        }
-        linked = context.linkedFileSinks.get(path);
-        linked.add(desc);
-
-        desc.setLinkedFileSinkDesc(linked);
-      }
-
-      if (current instanceof UnionOperator) {
-        Operator<?> parent = null;
-        int count = 0;
-
-        for (Operator<?> op: current.getParentOperators()) {
-          if (seen.contains(op)) {
-            ++count;
-            parent = op;
-          }
-        }
-
-        // we should have been able to reach the union from only one side.
-        Preconditions.checkArgument(count <= 1,
-            "AssertionError: expected count to be <= 1, but was " + count);
-
-        if (parent == null) {
-          // root operator is union (can happen in reducers)
-          replacementMap.put(current, current.getChildOperators().get(0));
-        } else {
-          parent.removeChildAndAdoptItsChildren(current);
-        }
-      }
-
-      if (current instanceof FileSinkOperator
-          || current instanceof ReduceSinkOperator) {
-        current.setChildOperators(null);
-      } else {
-        operators.addAll(current.getChildOperators());
-      }
-    }
-    work.setDummyOps(dummyOps);
-    work.replaceRoots(replacementMap);
-  }
-
-  public void processFileSink(GenSparkProcContext context, FileSinkOperator fileSink)
-      throws SemanticException {
-
-    ParseContext parseContext = context.parseContext;
-
-    boolean isInsertTable = // is INSERT OVERWRITE TABLE
-        GenMapRedUtils.isInsertInto(parseContext, fileSink);
-    HiveConf hconf = parseContext.getConf();
-
-    boolean  chDir = GenMapRedUtils.isMergeRequired(context.moveTask,
-         hconf, fileSink, context.currentTask, isInsertTable);
-    // Set stats config for FileSinkOperators which are cloned from the fileSink
-    List<FileSinkOperator> fileSinkList = context.fileSinkMap.get(fileSink);
-    if (fileSinkList != null) {
-      for (FileSinkOperator fsOp : fileSinkList) {
-        fsOp.getConf().setGatherStats(fileSink.getConf().isGatherStats());
-        fsOp.getConf().setStatsReliable(fileSink.getConf().isStatsReliable());
-      }
-    }
-
-    Path finalName = createMoveTask(context.currentTask,
-        chDir, fileSink, parseContext, context.moveTask, hconf, context.dependencyTask);
-
-    if (chDir) {
-      // Merge the files in the destination table/partitions by creating Map-only merge job
-      // If underlying data is RCFile a RCFileBlockMerge task would be created.
-      LOG.info("using CombineHiveInputformat for the merge job");
-      GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
-          context.dependencyTask, context.moveTask,
-          hconf, context.currentTask);
-    }
-
-    FetchTask fetchTask = parseContext.getFetchTask();
-    if (fetchTask != null && context.currentTask.getNumChild() == 0) {
-      if (fetchTask.isFetchFrom(fileSink.getConf())) {
-        context.currentTask.setFetchSource(true);
-      }
-    }
-  }
-
-  /**
-   * Create and add any dependent move tasks.
-   *
-   * This is forked from {@link GenMapRedUtils}. The difference is that it doesn't check
-   * 'isLinkedFileSink' and does not set parent dir for the linked file sinks.
-   */
-  public static Path createMoveTask(Task<? extends Serializable> currTask, boolean chDir,
-      FileSinkOperator fsOp, ParseContext parseCtx, List<Task<MoveWork>> mvTasks,
-      HiveConf hconf, DependencyCollectionTask dependencyTask) {
-
-    Path dest = null;
-
-    if (chDir) {
-      dest = fsOp.getConf().getFinalDirName();
-
-      // generate the temporary file
-      // it must be on the same file system as the current destination
-      Context baseCtx = parseCtx.getContext();
-
-      Path tmpDir = baseCtx.getExternalTmpPath(dest);
-
-      FileSinkDesc fileSinkDesc = fsOp.getConf();
-      // Change all the linked file sink descriptors
-      if (fileSinkDesc.getLinkedFileSinkDesc() != null) {
-        for (FileSinkDesc fsConf : fileSinkDesc.getLinkedFileSinkDesc()) {
-          fsConf.setDirName(tmpDir);
-        }
-      } else {
-        fileSinkDesc.setDirName(tmpDir);
-      }
-    }
-
-    Task<MoveWork> mvTask = null;
-
-    if (!chDir) {
-      mvTask = GenMapRedUtils.findMoveTask(mvTasks, fsOp);
-    }
-
-    // Set the move task to be dependent on the current task
-    if (mvTask != null) {
-      GenMapRedUtils.addDependentMoveTasks(mvTask, hconf, currTask, dependencyTask);
-    }
-
-    return dest;
-  }
-
-  /**
-   * Populate partition pruning information from the pruning sink operator to the
-   * target MapWork (the MapWork for the big table side). The information includes the source table
-   * name, column name, and partition key expression. It also sets up the temporary path used to
-   * communicate between the target MapWork and the source BaseWork.
-   *
-   * Here "source" refers to the small table side, while "target" refers to the big
-   * table side.
-   *
-   * @param context the spark context.
-   * @param pruningSink the pruner sink operator being processed.
-   */
-  public void processPartitionPruningSink(GenSparkProcContext context,
-      SparkPartitionPruningSinkOperator pruningSink) {
-    SparkPartitionPruningSinkDesc desc = pruningSink.getConf();
-    TableScanOperator ts = desc.getTableScan();
-    MapWork targetWork = (MapWork) context.rootToWorkMap.get(ts);
-
-    Preconditions.checkArgument(
-        targetWork != null,
-        "No targetWork found for tablescan " + ts);
-
-    String targetId = SparkUtilities.getWorkId(targetWork);
-
-    BaseWork sourceWork = getEnclosingWork(pruningSink, context);
-    String sourceId = SparkUtilities.getWorkId(sourceWork);
-
-    // set up temporary path to communicate between the small/big table
-    Path tmpPath = targetWork.getTmpPathForPartitionPruning();
-    if (tmpPath == null) {
-      Path baseTmpPath = context.parseContext.getContext().getMRTmpPath();
-      tmpPath = SparkUtilities.generateTmpPathForPartitionPruning(baseTmpPath, targetId);
-      targetWork.setTmpPathForPartitionPruning(tmpPath);
-      LOG.info("Setting tmp path between source work and target work:\n" + 
tmpPath);
-    }
-
-    desc.setPath(new Path(tmpPath, sourceId));
-    desc.setTargetWork(targetWork.getName());
-
-    // store table descriptor in map-targetWork
-    if (!targetWork.getEventSourceTableDescMap().containsKey(sourceId)) {
-      targetWork.getEventSourceTableDescMap().put(sourceId, new LinkedList<TableDesc>());
-    }
-    List<TableDesc> tables = targetWork.getEventSourceTableDescMap().get(sourceId);
-    tables.add(pruningSink.getConf().getTable());
-
-    // store column name in map-targetWork
-    if (!targetWork.getEventSourceColumnNameMap().containsKey(sourceId)) {
-      targetWork.getEventSourceColumnNameMap().put(sourceId, new LinkedList<String>());
-    }
-    List<String> columns = targetWork.getEventSourceColumnNameMap().get(sourceId);
-    columns.add(desc.getTargetColumnName());
-
-    // store partition key expr in map-targetWork
-    if (!targetWork.getEventSourcePartKeyExprMap().containsKey(sourceId)) {
-      targetWork.getEventSourcePartKeyExprMap().put(sourceId, new LinkedList<ExprNodeDesc>());
-    }
-    List<ExprNodeDesc> keys = targetWork.getEventSourcePartKeyExprMap().get(sourceId);
-    keys.add(desc.getPartKey());
-  }
-
-  public static SparkEdgeProperty getEdgeProperty(HiveConf conf, ReduceSinkOperator reduceSink,
-      ReduceWork reduceWork) throws SemanticException {
-    boolean useSparkGroupBy = conf.getBoolVar(HiveConf.ConfVars.SPARK_USE_GROUPBY_SHUFFLE);
-    SparkEdgeProperty edgeProperty = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE);
-    edgeProperty.setNumPartitions(reduceWork.getNumReduceTasks());
-    String sortOrder = Strings.nullToEmpty(reduceSink.getConf().getOrder()).trim();
-
-    if (hasGBYOperator(reduceSink)) {
-      edgeProperty.setShuffleGroup();
-      // test if the group by needs partition level sort, if so, use the MR style shuffle
-      // SHUFFLE_SORT shouldn't be used for this purpose, see HIVE-8542
-      if (!useSparkGroupBy || (!sortOrder.isEmpty() && groupByNeedParLevelOrder(reduceSink))) {
-        if (!useSparkGroupBy) {
-          LOG.info("hive.spark.use.groupby.shuffle is off. Use repartition shuffle instead.");
-        }
-        edgeProperty.setMRShuffle();
-      }
-    }
-
-    if (reduceWork.getReducer() instanceof JoinOperator) {
-      //reduce-side join, use MR-style shuffle
-      edgeProperty.setMRShuffle();
-    }
-
-    //If it's a FileSink to bucketed files, also use MR-style shuffle to
-    // get compatible taskId for bucket-name
-    FileSinkOperator fso = getChildOperator(reduceWork.getReducer(), FileSinkOperator.class);
-    if (fso != null) {
-      String bucketCount = fso.getConf().getTableInfo().getProperties().getProperty(
-          hive_metastoreConstants.BUCKET_COUNT);
-      if (bucketCount != null && Integer.parseInt(bucketCount) > 1) {
-        edgeProperty.setMRShuffle();
-      }
-    }
-
-    // test if we need partition/global order, SHUFFLE_SORT should only be used for global order
-    if (edgeProperty.isShuffleNone() && !sortOrder.isEmpty()) {
-      if ((reduceSink.getConf().getPartitionCols() == null
-          || reduceSink.getConf().getPartitionCols().isEmpty()
-          || isSame(reduceSink.getConf().getPartitionCols(), reduceSink.getConf().getKeyCols()))
-          && reduceSink.getConf().hasOrderBy()) {
-        edgeProperty.setShuffleSort();
-      } else {
-        edgeProperty.setMRShuffle();
-      }
-    }
-
-    // simple distribute-by goes here
-    if (edgeProperty.isShuffleNone()) {
-      if (!useSparkGroupBy) {
-        LOG.info("hive.spark.use.groupby.shuffle is off. Use repartition 
shuffle instead.");
-        edgeProperty.setMRShuffle();
-      } else {
-        edgeProperty.setShuffleGroup();
-      }
-    }
-
-
-    return edgeProperty;
-  }
-
-  /**
-   * Test if we need partition level order for group by query.
-   * GBY needs partition level order when distinct is present. Therefore, if the sorting
-   * keys, partitioning keys and grouping keys are the same, we ignore the sort and use
-   * GroupByShuffler to shuffle the data. In this case a group-by transformation should be
-   * sufficient to produce the correct results, i.e. data is properly grouped by the keys
-   * but keys are not guaranteed to be sorted.
-   */
-  private static boolean groupByNeedParLevelOrder(ReduceSinkOperator reduceSinkOperator) {
-    // whether we have to enforce sort anyway, e.g. in case of RS deduplication
-    if (reduceSinkOperator.getConf().isDeduplicated()) {
-      return true;
-    }
-    List<Operator<? extends OperatorDesc>> children = reduceSinkOperator.getChildOperators();
-    if (children != null && children.size() == 1
-      && children.get(0) instanceof GroupByOperator) {
-      GroupByOperator child = (GroupByOperator) children.get(0);
-      if (isSame(reduceSinkOperator.getConf().getKeyCols(),
-          reduceSinkOperator.getConf().getPartitionCols())
-          && reduceSinkOperator.getConf().getKeyCols().size() == child.getConf().getKeys().size()) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Test if two lists of ExprNodeDesc are semantically same.
-   */
-  private static boolean isSame(List<ExprNodeDesc> list1, List<ExprNodeDesc> list2) {
-    if (list1 != list2) {
-      if (list1 != null && list2 != null) {
-        if (list1.size() != list2.size()) {
-          return false;
-        }
-        for (int i = 0; i < list1.size(); i++) {
-          if (!list1.get(i).isSame(list2.get(i))) {
-            return false;
-          }
-        }
-      } else {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  @SuppressWarnings("unchecked")
-  public static <T> T getChildOperator(Operator<?> op, Class<T> klazz) throws SemanticException {
-    if (klazz.isInstance(op)) {
-      return (T) op;
-    }
-    List<Operator<?>> childOperators = op.getChildOperators();
-    for (Operator<?> childOp : childOperators) {
-      T result = getChildOperator(childOp, klazz);
-      if (result != null) {
-        return result;
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Fill MapWork with 'local' work and bucket information for SMB Join.
-   * @param context context, containing references to MapWorks and their SMB information.
-   * @throws SemanticException
-   */
-  public void annotateMapWork(GenSparkProcContext context) throws SemanticException {
-    for (SMBMapJoinOperator smbMapJoinOp : context.smbMapJoinCtxMap.keySet()) {
-      //initialize mapwork with smbMapJoin information.
-      SparkSMBMapJoinInfo smbMapJoinInfo = context.smbMapJoinCtxMap.get(smbMapJoinOp);
-      MapWork work = smbMapJoinInfo.mapWork;
-      SparkSortMergeJoinFactory.annotateMapWork(context, work, smbMapJoinOp,
-        (TableScanOperator) smbMapJoinInfo.bigTableRootOp, false);
-      for (Operator<?> smallTableRootOp : smbMapJoinInfo.smallTableRootOps) {
-        SparkSortMergeJoinFactory.annotateMapWork(context, work, smbMapJoinOp,
-          (TableScanOperator) smallTableRootOp, true);
-      }
-    }
-  }
-
-  public synchronized int getNextSeqNumber() {
-    return ++sequenceNumber;
-  }
-
-  // test if we need group-by shuffle
-  private static boolean hasGBYOperator(ReduceSinkOperator rs) {
-    if (rs.getChildOperators().size() == 1) {
-      if (rs.getChildOperators().get(0) instanceof GroupByOperator) {
-        return true;
-      } else if (rs.getChildOperators().get(0) instanceof ForwardOperator) {
-        for (Operator grandChild : rs.getChildOperators().get(0).getChildOperators()) {
-          if (!(grandChild instanceof GroupByOperator)) {
-            return false;
-          }
-        }
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
-   * getEnclosingWork finds the BaseWork any given operator belongs to.
-   */
-  public BaseWork getEnclosingWork(Operator<?> op, GenSparkProcContext procCtx) {
-    List<Operator<?>> ops = new ArrayList<Operator<?>>();
-    OperatorUtils.findRoots(op, ops);
-    for (Operator<?> r : ops) {
-      BaseWork work = procCtx.rootToWorkMap.get(r);
-      if (work != null) {
-        return work;
-      }
-    }
-    return null;
-  }
-}
