DRILL-5304: Queries fail intermittently when there is skew in data distribution

close #766


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/69de3a1e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/69de3a1e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/69de3a1e

Branch: refs/heads/master
Commit: 69de3a1e409bb1fb9a25e679ce1750d9f9daf238
Parents: 974c613
Author: Padma Penumarthy <ppenuma...@yahoo.com>
Authored: Mon Feb 27 18:32:24 2017 -0800
Committer: Jinfeng Ni <j...@apache.org>
Committed: Wed Mar 1 23:15:34 2017 -0800

----------------------------------------------------------------------
 .../SoftAffinityFragmentParallelizer.java       |  2 +-
 .../exec/store/schedule/AssignmentCreator.java  | 28 +++++++++++++-------
 2 files changed, 19 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/69de3a1e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java
index 1ebed86..644263e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java
@@ -117,7 +117,7 @@ public class SoftAffinityFragmentParallelizer implements FragmentParallelizer {
 
       // Find the maximum number of slots which should go to endpoints with 
affinity (See DRILL-825 for details)
       int affinedSlots =
-          Math.max(1, (int) (parameters.getAffinityFactor() * width / 
activeEndpoints.size())) * sortedAffinityList.size();
+          Math.max(1, (int) (Math.ceil((double)parameters.getAffinityFactor() 
* width / activeEndpoints.size()) * sortedAffinityList.size()));
 
       // Make sure affined slots is at least the number of mandatory nodes
       affinedSlots = Math.max(affinedSlots, numRequiredNodes);

http://git-wip-us.apache.org/repos/asf/drill/blob/69de3a1e/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
index aeaf4bf..198d1ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
@@ -106,13 +106,16 @@ public class AssignmentCreator<T extends CompleteWork> {
     LinkedList<WorkEndpointListPair<T>> unassignedWorkList;
     Map<DrillbitEndpoint,FragIteratorWrapper> endpointIterators = 
getEndpointIterators();
 
-    // Assign upto minCount per node based on locality.
-    unassignedWorkList = assign(workList, endpointIterators, true);
     // Assign upto maxCount per node based on locality.
-    unassignedWorkList = assign(unassignedWorkList, endpointIterators, false);
+    unassignedWorkList = assign(workList, endpointIterators, false);
+
     // Assign upto minCount per node in a round robin fashion.
     assignLeftovers(unassignedWorkList, endpointIterators, true);
-    // Assign upto maxCount per node in a round robin fashion.
+
+    // Assign upto maxCount + leftovers per node based on locality.
+    unassignedWorkList = assign(unassignedWorkList, endpointIterators,  true);
+
+    // Assign upto maxCount + leftovers per node in a round robin fashion.
     assignLeftovers(unassignedWorkList, endpointIterators, false);
 
     if (unassignedWorkList.size() != 0) {
@@ -127,10 +130,12 @@ public class AssignmentCreator<T extends CompleteWork> {
    *
    * @param workList the list of work units to assign
    * @param endpointIterators the endpointIterators to assign to
-   * @param assignMinimum whether to assign only up to the minimum required
+   * @param assignMaxLeftOvers whether to assign upto maximum including 
leftovers
    * @return a list of unassigned work units
    */
-  private LinkedList<WorkEndpointListPair<T>> 
assign(List<WorkEndpointListPair<T>> workList, 
Map<DrillbitEndpoint,FragIteratorWrapper> endpointIterators, boolean 
assignMinimum) {
+  private LinkedList<WorkEndpointListPair<T>> 
assign(List<WorkEndpointListPair<T>> workList,
+                                                     
Map<DrillbitEndpoint,FragIteratorWrapper> endpointIterators,
+                                                     boolean 
assignMaxLeftOvers) {
     LinkedList<WorkEndpointListPair<T>> currentUnassignedList = 
Lists.newLinkedList();
     outer: for (WorkEndpointListPair<T> workPair : workList) {
       List<DrillbitEndpoint> endpoints = workPair.sortedEndpoints;
@@ -139,7 +144,7 @@ public class AssignmentCreator<T extends CompleteWork> {
         if (iteratorWrapper == null) {
           continue;
         }
-        if (iteratorWrapper.count < (assignMinimum ? iteratorWrapper.minCount 
: iteratorWrapper.maxCount)) {
+        if (iteratorWrapper.count < (assignMaxLeftOvers ? 
(iteratorWrapper.maxCount + iteratorWrapper.maxCountLeftOver) : 
iteratorWrapper.maxCount)) {
           Integer assignment = iteratorWrapper.iter.next();
           iteratorWrapper.count++;
           mappings.put(assignment, workPair.work);
@@ -157,9 +162,11 @@ public class AssignmentCreator<T extends CompleteWork> {
    * @param endpointIterators the endpointIterators to assign to
    * @param assignMinimum wheterh to assign the minimum amount
    */
-  private void assignLeftovers(LinkedList<WorkEndpointListPair<T>> 
unassignedWorkList, Map<DrillbitEndpoint,FragIteratorWrapper> 
endpointIterators, boolean assignMinimum) {
+  private void assignLeftovers(LinkedList<WorkEndpointListPair<T>> 
unassignedWorkList,
+                               Map<DrillbitEndpoint,FragIteratorWrapper> 
endpointIterators,
+                               boolean assignMinimum) {
     outer: for (FragIteratorWrapper iteratorWrapper : 
endpointIterators.values()) {
-      while (iteratorWrapper.count < (assignMinimum ? iteratorWrapper.minCount 
: iteratorWrapper.maxCount)) {
+      while (iteratorWrapper.count < (assignMinimum ? iteratorWrapper.minCount 
: (iteratorWrapper.maxCount + iteratorWrapper.maxCountLeftOver))) {
         WorkEndpointListPair<T> workPair = unassignedWorkList.poll();
         if (workPair == null) {
           break outer;
@@ -261,7 +268,7 @@ public class AssignmentCreator<T extends CompleteWork> {
     while (totalMaxCount < units.size()) {
       for (Entry<DrillbitEndpoint, FragIteratorWrapper> entry : 
map.entrySet()) {
         FragIteratorWrapper iteratorWrapper = entry.getValue();
-        iteratorWrapper.maxCount++;
+        iteratorWrapper.maxCountLeftOver++;
         totalMaxCount++;
         if (totalMaxCount == units.size()) {
           break;
@@ -279,6 +286,7 @@ public class AssignmentCreator<T extends CompleteWork> {
   private static class FragIteratorWrapper {
     int count = 0;
     int maxCount;
+    int maxCountLeftOver;
     int minCount;
     Iterator<Integer> iter;
   }

Reply via email to