[FLINK-3336] Add Rescale Data Shipping for DataStream

This is the Javadoc of DataStream.rescale() that describes the
behaviour:

Sets the partitioning of the {@link DataStream} so that the output elements
are distributed evenly to a subset of instances of the next operation in a
round-robin fashion.

The subset of downstream operations to which the upstream operation sends
elements depends on the degree of parallelism of both the upstream and
downstream operation. For example, if the upstream operation has parallelism 2
and the downstream operation has parallelism 4, then one upstream operation
would distribute elements to two downstream operations while the other
upstream operation would distribute to the other two downstream operations.
If, on the other hand, the downstream operation has parallelism 2 while the
upstream operation has parallelism 4, then two upstream operations will
distribute to one downstream operation while the other two upstream
operations will distribute to the other downstream operation.

In cases where the different parallelisms are not multiples of each other,
one or several downstream operations will have a differing number of inputs
from upstream operations.
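To illustrate the behaviour described above, the following Java sketch
computes which downstream instances each upstream instance is connected to
and how its elements are spread over them round-robin. It assumes that
instances are paired in contiguous ranges proportional to the two
parallelisms; the class RescalePattern and its methods are hypothetical and
are neither part of this commit nor of Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the rescale() connection pattern. Assumes instances
 * are paired in contiguous ranges proportional to the two parallelisms; this
 * is an illustration, not code from the commit.
 */
final class RescalePattern {

    /** Upstream instance indices that downstream instance {@code down} receives from. */
    static List<Integer> sources(int down, int upParallelism, int downParallelism) {
        List<Integer> result = new ArrayList<>();
        if (upParallelism <= downParallelism) {
            // Fan-out (or one-to-one): each downstream instance has exactly one upstream instance.
            result.add((int) ((long) down * upParallelism / downParallelism));
        } else {
            // Fan-in: each downstream instance reads from a contiguous range of upstream
            // instances; range sizes differ when the parallelisms are not multiples.
            int start = (int) ((long) down * upParallelism / downParallelism);
            int end = (int) ((long) (down + 1) * upParallelism / downParallelism);
            for (int up = start; up < end; up++) {
                result.add(up);
            }
        }
        return result;
    }

    /** Downstream instance indices that upstream instance {@code up} sends to. */
    static List<Integer> targets(int up, int upParallelism, int downParallelism) {
        List<Integer> result = new ArrayList<>();
        for (int down = 0; down < downParallelism; down++) {
            if (sources(down, upParallelism, downParallelism).contains(up)) {
                result.add(down);
            }
        }
        return result;
    }

    /** Downstream targets of {@code count} consecutive elements emitted by upstream instance {@code up}. */
    static List<Integer> roundRobin(int up, int upParallelism, int downParallelism, int count) {
        List<Integer> subset = targets(up, upParallelism, downParallelism);
        List<Integer> assignment = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            // Cycle through the connected subset in order.
            assignment.add(subset.get(i % subset.size()));
        }
        return assignment;
    }
}
```

For example, targets(0, 2, 4) returns [0, 1] and targets(1, 2, 4) returns
[2, 3]. With an upstream parallelism of 3 and a downstream parallelism of 2,
sources(0, 3, 2) returns [0] while sources(1, 3, 2) returns [1, 2], so the
two downstream instances have a differing number of inputs.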


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f6a8b6d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f6a8b6d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f6a8b6d

Branch: refs/heads/master
Commit: 9f6a8b6d075551d1b4090e191df8f56b5ef9d7e9
Parents: 7469c17
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Thu Feb 4 18:33:01 2016 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Mon Feb 8 13:03:31 2016 +0100

----------------------------------------------------------------------
 docs/apis/streaming/fig/rescale.svg             | 472 +++++++++++++++++++
 docs/apis/streaming/index.md                    |  98 +++-
 .../streaming/api/datastream/DataStream.java    |  59 ++-
 .../datastream/SingleOutputStreamOperator.java  |  12 +
 .../api/graph/StreamingJobGraphGenerator.java   |  15 +-
 .../runtime/partitioner/RescalePartitioner.java |  65 +++
 .../partitioner/RescalePartitionerTest.java     | 202 ++++++++
 .../flink/streaming/api/scala/DataStream.scala  |  57 ++-
 8 files changed, 922 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9f6a8b6d/docs/apis/streaming/fig/rescale.svg
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/fig/rescale.svg b/docs/apis/streaming/fig/rescale.svg
new file mode 100644
index 0000000..43eeae9
--- /dev/null
+++ b/docs/apis/streaming/fig/rescale.svg
@@ -0,0 +1,472 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<svg
+   xmlns:dc="http://purl.org/dc/elements/1.1/"
+   xmlns:cc="http://creativecommons.org/ns#"
+   xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
+   xmlns:svg="http://www.w3.org/2000/svg"
+   xmlns="http://www.w3.org/2000/svg"
+   xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd"
+   xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape"
+   width="372.04724"
+   height="262.20471"
+   id="svg2"
+   version="1.1"
+   inkscape:version="0.48.5 r10040"
+   sodipodi:docname="New document 1">
+  <defs
+     id="defs4">
+    <marker
+       inkscape:stockid="TriangleOutM"
+       orient="auto"
+       refY="0.0"
+       refX="0.0"
+       id="TriangleOutM"
+       style="overflow:visible">
+      <path
+         id="path5012"
+         d="M 5.77,0.0 L -2.88,5.0 L -2.88,-5.0 L 5.77,0.0 z "
+         style="fill-rule:evenodd;stroke:#000000;stroke-width:1.0pt"
+         transform="scale(0.4)" />
+    </marker>
+    <marker
+       inkscape:stockid="TriangleOutL"
+       orient="auto"
+       refY="0.0"
+       refX="0.0"
+       id="TriangleOutL"
+       style="overflow:visible">
+      <path
+         id="path5009"
+         d="M 5.77,0.0 L -2.88,5.0 L -2.88,-5.0 L 5.77,0.0 z "
+         style="fill-rule:evenodd;stroke:#000000;stroke-width:1.0pt"
+         transform="scale(0.8)" />
+    </marker>
+    <marker
+       inkscape:stockid="Arrow2Lend"
+       orient="auto"
+       refY="0.0"
+       refX="0.0"
+       id="Arrow2Lend"
+       style="overflow:visible;">
+      <path
+         id="path4888"
+         style="fill-rule:evenodd;stroke-width:0.62500000;stroke-linejoin:round;"
+         d="M 8.7185878,4.0337352 L -2.2072895,0.016013256 L 8.7185884,-4.0017078 C 6.9730900,-1.6296469 6.9831476,1.6157441 8.7185878,4.0337352 z "
+         transform="scale(1.1) rotate(180) translate(1,0)" />
+    </marker>
+  </defs>
+  <sodipodi:namedview
+     id="base"
+     pagecolor="#ffffff"
+     bordercolor="#666666"
+     borderopacity="1.0"
+     inkscape:pageopacity="0.0"
+     inkscape:pageshadow="2"
+     inkscape:zoom="2.8284271"
+     inkscape:cx="23.967455"
+     inkscape:cy="142.96627"
+     inkscape:document-units="px"
+     inkscape:current-layer="layer1"
+     showgrid="false"
+     inkscape:window-width="2560"
+     inkscape:window-height="1391"
+     inkscape:window-x="0"
+     inkscape:window-y="1"
+     inkscape:window-maximized="1" />
+  <metadata
+     id="metadata7">
+    <rdf:RDF>
+      <cc:Work
+         rdf:about="">
+        <dc:format>image/svg+xml</dc:format>
+        <dc:type
+           rdf:resource="http://purl.org/dc/dcmitype/StillImage" />
+        <dc:title></dc:title>
+      </cc:Work>
+    </rdf:RDF>
+  </metadata>
+  <g
+     inkscape:label="Layer 1"
+     inkscape:groupmode="layer"
+     id="layer1"
+     transform="translate(0,-790.15744)">
+    <g
+       id="g6997"
+       transform="translate(0,0.17677669)">
+      <g
+         transform="translate(21.79899,7.2928933)"
+         id="g4835">
+        <g
+           id="g4721"
+           transform="translate(-2.6011424,-1.5258789e-5)">
+          <path
+             transform="matrix(1.1448338,0,0,1.1448338,-9.9783931,783.57046)"
+             d="m 98.237335,60.300472 c 0,8.019695 -6.501247,14.520943 -14.520942,14.520943 -8.019696,0 -14.520943,-6.501248 -14.520943,-14.520943 0,-8.019695 6.501247,-14.520942 14.520943,-14.520942 8.019695,0 14.520942,6.501247 14.520942,14.520942 z"
+             sodipodi:ry="14.520943"
+             sodipodi:rx="14.520943"
+             sodipodi:cy="60.300472"
+             sodipodi:cx="83.716393"
+             id="path3885"
+             style="fill:#f5a030;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.60000002;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none"
+             sodipodi:type="arc" />
+          <text
+             sodipodi:linespacing="125%"
+             id="text4681"
+             y="856.37225"
+             x="75.488594"
+             style="font-size:40px;font-style:normal;font-weight:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Sans"
+             xml:space="preserve"><tspan
+               style="font-size:13px"
+               y="856.37225"
+               x="75.488594"
+               id="tspan4683"
+               sodipodi:role="line">Src</tspan></text>
+        </g>
+        <g
+           id="g4731"
+           transform="translate(8.7630735,76.367533)">
+          <g
+             id="g4718">
+            <path
+               transform="matrix(1.1448338,0,0,1.1448338,-21.342609,840.64408)"
+               d="m 98.237335,60.300472 c 0,8.019695 -6.501247,14.520943 -14.520942,14.520943 -8.019696,0 -14.520943,-6.501248 -14.520943,-14.520943 0,-8.019695 6.501247,-14.520942 14.520943,-14.520942 8.019695,0 14.520942,6.501247 14.520942,14.520942 z"
+               sodipodi:ry="14.520943"
+               sodipodi:rx="14.520943"
+               sodipodi:cy="60.300472"
+               sodipodi:cx="83.716393"
+               id="path4710"
+               style="fill:#f5a030;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.60000002;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none"
+               sodipodi:type="arc" />
+          </g>
+          <text
+             xml:space="preserve"
+             style="font-size:40px;font-style:normal;font-weight:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Sans"
+             x="62.881989"
+             y="914.22382"
+             id="text4687"
+             sodipodi:linespacing="125%"><tspan
+               sodipodi:role="line"
+               id="tspan4689"
+               x="62.881989"
+               y="914.22382"
+               style="font-size:13px">Snk</tspan></text>
+        </g>
+        <g
+           id="g4753">
+          <g
+             transform="translate(-67.755979,8.3842533)"
+             id="g4726">
+            <path
+               sodipodi:type="arc"
+               style="fill:#f5a030;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.60000002;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none"
+               id="path4712"
+               sodipodi:cx="83.716393"
+               sodipodi:cy="60.300472"
+               sodipodi:rx="14.520943"
+               sodipodi:ry="14.520943"
+               d="m 98.237335,60.300472 c 0,8.019695 -6.501247,14.520943 -14.520942,14.520943 -8.019696,0 -14.520943,-6.501248 -14.520943,-14.520943 0,-8.019695 6.501247,-14.520942 14.520943,-14.520942 8.019695,0 14.520942,6.501247 14.520942,14.520942 z"
+               transform="matrix(1.1448338,0,0,1.1448338,55.176446,841.90677)" />
+            <text
+               xml:space="preserve"
+               style="font-size:40px;font-style:normal;font-weight:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Sans"
+               x="137.40105"
+               y="915.48651"
+               id="text4714"
+               sodipodi:linespacing="125%"><tspan
+                 sodipodi:role="line"
+                 id="tspan4716"
+                 x="137.40105"
+                 y="915.48651"
+                 style="font-size:13px">Map</tspan></text>
+          </g>
+          <g
+             id="g4737"
+             transform="translate(-119.198,8.3842533)">
+            <path
+               transform="matrix(1.1448338,0,0,1.1448338,55.176446,841.90677)"
+               d="m 98.237335,60.300472 c 0,8.019695 -6.501247,14.520943 -14.520942,14.520943 -8.019696,0 -14.520943,-6.501248 -14.520943,-14.520943 0,-8.019695 6.501247,-14.520942 14.520943,-14.520942 8.019695,0 14.520942,6.501247 14.520942,14.520942 z"
+               sodipodi:ry="14.520943"
+               sodipodi:rx="14.520943"
+               sodipodi:cy="60.300472"
+               sodipodi:cx="83.716393"
+               id="path4739"
+               style="fill:#f5a030;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.60000002;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none"
+               sodipodi:type="arc" />
+            <text
+               sodipodi:linespacing="125%"
+               id="text4741"
+               y="915.48651"
+               x="137.40105"
+               style="font-size:40px;font-style:normal;font-weight:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Sans"
+               xml:space="preserve"><tspan
+                 style="font-size:13px"
+                 y="915.48651"
+                 x="137.40105"
+                 id="tspan4743"
+                 sodipodi:role="line">Map</tspan></text>
+          </g>
+          <g
+             transform="translate(-16.313963,8.3842533)"
+             id="g4745">
+            <path
+               sodipodi:type="arc"
+               style="fill:#f5a030;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.60000002;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none"
+               id="path4747"
+               sodipodi:cx="83.716393"
+               sodipodi:cy="60.300472"
+               sodipodi:rx="14.520943"
+               sodipodi:ry="14.520943"
+               d="m 98.237335,60.300472 c 0,8.019695 -6.501247,14.520943 -14.520942,14.520943 -8.019696,0 -14.520943,-6.501248 -14.520943,-14.520943 0,-8.019695 6.501247,-14.520942 14.520943,-14.520942 8.019695,0 14.520942,6.501247 14.520942,14.520942 z"
+               transform="matrix(1.1448338,0,0,1.1448338,55.176446,841.90677)" />
+            <text
+               xml:space="preserve"
+               style="font-size:40px;font-style:normal;font-weight:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Sans"
+               x="137.40105"
+               y="915.48651"
+               id="text4749"
+               sodipodi:linespacing="125%"><tspan
+                 sodipodi:role="line"
+                 id="tspan4751"
+                 x="137.40105"
+                 y="915.48651"
+                 style="font-size:13px">Map</tspan></text>
+          </g>
+        </g>
+      </g>
+      <path
+         inkscape:connector-curvature="0"
+         id="path4861"
+         d="M 97.248545,875.21054 54.181606,908.06852"
+         style="fill:none;stroke:#000000;stroke-width:0.95726824px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-end:url(#TriangleOutM)" />
+      <path
+         transform="translate(0,790.15744)"
+         inkscape:connector-curvature="0"
+         id="path5871"
+         d="m 104.6518,87.195784 -0.35355,31.112696"
+         style="fill:none;stroke:#000000;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-mid:none;marker-end:url(#TriangleOutM)" />
+      <path
+         transform="translate(0,790.15744)"
+         inkscape:connector-curvature="0"
+         id="path6245"
+         d="m 113.49064,84.72091 41.36575,33.58757"
+         style="fill:none;stroke:#000000;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-end:url(#TriangleOutM)" />
+      <path
+         transform="translate(0,790.15744)"
+         inkscape:connector-curvature="0"
+         id="path6433"
+         d="m 53.033009,154.37093 43.84062,32.17336"
+         style="fill:none;stroke:#000000;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-end:url(#TriangleOutM)" />
+      <path
+         transform="translate(0,790.15744)"
+         inkscape:connector-curvature="0"
+         id="path6621"
+         d="m 104.29825,153.66382 0.35355,31.46625"
+         style="fill:none;stroke:#000000;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-end:url(#TriangleOutM)" />
+      <path
+         transform="translate(0,790.15744)"
+         inkscape:connector-curvature="0"
+         id="path6809"
+         d="m 155.56349,154.01737 -41.7193,32.52692"
+         style="fill:none;stroke:#000000;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-end:url(#TriangleOutM)" />
+    </g>
+    <g
+       id="g7028"
+       transform="translate(157.92745,0.17677669)">
+      <g
+         id="g7030"
+         transform="translate(21.79899,7.2928933)">
+        <g
+           transform="translate(-2.6011424,-1.5258789e-5)"
+           id="g7032">
+          <path
+             sodipodi:type="arc"
+             style="fill:#f5a030;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.60000002;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none"
+             id="path7034"
+             sodipodi:cx="83.716393"
+             sodipodi:cy="60.300472"
+             sodipodi:rx="14.520943"
+             sodipodi:ry="14.520943"
+             d="m 98.237335,60.300472 c 0,8.019695 -6.501247,14.520943 -14.520942,14.520943 -8.019696,0 -14.520943,-6.501248 -14.520943,-14.520943 0,-8.019695 6.501247,-14.520942 14.520943,-14.520942 8.019695,0 14.520942,6.501247 14.520942,14.520942 z"
+             transform="matrix(1.1448338,0,0,1.1448338,-9.9783931,783.57046)" />
+          <text
+             xml:space="preserve"
+             style="font-size:40px;font-style:normal;font-weight:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Sans"
+             x="75.488594"
+             y="856.37225"
+             id="text7036"
+             sodipodi:linespacing="125%"><tspan
+               sodipodi:role="line"
+               id="tspan7038"
+               x="75.488594"
+               y="856.37225"
+               style="font-size:13px">Src</tspan></text>
+        </g>
+        <g
+           transform="translate(8.7630735,76.367533)"
+           id="g7040">
+          <g
+             id="g7042">
+            <path
+               sodipodi:type="arc"
+               style="fill:#f5a030;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.60000002;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none"
+               id="path7044"
+               sodipodi:cx="83.716393"
+               sodipodi:cy="60.300472"
+               sodipodi:rx="14.520943"
+               sodipodi:ry="14.520943"
+               d="m 98.237335,60.300472 c 0,8.019695 -6.501247,14.520943 -14.520942,14.520943 -8.019696,0 -14.520943,-6.501248 -14.520943,-14.520943 0,-8.019695 6.501247,-14.520942 14.520943,-14.520942 8.019695,0 14.520942,6.501247 14.520942,14.520942 z"
+               transform="matrix(1.1448338,0,0,1.1448338,-21.342609,840.64408)" />
+          </g>
+          <text
+             sodipodi:linespacing="125%"
+             id="text7046"
+             y="914.22382"
+             x="62.881989"
+             style="font-size:40px;font-style:normal;font-weight:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Sans"
+             xml:space="preserve"><tspan
+               style="font-size:13px"
+               y="914.22382"
+               x="62.881989"
+               id="tspan7048"
+               sodipodi:role="line">Snk</tspan></text>
+        </g>
+        <g
+           id="g7050">
+          <g
+             id="g7052"
+             transform="translate(-67.755979,8.3842533)">
+            <path
+               transform="matrix(1.1448338,0,0,1.1448338,55.176446,841.90677)"
+               d="m 98.237335,60.300472 c 0,8.019695 -6.501247,14.520943 -14.520942,14.520943 -8.019696,0 -14.520943,-6.501248 -14.520943,-14.520943 0,-8.019695 6.501247,-14.520942 14.520943,-14.520942 8.019695,0 14.520942,6.501247 14.520942,14.520942 z"
+               sodipodi:ry="14.520943"
+               sodipodi:rx="14.520943"
+               sodipodi:cy="60.300472"
+               sodipodi:cx="83.716393"
+               id="path7054"
+               style="fill:#f5a030;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.60000002;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none"
+               sodipodi:type="arc" />
+            <text
+               sodipodi:linespacing="125%"
+               id="text7056"
+               y="915.48651"
+               x="137.40105"
+               style="font-size:40px;font-style:normal;font-weight:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Sans"
+               xml:space="preserve"><tspan
+                 style="font-size:13px"
+                 y="915.48651"
+                 x="137.40105"
+                 id="tspan7058"
+                 sodipodi:role="line">Map</tspan></text>
+          </g>
+          <g
+             transform="translate(-119.198,8.3842533)"
+             id="g7060">
+            <path
+               sodipodi:type="arc"
+               style="fill:#f5a030;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.60000002;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none"
+               id="path7062"
+               sodipodi:cx="83.716393"
+               sodipodi:cy="60.300472"
+               sodipodi:rx="14.520943"
+               sodipodi:ry="14.520943"
+               d="m 98.237335,60.300472 c 0,8.019695 -6.501247,14.520943 -14.520942,14.520943 -8.019696,0 -14.520943,-6.501248 -14.520943,-14.520943 0,-8.019695 6.501247,-14.520942 14.520943,-14.520942 8.019695,0 14.520942,6.501247 14.520942,14.520942 z"
+               transform="matrix(1.1448338,0,0,1.1448338,55.176446,841.90677)" />
+            <text
+               xml:space="preserve"
+               style="font-size:40px;font-style:normal;font-weight:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Sans"
+               x="137.40105"
+               y="915.48651"
+               id="text7064"
+               sodipodi:linespacing="125%"><tspan
+                 sodipodi:role="line"
+                 id="tspan7066"
+                 x="137.40105"
+                 y="915.48651"
+                 style="font-size:13px">Map</tspan></text>
+          </g>
+          <g
+             id="g7068"
+             transform="translate(-16.313963,8.3842533)">
+            <path
+               transform="matrix(1.1448338,0,0,1.1448338,55.176446,841.90677)"
+               d="m 98.237335,60.300472 c 0,8.019695 -6.501247,14.520943 -14.520942,14.520943 -8.019696,0 -14.520943,-6.501248 -14.520943,-14.520943 0,-8.019695 6.501247,-14.520942 14.520943,-14.520942 8.019695,0 14.520942,6.501247 14.520942,14.520942 z"
+               sodipodi:ry="14.520943"
+               sodipodi:rx="14.520943"
+               sodipodi:cy="60.300472"
+               sodipodi:cx="83.716393"
+               id="path7070"
+               style="fill:#f5a030;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.60000002;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none"
+               sodipodi:type="arc" />
+            <text
+               sodipodi:linespacing="125%"
+               id="text7072"
+               y="915.48651"
+               x="137.40105"
+               style="font-size:40px;font-style:normal;font-weight:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Sans"
+               xml:space="preserve"><tspan
+                 style="font-size:13px"
+                 y="915.48651"
+                 x="137.40105"
+                 id="tspan7074"
+                 sodipodi:role="line">Map</tspan></text>
+          </g>
+        </g>
+      </g>
+      <path
+         style="fill:none;stroke:#000000;stroke-width:0.95726824px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-end:url(#TriangleOutM)"
+         d="M 97.248545,875.21054 54.181606,908.06852"
+         id="path7076"
+         inkscape:connector-curvature="0" />
+      <path
+         style="fill:none;stroke:#000000;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-mid:none;marker-end:url(#TriangleOutM)"
+         d="m 104.6518,87.195784 -0.35355,31.112696"
+         id="path7078"
+         inkscape:connector-curvature="0"
+         transform="translate(0,790.15744)" />
+      <path
+         style="fill:none;stroke:#000000;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-end:url(#TriangleOutM)"
+         d="m 113.49064,84.72091 41.36575,33.58757"
+         id="path7080"
+         inkscape:connector-curvature="0"
+         transform="translate(0,790.15744)" />
+      <path
+         style="fill:none;stroke:#000000;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-end:url(#TriangleOutM)"
+         d="m 53.033009,154.37093 43.84062,32.17336"
+         id="path7082"
+         inkscape:connector-curvature="0"
+         transform="translate(0,790.15744)" />
+      <path
+         style="fill:none;stroke:#000000;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-end:url(#TriangleOutM)"
+         d="m 104.29825,153.66382 0.35355,31.46625"
+         id="path7084"
+         inkscape:connector-curvature="0"
+         transform="translate(0,790.15744)" />
+      <path
+         style="fill:none;stroke:#000000;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-end:url(#TriangleOutM)"
+         d="m 155.56349,154.01737 -41.7193,32.52692"
+         id="path7086"
+         inkscape:connector-curvature="0"
+         transform="translate(0,790.15744)" />
+    </g>
+  </g>
+</svg>

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6a8b6d/docs/apis/streaming/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/index.md b/docs/apis/streaming/index.md
index 707899a..31e6df9 100644
--- a/docs/apis/streaming/index.md
+++ b/docs/apis/streaming/index.md
@@ -1270,8 +1270,8 @@ dataStream.partitionByHash(0);
         <p>
             Uses a user-defined Partitioner to select the target task for each element.
             {% highlight java %}
-dataStream.partitionCustom(new Partitioner(){...}, "someKey");
-dataStream.partitionCustom(new Partitioner(){...}, 0);
+dataStream.partitionCustom(partitioner, "someKey");
+dataStream.partitionCustom(partitioner, 0);
             {% endhighlight %}
         </p>
       </td>
@@ -1282,7 +1282,7 @@ dataStream.partitionCustom(new Partitioner(){...}, 0);
        <p>
             Partitions elements randomly according to a uniform distribution.
             {% highlight java %}
-dataStream.partitionRandom();
+dataStream.shuffle();
             {% endhighlight %}
        </p>
      </td>
@@ -1299,6 +1299,51 @@ dataStream.rebalance();
         </p>
       </td>
     </tr>
+    <tr>
+      <td><strong>Rescaling</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Partitions elements, round-robin, to a subset of downstream operations. This is
+            useful if you want to have pipelines where you, for example, fan out from
+            each parallel instance of a source to a subset of several mappers to distribute load
+            but don't want the full rebalance that rebalance() would incur. Depending on
+            other configuration values, such as the number of slots of the TaskManagers, this
+            can require only local data transfers instead of transferring data over the network.
+        </p>
+        <p>
+            The subset of downstream operations to which the upstream operation sends
+            elements depends on the degree of parallelism of both the upstream and downstream operation.
+            For example, if the upstream operation has parallelism 2 and the downstream operation
+            has parallelism 4, then one upstream operation would distribute elements to two
+            downstream operations while the other upstream operation would distribute to the other
+            two downstream operations. If, on the other hand, the downstream operation has parallelism
+            2 while the upstream operation has parallelism 4, then two upstream operations would
+            distribute to one downstream operation while the other two upstream operations would
+            distribute to the other downstream operation.
+        </p>
+        <p>
+            In cases where the different parallelisms are not multiples of each other, one or several
+            downstream operations will have a differing number of inputs from upstream operations.
+        </p>
+        <p>
+            Please see this figure for a visualization of the connection pattern. It shows a source
+            with parallelism 2 fanning out to a map with parallelism 6, which in turn fans in to a
+            sink with parallelism 2:
+        </p>
+
+        <div style="text-align: center">
+            <img src="{{ site.baseurl }}/apis/streaming/fig/rescale.svg" alt="Rescale connection pattern" />
+        </div>
+
+
+        <p>
+            {% highlight java %}
+dataStream.rescale();
+            {% endhighlight %}
+
+        </p>
+      </td>
+    </tr>
    <tr>
       <td><strong>Broadcasting</strong><br>DataStream &rarr; DataStream</td>
       <td>
@@ -1357,7 +1402,7 @@ dataStream.partitionCustom(partitioner, 0)
        <p>
             Partitions elements randomly according to a uniform distribution.
             {% highlight scala %}
-dataStream.partitionRandom()
+dataStream.shuffle()
             {% endhighlight %}
        </p>
      </td>
@@ -1374,6 +1419,51 @@ dataStream.rebalance()
         </p>
       </td>
     </tr>
+    <tr>
+      <td><strong>Rescaling</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Partitions elements, round-robin, to a subset of downstream operations. This is
+            useful if you want to have pipelines where you, for example, fan out from
+            each parallel instance of a source to a subset of several mappers to distribute load
+            but don't want the full rebalance that rebalance() would incur. Depending on
+            other configuration values, such as the number of slots of the TaskManagers, this
+            can require only local data transfers instead of transferring data over the network.
+        </p>
+        <p>
+            The subset of downstream operations to which the upstream operation sends
+            elements depends on the degree of parallelism of both the upstream and downstream operation.
+            For example, if the upstream operation has parallelism 2 and the downstream operation
+            has parallelism 4, then one upstream operation would distribute elements to two
+            downstream operations while the other upstream operation would distribute to the other
+            two downstream operations. If, on the other hand, the downstream operation has parallelism
+            2 while the upstream operation has parallelism 4, then two upstream operations would
+            distribute to one downstream operation while the other two upstream operations would
+            distribute to the other downstream operation.
+        </p>
+        <p>
+            In cases where the different parallelisms are not multiples of each other, one or several
+            downstream operations will have a differing number of inputs from upstream operations.
+        </p>
+        <p>
+            Please see this figure for a visualization of the connection pattern. It shows a source
+            with parallelism 2 fanning out to a map with parallelism 6, which in turn fans in to a
+            sink with parallelism 2:
+        </p>
+
+        <div style="text-align: center">
+            <img src="{{ site.baseurl }}/apis/streaming/fig/rescale.svg" alt="Rescale connection pattern" />
+        </div>
+
+
+        <p>
+            {% highlight scala %}
+dataStream.rescale()
+            {% endhighlight %}
+
+        </p>
+      </td>
+    </tr>
    <tr>
       <td><strong>Broadcasting</strong><br>DataStream &rarr; DataStream</td>
       <td>

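The connection pattern described in the rescale documentation can be reproduced with a small, stdlib-only Java sketch. For each downstream subtask, it computes which upstream subtasks that subtask reads from. The model is an assumption: subtasks are grouped into contiguous index blocks using integer floor division. The names `RescaleTopologySketch`, `connect` and `edgeCount` are hypothetical, and Flink's actual POINTWISE assignment may round differently when the parallelisms are not multiples of each other. For contrast, the sketch also prints the m * n connections of an all-to-all pattern, which is what a full rebalance() implies.

```java
import java.util.ArrayList;
import java.util.List;

public class RescaleTopologySketch {

    /**
     * For each downstream subtask, returns the indices of the upstream subtasks it reads from.
     * Simplified pointwise model (an assumption, not Flink's code): contiguous blocks, floor division.
     */
    static List<List<Integer>> connect(int upstream, int downstream) {
        List<List<Integer>> inputs = new ArrayList<>();
        for (int d = 0; d < downstream; d++) {
            List<Integer> sources = new ArrayList<>();
            if (upstream <= downstream) {
                // Fan-out: each downstream subtask reads from exactly one upstream subtask.
                sources.add(d * upstream / downstream);
            } else {
                // Fan-in: each downstream subtask reads a contiguous block of upstream subtasks.
                int start = d * upstream / downstream;
                int end = (d + 1) * upstream / downstream;
                for (int u = start; u < end; u++) {
                    sources.add(u);
                }
            }
            inputs.add(sources);
        }
        return inputs;
    }

    /** Total number of upstream-to-downstream connections in the pattern. */
    static int edgeCount(List<List<Integer>> inputs) {
        int edges = 0;
        for (List<Integer> sources : inputs) {
            edges += sources.size();
        }
        return edges;
    }

    public static void main(String[] args) {
        int[][] cases = {{2, 4}, {4, 2}, {3, 2}};
        for (int[] c : cases) {
            List<List<Integer>> inputs = connect(c[0], c[1]);
            System.out.println(c[0] + " -> " + c[1] + ": " + inputs
                    + ", rescale edges = " + edgeCount(inputs)
                    + ", all-to-all edges = " + (c[0] * c[1]));
        }
    }
}
```

In this model, 2 -> 4 yields `[[0], [0], [1], [1]]` and 4 -> 2 yields `[[0, 1], [2, 3]]`, matching the two examples in the text. For 3 -> 2, downstream subtask 0 gets one input and subtask 1 gets two, which is the uneven case the last paragraph mentions.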
http://git-wip-us.apache.org/repos/asf/flink/blob/9f6a8b6d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 6d2a44a..891562c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -77,6 +77,7 @@ import org.apache.flink.streaming.runtime.operators.ExtractTimestampsOperator;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
@@ -390,12 +391,8 @@ public class DataStream<T> {
        }
 
        /**
-        * Sets the partitioning of the {@link DataStream} so that the output tuples
-        * are broadcasted to every parallel instance of the next component.
-        *
-        * <p>
-        * This setting only effects the how the outputs will be distributed between
-        * the parallel instances of the next processing operator.
+        * Sets the partitioning of the {@link DataStream} so that the output elements
+        * are broadcasted to every parallel instance of the next operation.
         *
         * @return The DataStream with broadcast partitioning set.
         */
@@ -404,12 +401,8 @@ public class DataStream<T> {
        }
 
        /**
-        * Sets the partitioning of the {@link DataStream} so that the output tuples
-        * are shuffled uniformly randomly to the next component.
-        *
-        * <p>
-        * This setting only effects the how the outputs will be distributed between
-        * the parallel instances of the next processing operator.
+        * Sets the partitioning of the {@link DataStream} so that the output elements
+        * are shuffled uniformly randomly to the next operation.
         *
         * @return The DataStream with shuffle partitioning set.
         */
@@ -419,13 +412,8 @@ public class DataStream<T> {
        }
 
        /**
-        * Sets the partitioning of the {@link DataStream} so that the output tuples
-        * are forwarded to the local subtask of the next component (whenever
-        * possible).
-        *
-        * <p>
-        * This setting only effects the how the outputs will be distributed between
-        * the parallel instances of the next processing operator.
+        * Sets the partitioning of the {@link DataStream} so that the output elements
+        * are forwarded to the local subtask of the next operation.
         *
         * @return The DataStream with forward partitioning set.
         */
@@ -435,14 +423,10 @@ public class DataStream<T> {
        }
 
        /**
-        * Sets the partitioning of the {@link DataStream} so that the output tuples
-        * are distributed evenly to instances of the next component in a Round-robin
+        * Sets the partitioning of the {@link DataStream} so that the output elements
+        * are distributed evenly to instances of the next operation in a round-robin
         * fashion.
         *
-        * <p>
-        * This setting only effects the how the outputs will be distributed between
-        * the parallel instances of the next processing operator.
-        *
         * @return The DataStream with rebalance partitioning set.
         */
        public DataStream<T> rebalance() {
@@ -450,6 +434,31 @@ public class DataStream<T> {
        }
 
        /**
+        * Sets the partitioning of the {@link DataStream} so that the output elements
+        * are distributed evenly to a subset of instances of the next operation in a round-robin
+        * fashion.
+        *
+        * <p>The subset of downstream operations to which the upstream operation sends
+        * elements depends on the degree of parallelism of both the upstream and downstream operation.
+        * For example, if the upstream operation has parallelism 2 and the downstream operation
+        * has parallelism 4, then one upstream operation would distribute elements to two
+        * downstream operations while the other upstream operation would distribute to the other
+        * two downstream operations. If, on the other hand, the downstream operation has parallelism
+        * 2 while the upstream operation has parallelism 4, then two upstream operations will
+        * distribute to one downstream operation while the other two upstream operations will
+        * distribute to the other downstream operation.
+        *
+        * <p>In cases where the different parallelisms are not multiples of each other, one or several
+        * downstream operations will have a differing number of inputs from upstream operations.
+        *
+        * @return The DataStream with rescale partitioning set.
+        */
+       @Experimental
+       public DataStream<T> rescale() {
+               return setConnectionType(new RescalePartitioner<T>());
+       }
+
+       /**
         * Sets the partitioning of the {@link DataStream} so that the output values
         * all go to the first instance of the next processing operator. Use this
         * setting with care since it might cause a serious performance bottleneck

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6a8b6d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index c2fcaaf..0aef7f8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -114,28 +114,40 @@ public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<
        }
 
        @SuppressWarnings("unchecked")
+       @Override
        public SingleOutputStreamOperator<T, O> broadcast() {
                return (SingleOutputStreamOperator<T, O>) super.broadcast();
        }
 
        @SuppressWarnings("unchecked")
+       @Override
        @Experimental
        public SingleOutputStreamOperator<T, O> shuffle() {
                return (SingleOutputStreamOperator<T, O>) super.shuffle();
        }
 
        @SuppressWarnings("unchecked")
+       @Override
        @Experimental
        public SingleOutputStreamOperator<T, O> forward() {
                return (SingleOutputStreamOperator<T, O>) super.forward();
        }
 
        @SuppressWarnings("unchecked")
+       @Override
        public SingleOutputStreamOperator<T, O> rebalance() {
                return (SingleOutputStreamOperator<T, O>) super.rebalance();
        }
 
        @SuppressWarnings("unchecked")
+       @Override
+       @Experimental
+       public SingleOutputStreamOperator<T, O> rescale() {
+               return (SingleOutputStreamOperator<T, O>) super.rescale();
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
        @Experimental
        public SingleOutputStreamOperator<T, O> global() {
                return (SingleOutputStreamOperator<T, O>) super.global();

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6a8b6d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 5da2caa..fd75ba7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -44,6 +44,7 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
@@ -361,10 +362,16 @@ public class StreamingJobGraphGenerator {
                StreamPartitioner<?> partitioner = edge.getPartitioner();
                if (partitioner instanceof ForwardPartitioner) {
                        downStreamVertex.connectNewDataSetAsInput(
-                                       headVertex,
-                                       DistributionPattern.POINTWISE,
-                                       ResultPartitionType.PIPELINED,
-                                       true);
+                               headVertex,
+                               DistributionPattern.POINTWISE,
+                               ResultPartitionType.PIPELINED,
+                               true);
+               } else if (partitioner instanceof RescalePartitioner) {
+                       downStreamVertex.connectNewDataSetAsInput(
+                               headVertex,
+                               DistributionPattern.POINTWISE,
+                               ResultPartitionType.PIPELINED,
+                               true);
                } else {
                        downStreamVertex.connectNewDataSetAsInput(
                                        headVertex,

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6a8b6d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java
new file mode 100644
index 0000000..063e64a
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Partitioner that distributes the data equally by cycling through the output
+ * channels. This distributes only to a subset of downstream nodes because
+ * {@link org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator} instantiates
+ * a {@link DistributionPattern#POINTWISE} distribution pattern when encountering
+ * {@code RescalePartitioner}.
+ *
+ * <p>The subset of downstream operations to which the upstream operation sends
+ * elements depends on the degree of parallelism of both the upstream and downstream operation.
+ * For example, if the upstream operation has parallelism 2 and the downstream operation
+ * has parallelism 4, then one upstream operation would distribute elements to two
+ * downstream operations while the other upstream operation would distribute to the other
+ * two downstream operations. If, on the other hand, the downstream operation has parallelism
+ * 2 while the upstream operation has parallelism 4, then two upstream operations will
+ * distribute to one downstream operation while the other two upstream operations will
+ * distribute to the other downstream operation.
+ *
+ * <p>In cases where the different parallelisms are not multiples of each other, one or several
+ * downstream operations will have a differing number of inputs from upstream operations.
+ *
+ * @param <T> Type of the elements in the Stream being rescaled
+ */
+public class RescalePartitioner<T> extends StreamPartitioner<T> {
+       private static final long serialVersionUID = 1L;
+
+       private int[] returnArray = new int[] {-1};
+
+       @Override
+       public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) {
+               this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
+               return this.returnArray;
+       }
+       
+       public StreamPartitioner<T> copy() {
+               return this;
+       }
+       
+       @Override
+       public String toString() {
+               return "RESCALE";
+       }
+}

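The `selectChannels()` method above keeps one counter per partitioner instance. It starts the counter at -1 and advances it modulo the number of output channels it is given. Combining that counter with the fan-out case from the Javadoc shows how records from each upstream subtask spread over that subtask's subset of downstream subtasks. The following stdlib-only Java sketch does this under three assumptions: each upstream subtask has its own selector, local channel i maps to the i-th connected downstream subtask in ascending order, and only the fan-out case (upstream parallelism <= downstream parallelism) is modelled. `RescaleRoutingSketch`, `RoundRobin`, `targetsOf` and `route` are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RescaleRoutingSketch {

    /** Round-robin counter modelled on RescalePartitioner.selectChannels (starts at -1). */
    static final class RoundRobin {
        private int last = -1;

        int next(int numberOfOutputChannels) {
            last = (last + 1) % numberOfOutputChannels;
            return last;
        }
    }

    /** Hypothetical helper: downstream subtasks connected to one upstream subtask (fan-out only). */
    static List<Integer> targetsOf(int upstreamIndex, int upstream, int downstream) {
        List<Integer> targets = new ArrayList<>();
        for (int d = 0; d < downstream; d++) {
            if (d * upstream / downstream == upstreamIndex) {
                targets.add(d);
            }
        }
        return targets;
    }

    /** Records received per downstream subtask when every upstream subtask emits n records. */
    static int[] route(int upstream, int downstream, int recordsPerUpstream) {
        if (upstream > downstream) {
            throw new IllegalArgumentException("sketch only models the fan-out case");
        }
        int[] received = new int[downstream];
        for (int u = 0; u < upstream; u++) {
            List<Integer> targets = targetsOf(u, upstream, downstream);
            RoundRobin selector = new RoundRobin(); // assumed: one selector per upstream subtask
            for (int r = 0; r < recordsPerUpstream; r++) {
                int localChannel = selector.next(targets.size());
                received[targets.get(localChannel)]++;
            }
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(route(2, 4, 6))); // [3, 3, 3, 3]
        System.out.println(Arrays.toString(route(2, 3, 6))); // [3, 3, 6]
    }
}
```

In the 2 -> 3 case, the upstream subtask that owns a single channel sends all of its records there. That downstream subtask therefore receives twice as many records as the others. Under this model, rescale balances load only within each upstream subtask's subset, not across the whole downstream operation.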
http://git-wip-us.apache.org/repos/asf/flink/blob/9f6a8b6d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
new file mode 100644
index 0000000..bac7fa5
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+
+public class RescalePartitionerTest extends TestLogger {
+       
+       private RescalePartitioner<Tuple> distributePartitioner;
+       private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
+       private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
+                       null);
+       
+       @Before
+       public void setPartitioner() {
+               distributePartitioner = new RescalePartitioner<Tuple>();
+       }
+       
+       @Test
+       public void testSelectChannelsLength() {
+               sd.setInstance(streamRecord);
+               assertEquals(1, distributePartitioner.selectChannels(sd, 1).length);
+               assertEquals(1, distributePartitioner.selectChannels(sd, 2).length);
+               assertEquals(1, distributePartitioner.selectChannels(sd, 1024).length);
+       }
+       
+       @Test
+       public void testSelectChannelsInterval() {
+               sd.setInstance(streamRecord);
+               assertEquals(0, distributePartitioner.selectChannels(sd, 3)[0]);
+               assertEquals(1, distributePartitioner.selectChannels(sd, 3)[0]);
+               assertEquals(2, distributePartitioner.selectChannels(sd, 3)[0]);
+               assertEquals(0, distributePartitioner.selectChannels(sd, 3)[0]);
+       }
+
+       @Test
+       public void testExecutionGraphGeneration() {
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+               env.setParallelism(4);
+
+               // get input data
+               DataStream<String> text = env.addSource(new ParallelSourceFunction<String>() {
+                       @Override
+                       public void run(SourceContext<String> ctx) throws Exception {
+
+                       }
+
+                       @Override
+                       public void cancel() {
+
+                       }
+               }).setParallelism(2);
+
+               DataStream<Tuple2<String, Integer>> counts = text
+                       .rescale()
+                       .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
+                               @Override
+                               public void flatMap(String value,
+                                       Collector<Tuple2<String, Integer>> out) throws Exception {
+
+                               }
+                       });
+
+               counts.rescale().print().setParallelism(2);
+
+               JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+
+               final JobID jobId = new JobID();
+               final String jobName = "Rescale Test Job";
+               final Configuration cfg = new Configuration();
+
+               List<JobVertex> jobVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
+
+               JobVertex sourceVertex = jobVertices.get(0);
+               JobVertex mapVertex = jobVertices.get(1);
+               JobVertex sinkVertex = jobVertices.get(2);
+
+               assertEquals(2, sourceVertex.getParallelism());
+               assertEquals(4, mapVertex.getParallelism());
+               assertEquals(2, sinkVertex.getParallelism());
+
+               ExecutionGraph eg = new ExecutionGraph(
+                       TestingUtils.defaultExecutionContext(),
+                       jobId,
+                       jobName,
+                       cfg,
+                       AkkaUtils.getDefaultTimeout(), new ArrayList<BlobKey>(), new ArrayList<URL>(), ExecutionGraph.class.getClassLoader());
+               try {
+                       eg.attachJobGraph(jobVertices);
+               }
+               catch (JobException e) {
+                       e.printStackTrace();
+                       fail("Building ExecutionGraph failed: " + e.getMessage());
+               }
+
+
+               ExecutionJobVertex execSourceVertex = eg.getJobVertex(sourceVertex.getID());
+               ExecutionJobVertex execMapVertex = eg.getJobVertex(mapVertex.getID());
+               ExecutionJobVertex execSinkVertex = eg.getJobVertex(sinkVertex.getID());
+
+               assertEquals(0, execSourceVertex.getInputs().size());
+
+               assertEquals(1, execMapVertex.getInputs().size());
+               assertEquals(4, execMapVertex.getParallelism());
+               ExecutionVertex[] mapTaskVertices = execMapVertex.getTaskVertices();
+
+               // verify that we have each parallel input partition exactly twice, i.e. that one source
+               // sends to two unique mappers
+               Map<Integer, Integer> mapInputPartitionCounts = new HashMap<>();
+               for (ExecutionVertex mapTaskVertex: mapTaskVertices) {
+                       assertEquals(1, mapTaskVertex.getNumberOfInputs());
+                       assertEquals(1, mapTaskVertex.getInputEdges(0).length);
+                       ExecutionEdge inputEdge = mapTaskVertex.getInputEdges(0)[0];
+                       assertEquals(sourceVertex.getID(), inputEdge.getSource().getProducer().getJobvertexId());
+                       int inputPartition = inputEdge.getSource().getPartitionNumber();
+                       if (!mapInputPartitionCounts.containsKey(inputPartition)) {
+                               mapInputPartitionCounts.put(inputPartition, 1);
+                       } else {
+                               mapInputPartitionCounts.put(inputPartition, mapInputPartitionCounts.get(inputPartition) + 1);
+                       }
+               }
+
+               assertEquals(2, mapInputPartitionCounts.size());
+               for (int count: mapInputPartitionCounts.values()) {
+                       assertEquals(2, count);
+               }
+
+               assertEquals(1, execSinkVertex.getInputs().size());
+               assertEquals(2, execSinkVertex.getParallelism());
+               ExecutionVertex[] sinkTaskVertices = execSinkVertex.getTaskVertices();
+
+               // verify each sink instance has two inputs from the map and that each map subpartition
+               // only occurs in one unique input edge
+               Set<Integer> mapSubpartitions = new HashSet<>();
+               for (ExecutionVertex sinkTaskVertex: sinkTaskVertices) {
+                       assertEquals(1, sinkTaskVertex.getNumberOfInputs());
+                       assertEquals(2, sinkTaskVertex.getInputEdges(0).length);
+                       ExecutionEdge inputEdge1 = sinkTaskVertex.getInputEdges(0)[0];
+                       ExecutionEdge inputEdge2 = sinkTaskVertex.getInputEdges(0)[1];
+                       assertEquals(mapVertex.getID(), inputEdge1.getSource().getProducer().getJobvertexId());
+                       assertEquals(mapVertex.getID(), inputEdge2.getSource().getProducer().getJobvertexId());
+
+                       int inputPartition1 = inputEdge1.getSource().getPartitionNumber();
+                       assertFalse(mapSubpartitions.contains(inputPartition1));
+                       mapSubpartitions.add(inputPartition1);
+                       int inputPartition2 = inputEdge2.getSource().getPartitionNumber();
+                       assertFalse(mapSubpartitions.contains(inputPartition2));
+                       mapSubpartitions.add(inputPartition2);
+               }
+
+               assertEquals(4, mapSubpartitions.size());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6a8b6d/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index d3bc463..3fe55c4 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -300,7 +300,7 @@ class DataStream[T](stream: JavaStream[T]) {
    * Partitions a tuple DataStream on the specified key fields using a custom partitioner.
    * This method takes the key position to partition on, and a partitioner that accepts the key
    * type.
-   * <p>
+   *
    * Note: This method works only on single field keys.
    */
   def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: Int) : DataStream[T] =
@@ -310,7 +310,7 @@ class DataStream[T](stream: JavaStream[T]) {
    * Partitions a POJO DataStream on the specified key fields using a custom partitioner.
    * This method takes the key expression to partition on, and a partitioner that accepts the key
    * type.
-   * <p>
+   *
    * Note: This method works only on single field keys.
    */
   def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: String)
@@ -320,7 +320,7 @@ class DataStream[T](stream: JavaStream[T]) {
    * Partitions a DataStream on the key returned by the selector, using a custom partitioner.
    * This method takes the key selector to get the key to partition on, and a partitioner that
    * accepts the key type.
-   * <p>
+   *
    * Note: This method works only on single field keys, i.e. the selector cannot return tuples
    * of fields.
    */
@@ -336,10 +336,7 @@ class DataStream[T](stream: JavaStream[T]) {
 
   /**
    * Sets the partitioning of the DataStream so that the output tuples
-   * are broad casted to every parallel instance of the next component. This
-   * setting only effects the how the outputs will be distributed between the
-   * parallel instances of the next processing operator.
-   *
+   * are broadcast to every parallel instance of the next component.
    */
   def broadcast: DataStream[T] = stream.broadcast()
 
@@ -353,10 +350,7 @@ class DataStream[T](stream: JavaStream[T]) {
 
   /**
    * Sets the partitioning of the DataStream so that the output tuples
-   * are shuffled to the next component. This setting only effects the how the
-   * outputs will be distributed between the parallel instances of the next
-   * processing operator.
-   *
+   * are shuffled to the next component.
    */
   @Experimental
   def shuffle: DataStream[T] = stream.shuffle()
@@ -364,39 +358,53 @@ class DataStream[T](stream: JavaStream[T]) {
   /**
    * Sets the partitioning of the DataStream so that the output tuples
    * are forwarded to the local subtask of the next component (whenever
-   * possible). This is the default partitioner setting. This setting only
-   * effects the how the outputs will be distributed between the parallel
-   * instances of the next processing operator.
-   *
+   * possible).
    */
   @Experimental
   def forward: DataStream[T] = stream.forward()
 
   /**
    * Sets the partitioning of the DataStream so that the output tuples
-   * are distributed evenly to the next component.This setting only effects
-   * the how the outputs will be distributed between the parallel instances of
-   * the next processing operator.
-   *
+   * are distributed evenly to the next component.
    */
   def rebalance: DataStream[T] = stream.rebalance()
 
   /**
+   * Sets the partitioning of the [[DataStream]] so that the output tuples
+   * are distributed evenly to a subset of instances of the downstream operation.
+   *
+   * The subset of downstream operations to which the upstream operation sends
+   * elements depends on the degree of parallelism of both the upstream and downstream operation.
+   * For example, if the upstream operation has parallelism 2 and the downstream operation
+   * has parallelism 4, then one upstream operation would distribute elements to two
+   * downstream operations while the other upstream operation would distribute to the other
+   * two downstream operations. If, on the other hand, the downstream operation has parallelism
+   * 2 while the upstream operation has parallelism 4, then two upstream operations will
+   * distribute to one downstream operation while the other two upstream operations will
+   * distribute to the other downstream operation.
+   *
+   * In cases where the different parallelisms are not multiples of each other, one or several
+   * downstream operations will have a differing number of inputs from upstream operations.
+   */
+  @Experimental
+  def rescale: DataStream[T] = stream.rescale()
+
+  /**
    * Initiates an iterative part of the program that creates a loop by feeding
    * back data streams. To create a streaming iteration the user needs to define
    * a transformation that creates two DataStreams. The first one is the output
    * that will be fed back to the start of the iteration and the second is the output
    * stream of the iterative part.
-   * <p>
+   *
    * stepfunction: initialStream => (feedback, output)
-   * <p>
+   *
    * A common pattern is to use output splitting to create feedback and output DataStream.
    * Please refer to the .split(...) method of the DataStream
-   * <p>
+   *
    * By default a DataStream with iteration will never terminate, but the user
    * can use the maxWaitTime parameter to set a max waiting time for the iteration head.
    * If no data received in the set time the stream terminates.
-   * <p>
+   *
    * By default the feedback partitioning is set to match the input, to override this set
    * the keepPartitioning flag to true
    *
@@ -424,9 +432,8 @@ class DataStream[T](stream: JavaStream[T]) {
    *
    * This allows the user to distinguish standard input from feedback inputs.
    *
-   * <p>
    * stepfunction: initialStream => (feedback, output)
-   * <p>
+   *
    * The user must set the max waiting time for the iteration head.
    * If no data received in the set time the stream terminates. If this parameter is set
    * to 0 then the iteration sources will indefinitely, so the job must be killed to stop.
