DRILL-5304: Queries fail intermittently when there is skew in data distribution
close #766

Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/69de3a1e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/69de3a1e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/69de3a1e

Branch: refs/heads/master
Commit: 69de3a1e409bb1fb9a25e679ce1750d9f9daf238
Parents: 974c613
Author: Padma Penumarthy <ppenuma...@yahoo.com>
Authored: Mon Feb 27 18:32:24 2017 -0800
Committer: Jinfeng Ni <j...@apache.org>
Committed: Wed Mar 1 23:15:34 2017 -0800

----------------------------------------------------------------------
 .../SoftAffinityFragmentParallelizer.java | 2 +-
 .../exec/store/schedule/AssignmentCreator.java | 28 +++++++++++++-------
 2 files changed, 19 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/drill/blob/69de3a1e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java
index 1ebed86..644263e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java
@@ -117,7 +117,7 @@ public class SoftAffinityFragmentParallelizer implements FragmentParallelizer {
     // Find the maximum number of slots which should go to endpoints with affinity (See DRILL-825 for details)
     int affinedSlots =
-        Math.max(1, (int) (parameters.getAffinityFactor() * width / activeEndpoints.size())) * sortedAffinityList.size();
+        Math.max(1, (int) (Math.ceil((double)parameters.getAffinityFactor() * width /
activeEndpoints.size()) * sortedAffinityList.size()));
     // Make sure affined slots is at least the number of mandatory nodes
     affinedSlots = Math.max(affinedSlots, numRequiredNodes);

http://git-wip-us.apache.org/repos/asf/drill/blob/69de3a1e/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
index aeaf4bf..198d1ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
@@ -106,13 +106,16 @@ public class AssignmentCreator<T extends CompleteWork> {
     LinkedList<WorkEndpointListPair<T>> unassignedWorkList;
     Map<DrillbitEndpoint,FragIteratorWrapper> endpointIterators = getEndpointIterators();
-    // Assign upto minCount per node based on locality.
-    unassignedWorkList = assign(workList, endpointIterators, true);
     // Assign upto maxCount per node based on locality.
-    unassignedWorkList = assign(unassignedWorkList, endpointIterators, false);
+    unassignedWorkList = assign(workList, endpointIterators, false);
+
     // Assign upto minCount per node in a round robin fashion.
     assignLeftovers(unassignedWorkList, endpointIterators, true);
-    // Assign upto maxCount per node in a round robin fashion.
+
+    // Assign upto maxCount + leftovers per node based on locality.
+    unassignedWorkList = assign(unassignedWorkList, endpointIterators, true);
+
+    // Assign upto maxCount + leftovers per node in a round robin fashion.
assignLeftovers(unassignedWorkList, endpointIterators, false);
     if (unassignedWorkList.size() != 0) {
@@ -127,10 +130,12 @@ public class AssignmentCreator<T extends CompleteWork> {
    *
    * @param workList the list of work units to assign
    * @param endpointIterators the endpointIterators to assign to
-   * @param assignMinimum whether to assign only up to the minimum required
+   * @param assignMaxLeftOvers whether to assign upto maximum including leftovers
    * @return a list of unassigned work units
    */
-  private LinkedList<WorkEndpointListPair<T>> assign(List<WorkEndpointListPair<T>> workList, Map<DrillbitEndpoint,FragIteratorWrapper> endpointIterators, boolean assignMinimum) {
+  private LinkedList<WorkEndpointListPair<T>> assign(List<WorkEndpointListPair<T>> workList,
+                                                     Map<DrillbitEndpoint,FragIteratorWrapper> endpointIterators,
+                                                     boolean assignMaxLeftOvers) {
     LinkedList<WorkEndpointListPair<T>> currentUnassignedList = Lists.newLinkedList();
     outer: for (WorkEndpointListPair<T> workPair : workList) {
       List<DrillbitEndpoint> endpoints = workPair.sortedEndpoints;
@@ -139,7 +144,7 @@ public class AssignmentCreator<T extends CompleteWork> {
         if (iteratorWrapper == null) {
           continue;
         }
-        if (iteratorWrapper.count < (assignMinimum ? iteratorWrapper.minCount : iteratorWrapper.maxCount)) {
+        if (iteratorWrapper.count < (assignMaxLeftOvers ?
(iteratorWrapper.maxCount + iteratorWrapper.maxCountLeftOver) : iteratorWrapper.maxCount)) {
           Integer assignment = iteratorWrapper.iter.next();
           iteratorWrapper.count++;
           mappings.put(assignment, workPair.work);
@@ -157,9 +162,11 @@ public class AssignmentCreator<T extends CompleteWork> {
    * @param endpointIterators the endpointIterators to assign to
    * @param assignMinimum wheterh to assign the minimum amount
    */
-  private void assignLeftovers(LinkedList<WorkEndpointListPair<T>> unassignedWorkList, Map<DrillbitEndpoint,FragIteratorWrapper> endpointIterators, boolean assignMinimum) {
+  private void assignLeftovers(LinkedList<WorkEndpointListPair<T>> unassignedWorkList,
+                               Map<DrillbitEndpoint,FragIteratorWrapper> endpointIterators,
+                               boolean assignMinimum) {
     outer: for (FragIteratorWrapper iteratorWrapper : endpointIterators.values()) {
-      while (iteratorWrapper.count < (assignMinimum ? iteratorWrapper.minCount : iteratorWrapper.maxCount)) {
+      while (iteratorWrapper.count < (assignMinimum ? iteratorWrapper.minCount : (iteratorWrapper.maxCount + iteratorWrapper.maxCountLeftOver))) {
         WorkEndpointListPair<T> workPair = unassignedWorkList.poll();
         if (workPair == null) {
           break outer;
@@ -261,7 +268,7 @@ public class AssignmentCreator<T extends CompleteWork> {
     while (totalMaxCount < units.size()) {
       for (Entry<DrillbitEndpoint, FragIteratorWrapper> entry : map.entrySet()) {
         FragIteratorWrapper iteratorWrapper = entry.getValue();
-        iteratorWrapper.maxCount++;
+        iteratorWrapper.maxCountLeftOver++;
         totalMaxCount++;
         if (totalMaxCount == units.size()) {
           break;
@@ -279,6 +286,7 @@ public class AssignmentCreator<T extends CompleteWork> {
   private static class FragIteratorWrapper {
     int count = 0;
     int maxCount;
+    int maxCountLeftOver;
     int minCount;
     Iterator<Integer> iter;
   }