Repository: flink
Updated Branches:
  refs/heads/master fad60bea2 -> b5a7b3578


[hotfix] Simplify computation in 
KeyGroupRangeAssignment::computeKeyGroupRangeForOperatorIndex


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5a7b357
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5a7b357
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5a7b357

Branch: refs/heads/master
Commit: b5a7b357856a7dc1315e7fae44d5032dbfa9dda3
Parents: fad60be
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Fri Feb 2 13:57:55 2018 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Fri Feb 2 14:00:13 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/state/KeyGroupRangeAssignment.java    | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b5a7b357/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
index 62bf3f6..6b6080a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
@@ -80,20 +80,20 @@ public final class KeyGroupRangeAssignment {
         * @param maxParallelism Maximal parallelism that the job was initially 
created with.
         * @param parallelism    The current parallelism under which the job 
runs. Must be <= maxParallelism.
         * @param operatorIndex  Id of a key-group. 0 <= keyGroupID < 
maxParallelism.
-        * @return
+        * @return the computed key-group range for the operator.
         */
        public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
-                       int maxParallelism,
-                       int parallelism,
-                       int operatorIndex) {
+               int maxParallelism,
+               int parallelism,
+               int operatorIndex) {
 
                checkParallelismPreconditions(parallelism);
                checkParallelismPreconditions(maxParallelism);
 
                Preconditions.checkArgument(maxParallelism >= parallelism,
-                               "Maximum parallelism must not be smaller than 
parallelism.");
+                       "Maximum parallelism must not be smaller than 
parallelism.");
 
-               int start = operatorIndex == 0 ? 0 : ((operatorIndex * 
maxParallelism - 1) / parallelism) + 1;
+               int start = ((operatorIndex * maxParallelism + parallelism - 1) 
/ parallelism);
                int end = ((operatorIndex + 1) * maxParallelism - 1) / 
parallelism;
                return new KeyGroupRange(start, end);
        }

Reply via email to