Repository: flink Updated Branches: refs/heads/master fad60bea2 -> b5a7b3578
[hotfix] Simplify computation in KeyGroupRangeAssignment::computeKeyGroupRangeForOperatorIndex Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5a7b357 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5a7b357 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5a7b357 Branch: refs/heads/master Commit: b5a7b357856a7dc1315e7fae44d5032dbfa9dda3 Parents: fad60be Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Fri Feb 2 13:57:55 2018 +0100 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Fri Feb 2 14:00:13 2018 +0100 ---------------------------------------------------------------------- .../flink/runtime/state/KeyGroupRangeAssignment.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b5a7b357/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java index 62bf3f6..6b6080a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java @@ -80,20 +80,20 @@ public final class KeyGroupRangeAssignment { * @param maxParallelism Maximal parallelism that the job was initially created with. * @param parallelism The current parallelism under which the job runs. Must be <= maxParallelism. * @param operatorIndex Id of a key-group. 0 <= keyGroupID < maxParallelism. - * @return + * @return the computed key-group range for the operator. */ public static KeyGroupRange computeKeyGroupRangeForOperatorIndex( - int maxParallelism, - int parallelism, - int operatorIndex) { + int maxParallelism, + int parallelism, + int operatorIndex) { checkParallelismPreconditions(parallelism); checkParallelismPreconditions(maxParallelism); Preconditions.checkArgument(maxParallelism >= parallelism, - "Maximum parallelism must not be smaller than parallelism."); + "Maximum parallelism must not be smaller than parallelism."); - int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1; + int start = ((operatorIndex * maxParallelism + parallelism - 1) / parallelism); int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism; return new KeyGroupRange(start, end); }