Thanks Aljoscha and Timo for your answers.  I will try to digest the pointers 
you provided.
Mans 

    On Wednesday, January 3, 2018 3:01 AM, Aljoscha Krettek 
<aljos...@apache.org> wrote:
 

 Hi,
The answer is correct but I'll try and elaborate a bit: the way data is sent to 
downstream operations depends on a couple of things in this case:
 - parallelism of first input operation
 - parallelism of second input operation
 - parallelism of co-operation
 - transmission pattern on first input (broadcast, rebalance, etc.)
 - transmission pattern on second input
Note that there is no parallelism on "streams" since there are technically no 
streams but only operations that are interconnected in a certain way.
Now, if the input parallelism and the operation parallelism are the same and 
you don't specify a transmission pattern, then data will not be "shuffled" 
between the operations. If you specify broadcast or rebalance, then that 
pattern is applied, i.e. for broadcast an element from the input operator will 
be sent to every instance of the downstream operation.
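
To make the two cases above concrete, here is a minimal plain-Java sketch. It is not Flink code and does not use any Flink API: the class and method names are hypothetical, and the "instance i sends only to instance i" behaviour for the unshuffled case is an assumption about what "not shuffled" means here.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of two transmission patterns; this is not Flink code.
final class TransmissionPatternSketch {

    // Assumed "no shuffle" case (equal parallelism, no pattern specified):
    // upstream instance i sends only to downstream instance i.
    static List<Integer> forwardTargets(int senderIndex) {
        List<Integer> targets = new ArrayList<>();
        targets.add(senderIndex);
        return targets;
    }

    // Broadcast: every element is sent to every downstream instance.
    static List<Integer> broadcastTargets(int downstreamParallelism) {
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < downstreamParallelism; i++) {
            targets.add(i);
        }
        return targets;
    }

    public static void main(String[] args) {
        System.out.println("forward from instance 2: " + forwardTargets(2));
        System.out.println("broadcast to 4 instances: " + broadcastTargets(4));
    }
}
```

The sketch only lists target instance indices; the key difference is that broadcast multiplies each element by the downstream parallelism, while the unshuffled case keeps a one-to-one link.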
Best,
Aljoscha

On 3. Jan 2018, at 10:43, Timo Walther <twal...@apache.org> wrote:
 
 Hi Mans,
 
 I did a quick test on my PC where I simply set breakpoints in map1 and map2 
(someStream has parallelism 1, otherStream 5, my CoMapFunction 8). Elements of 
someStream end up in different CoMapTasks (2/8, 7/8 etc.).
 
 So I guess the distribution is round-robin partitioning. @Aljoscha might know 
more about the internals?
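
 For illustration only, the round-robin guess above can be modelled in plain Java. This is a sketch, not Flink's partitioner: the class name, the method, and the `firstTarget` parameter are hypothetical, and the starting task is an assumption.

```java
import java.util.Arrays;

// Plain-Java model of round-robin distribution; this is not Flink code.
final class RoundRobinSketch {

    // Counts how many of `elementCount` elements each downstream task receives
    // when a single sender cycles over the tasks, starting at `firstTarget`.
    static int[] distribute(int elementCount, int downstreamParallelism, int firstTarget) {
        int[] received = new int[downstreamParallelism];
        int next = firstTarget % downstreamParallelism;
        for (int i = 0; i < elementCount; i++) {
            received[next]++;
            next = (next + 1) % downstreamParallelism;
        }
        return received;
    }

    public static void main(String[] args) {
        // One sender (like someStream with parallelism 1) and 8 co-map tasks.
        System.out.println(Arrays.toString(distribute(16, 8, 0)));
    }
}
```

 With one sender and 8 downstream tasks, consecutive elements land in different tasks, which would match elements of someStream showing up in 2/8, 7/8 and so on.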
 
 Regards,
 Timo
 
 
 
 On 12/31/17 at 10:38 PM, M Singh wrote:
  
  Hi: 
  Referring to the documentation
(https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/index.html)
 for ConnectedStreams: 
  "Connects" two data streams retaining their types. Connect allowing for 
shared state between the two streams.

DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams =
    someStream.connect(otherStream);

  If the two connected streams have a different number of partitions, e.g. 
someStream has 4 and otherStream has 2, then how do the elements of the 
streams get distributed to the CoMapFunction: 
   connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});  
  I believe that if the second stream is broadcast, then each partition of 
the first will get all the elements of the second.  Is my understanding 
correct? 
  If the streams are not broadcast, and the first stream has 4 partitions 
and the second has 2, then how are the elements of the second stream 
distributed to each partition of the first? 
  Also, if the streams are not broadcast but have the same number of 
partitions, how are the elements distributed? 
  Thanks 
  Mans 
  
  
  
   

  


   
