Repository: storm
Updated Branches:
  refs/heads/master a715e9a5c -> 0b3203239
STORM-2940 dynamically set the CAPACITY value of LoadAwareShuffleGrouping * lose granularity if targetTask size is larger than 1000 * add tests This closes #2551 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b332503b Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b332503b Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b332503b Branch: refs/heads/master Commit: b332503b113fa6cde6afa320c9c190e3aad5b4c3 Parents: a715e9a Author: Ethan Li <ethanopensou...@gmail.com> Authored: Wed Feb 7 13:40:45 2018 -0600 Committer: Jungtaek Lim <kabh...@gmail.com> Committed: Sat Feb 10 09:11:37 2018 +0900 ---------------------------------------------------------------------- .../grouping/LoadAwareShuffleGrouping.java | 35 ++++++++++++-------- .../grouping/LoadAwareShuffleGroupingTest.java | 33 +++++++++++++----- 2 files changed, 46 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/b332503b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java index 3fd75e5..832ba7f 100644 --- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java +++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java @@ -44,7 +44,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, Serializable { - static final int CAPACITY = 1000; + private int capacity; private static final int MAX_WEIGHT = 100; private static class IndexAndWeights { final int index; @@ -85,6 +85,7 @@ public class LoadAwareShuffleGrouping implements 
LoadAwareCustomStreamGrouping, sourceNodeInfo = new NodeInfo(context.getThisWorkerHost(), Sets.newHashSet((long) context.getThisWorkerPort())); taskToNodePort = context.getTaskToNodePort(); this.targetTasks = targetTasks; + capacity = targetTasks.size() == 1 ? 1 : Math.max(1000, targetTasks.size() * 5); conf = context.getConf(); dnsToSwitchMapping = ReflectionUtils.newInstance((String) conf.get(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN)); localityGroup = new HashMap<>(); @@ -101,11 +102,11 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, } // can't leave choices to be empty, so initiate it similar as ShuffleGrouping - choices = new int[CAPACITY]; + choices = new int[capacity]; current = new AtomicInteger(0); // allocate another array to be switched - prepareChoices = new int[CAPACITY]; + prepareChoices = new int[capacity]; updateRing(null); } @@ -114,9 +115,9 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, int rightNow; while (true) { rightNow = current.incrementAndGet(); - if (rightNow < CAPACITY) { + if (rightNow < capacity) { return rets[choices[rightNow]]; - } else if (rightNow == CAPACITY) { + } else if (rightNow == capacity) { current.set(0); return rets[choices[0]]; } @@ -163,7 +164,7 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, if (targetInScope.isEmpty()) { Scope upScope = Scope.upgrade(currentScope); if (upScope == currentScope) { - throw new RuntimeException("This executor has no target tasks."); + throw new RuntimeException("The current scope " + currentScope + " has no target tasks."); } currentScope = upScope; return transition(load); @@ -222,21 +223,24 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, if (weightSum > 0) { for (int target: targetsInScope) { IndexAndWeights indexAndWeights = orig.get(target); - int count = (int) ((indexAndWeights.weight / (double) weightSum) * CAPACITY); - for (int i = 0; i < count && 
currentIdx < CAPACITY; i++) { + int count = (int) ((indexAndWeights.weight / (double) weightSum) * capacity); + for (int i = 0; i < count && currentIdx < capacity; i++) { prepareChoices[currentIdx] = indexAndWeights.index; currentIdx++; } } - //in case we didn't fill in enough - for (; currentIdx < CAPACITY; currentIdx++) { - prepareChoices[currentIdx] = prepareChoices[random.nextInt(currentIdx)]; + if (currentIdx > 0) { + //in case we didn't fill in enough + for (; currentIdx < capacity; currentIdx++) { + prepareChoices[currentIdx] = prepareChoices[random.nextInt(currentIdx)]; + } } - } else { + } + if (currentIdx == 0) { //This really should be impossible, because we go off of the min load, and inc anything within 5% of it. // But just to be sure it is never an issue, especially with float rounding etc. - for (;currentIdx < CAPACITY; currentIdx++) { + for (;currentIdx < capacity; currentIdx++) { prepareChoices[currentIdx] = currentIdx % rets.length; } } @@ -322,4 +326,9 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, } } } + + //only for test + public int getCapacity() { + return capacity; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/b332503b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java ---------------------------------------------------------------------- diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java index d704900..12ab32a 100644 --- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java +++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java @@ -101,10 +101,10 @@ public class LoadAwareShuffleGroupingTest { double expectedOnePercentage = expectedOneWeight / (expectedOneWeight + expectedTwoWeight); double expectedTwoPercentage = expectedTwoWeight / 
(expectedOneWeight + expectedTwoWeight); assertEquals("i = " + i, - expectedOnePercentage, countByType.getOrDefault(1, 0.0) / LoadAwareShuffleGrouping.CAPACITY, + expectedOnePercentage, countByType.getOrDefault(1, 0.0) / grouping.getCapacity(), 0.01); assertEquals("i = " + i, - expectedTwoPercentage, countByType.getOrDefault(2, 0.0) / LoadAwareShuffleGrouping.CAPACITY, + expectedTwoPercentage, countByType.getOrDefault(2, 0.0) / grouping.getCapacity(), 0.01); } @@ -121,9 +121,9 @@ public class LoadAwareShuffleGroupingTest { LOG.info("contByType = {}", countByType); double expectedOnePercentage = expectedOneWeight / (expectedOneWeight + expectedTwoWeight); double expectedTwoPercentage = expectedTwoWeight / (expectedOneWeight + expectedTwoWeight); - assertEquals(expectedOnePercentage, countByType.getOrDefault(1, 0.0) / LoadAwareShuffleGrouping.CAPACITY, + assertEquals(expectedOnePercentage, countByType.getOrDefault(1, 0.0) / grouping.getCapacity(), 0.01); - assertEquals(expectedTwoPercentage, countByType.getOrDefault(2, 0.0) / LoadAwareShuffleGrouping.CAPACITY, + assertEquals(expectedTwoPercentage, countByType.getOrDefault(2, 0.0) / grouping.getCapacity(), 0.01); } } @@ -138,9 +138,16 @@ public class LoadAwareShuffleGroupingTest { } @Test - public void testLoadAwareShuffleGroupingWithEvenLoad() { - // just pick arbitrary number - final int numTasks = 7; + public void testLoadAwareShuffleGroupingWithEvenLoadWithManyTargets() { + testLoadAwareShuffleGroupingWithEvenLoad(1000); + } + + @Test + public void testLoadAwareShuffleGroupingWithEvenLoadWithLessTargets() { + testLoadAwareShuffleGroupingWithEvenLoad(7); + } + + private void testLoadAwareShuffleGroupingWithEvenLoad(int numTasks) { final LoadAwareShuffleGrouping grouper = new LoadAwareShuffleGrouping(); // Define our taskIds and loads @@ -166,8 +173,16 @@ public class LoadAwareShuffleGroupingTest { } @Test - public void testLoadAwareShuffleGroupingWithEvenLoadMultiThreaded() throws InterruptedException, 
ExecutionException { - final int numTasks = 7; + public void testLoadAwareShuffleGroupingWithEvenLoadMultiThreadedWithManyTargets() throws ExecutionException, InterruptedException { + testLoadAwareShuffleGroupingWithEvenLoadMultiThreaded(1000); + } + + @Test + public void testLoadAwareShuffleGroupingWithEvenLoadMultiThreadedWithLessTargets() throws ExecutionException, InterruptedException { + testLoadAwareShuffleGroupingWithEvenLoadMultiThreaded(7); + } + + private void testLoadAwareShuffleGroupingWithEvenLoadMultiThreaded(int numTasks) throws InterruptedException, ExecutionException { final LoadAwareShuffleGrouping grouper = new LoadAwareShuffleGrouping();