This is an automated email from the ASF dual-hosted git repository. mahesh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 875ba82 HIVE-21700 : Hive incremental load going OOM while adding load task to the leaf nodes of the DAG. (Mahesh Kumar Behera, reviewed by Sankar Hariappan) 875ba82 is described below commit 875ba8231b824a500bd0844613285aadd64e651e Author: Mahesh Kumar Behera <mah...@apache.org> AuthorDate: Wed May 8 11:26:29 2019 +0530 HIVE-21700 : Hive incremental load going OOM while adding load task to the leaf nodes of the DAG. (Mahesh Kumar Behera, reviewed by Sankar Hariappan) --- .../hadoop/hive/ql/exec/util/DAGTraversal.java | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/util/DAGTraversal.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/util/DAGTraversal.java index cb5dc2e..40f5f55 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/util/DAGTraversal.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/util/DAGTraversal.java @@ -23,7 +23,9 @@ import org.apache.hadoop.hive.ql.exec.Task; import java.io.Serializable; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; /** * The dag traversal done here is written to be not recursion based as large DAG's will lead to @@ -33,7 +35,21 @@ public class DAGTraversal { public static void traverse(List<Task<? extends Serializable>> tasks, Function function) { List<Task<? extends Serializable>> listOfTasks = new ArrayList<>(tasks); while (!listOfTasks.isEmpty()) { - List<Task<? extends Serializable>> children = new ArrayList<>(); + // HashSet will make sure that no duplicate children are added. If a task is added multiple + // times to the children list then it may cause the list to grow exponentially. Let's take an example of + // incremental load with 2 events. The DAG will look something similar to the one below.
+ // + // --->ev1.task1-- --->ev2.task1-- + // / \ / \ + // evTaskRoot-->*---->ev1.task2---*--> ev1.barrierTask-->*---->ev2.task2---*->ev2.barrierTask------- + // \ / + // --->ev1.task3-- + // + // While traversing the DAG, if the filter is not added then ev1.barrierTask will be added 3 times in + // the children list and in the next iteration ev2.task1 will be added 3 times and ev2.task2 will be added + // 3 times. So in the next iteration ev2.barrierTask will be added 6 times. As it goes like this, the next barrier + // task will be added 12-15 times and may reach millions with a large number of events. + Set<Task<? extends Serializable>> children = new HashSet<>(); for (Task<? extends Serializable> task : listOfTasks) { // skip processing has to be done first before continuing if (function.skipProcessing(task)) { @@ -48,7 +64,7 @@ public class DAGTraversal { } function.process(task); } - listOfTasks = children; + listOfTasks = new ArrayList<>(children); }