I’m not sure I understand the actual use case, but …

Using rebalance() to distribute records evenly (round-robin) across operator 
tasks, regardless of key, is what I think you’d need to do to support “even if 
I have less keys that slots, I wants each slot to take his share in the work”.
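
To make the difference concrete, here is a small plain-Java model (not Flink code) that counts how many of 16 tasks receive data when 8 keys are routed by key versus round-robin. The class and method names are made up for illustration, and Math.floorMod on hashCode() is only a stand-in for Flink's real key hashing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model, not Flink code: counts how many records each parallel
// task receives under two routing schemes.
class PartitioningSketch {

    // Builds numRecords record keys that cycle through numKeys distinct keys.
    static List<String> sampleRecords(int numKeys, int numRecords) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < numRecords; i++) {
            keys.add("key" + (i % numKeys));
        }
        return keys;
    }

    // Key-based routing: records with the same key always hit the same task.
    // ASSUMPTION: floorMod(hashCode) stands in for Flink's actual key hashing.
    static int[] byKey(List<String> recordKeys, int parallelism) {
        int[] counts = new int[parallelism];
        for (String key : recordKeys) {
            counts[Math.floorMod(key.hashCode(), parallelism)]++;
        }
        return counts;
    }

    // Round-robin routing: records are dealt out in turn, ignoring the key.
    static int[] roundRobin(List<String> recordKeys, int parallelism) {
        int[] counts = new int[parallelism];
        for (int i = 0; i < recordKeys.size(); i++) {
            counts[i % parallelism]++;
        }
        return counts;
    }

    // Number of tasks that received at least one record.
    static long busyTasks(int[] counts) {
        return Arrays.stream(counts).filter(c -> c > 0).count();
    }

    public static void main(String[] args) {
        List<String> records = sampleRecords(8, 160);
        System.out.println("by key:      " + busyTasks(byKey(records, 16)) + " busy tasks");
        System.out.println("round-robin: " + busyTasks(roundRobin(records, 16)) + " busy tasks");
    }
}
```

With 8 distinct keys, key-based routing can never keep more than 8 tasks busy, whatever the hash function; round-robin touches all 16.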

So it sounds like you want to (a) broadcast all rules (so every operator task 
has all of the rules), and then (b) evenly distribute the keys across the 
operator tasks.

Then have a custom function that examines the keys to figure out what rule(s) 
to apply.
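
A minimal sketch of that layout, again as a plain-Java model rather than Flink code: every task keeps its own full copy of the rules, events are dealt to tasks round-robin, and each task looks up the rule by the event's key. All names here (BroadcastRulesSketch, Task, onRule, onEvent) are hypothetical, and a rule is modelled as a simple String-to-String function.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Plain-Java model, not Flink code: rules are copied to every task
// ("broadcast"), events are spread over the tasks round-robin.
class BroadcastRulesSketch {

    // One parallel task with its own full copy of the rules.
    static class Task {
        final Map<String, UnaryOperator<String>> rules = new HashMap<>();
        final List<String> output = new ArrayList<>();

        void onRule(String key, UnaryOperator<String> rule) {
            rules.put(key, rule);
        }

        // The "custom function": find the rule for this key and apply it.
        void onEvent(String key, String payload) {
            UnaryOperator<String> rule = rules.get(key);
            if (rule == null) {
                return; // no rule for this key yet: this sketch just skips the event
            }
            output.add(rule.apply(payload));
        }
    }

    final List<Task> tasks = new ArrayList<>();
    private int next = 0; // index of the task that gets the next event

    BroadcastRulesSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            tasks.add(new Task());
        }
    }

    // (a) every task receives every rule
    void broadcastRule(String key, UnaryOperator<String> rule) {
        for (Task task : tasks) {
            task.onRule(key, rule);
        }
    }

    // (b) events go to tasks in turn, independent of their key
    void sendEvent(String key, String payload) {
        tasks.get(next).onEvent(key, payload);
        next = (next + 1) % tasks.size();
    }

    public static void main(String[] args) {
        BroadcastRulesSketch job = new BroadcastRulesSketch(4);
        job.broadcastRule("key1", String::toUpperCase);
        for (int i = 0; i < 8; i++) {
            job.sendEvent("key1", "event" + i);
        }
        for (Task task : job.tasks) {
            System.out.println(task.output);
        }
    }
}
```

The cost of this layout is memory: every task holds every rule, so it only makes sense when the rule set is small compared to the event volume.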

There are often timing issues here, where you have to buffer keys while waiting 
for all (to some definition of “all”) the rules to arrive before you start 
processing the keys.
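
One way to picture the buffering, as a plain-Java sketch for a single task (not Flink code). The definition of “all” used here, a fixed expected number of rules, is purely an assumption for illustration, as are the class and method names; a real job would more likely keep the pending events in Flink state.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Plain-Java model of one task that holds events back until the rules are in.
class BufferingTaskSketch {
    // ASSUMPTION: "all rules" means a known, fixed number of rules.
    private final int expectedRules;
    private final Map<String, UnaryOperator<String>> rules = new HashMap<>();
    // Events that arrived too early, as {key, payload} pairs in arrival order.
    private final Deque<String[]> pending = new ArrayDeque<>();
    final List<String> output = new ArrayList<>();

    BufferingTaskSketch(int expectedRules) {
        this.expectedRules = expectedRules;
    }

    private boolean ready() {
        return rules.size() >= expectedRules;
    }

    void onRule(String key, UnaryOperator<String> rule) {
        rules.put(key, rule);
        if (ready()) {
            // Rules are complete: replay buffered events in arrival order.
            while (!pending.isEmpty()) {
                String[] event = pending.poll();
                apply(event[0], event[1]);
            }
        }
    }

    void onEvent(String key, String payload) {
        if (ready()) {
            apply(key, payload);
        } else {
            pending.add(new String[] {key, payload});
        }
    }

    private void apply(String key, String payload) {
        // Keys without a rule pass through unchanged in this sketch.
        output.add(rules.getOrDefault(key, UnaryOperator.identity()).apply(payload));
    }

    public static void main(String[] args) {
        BufferingTaskSketch task = new BufferingTaskSketch(2);
        task.onEvent("a", "x");                 // buffered, no rules yet
        task.onRule("a", String::toUpperCase);  // 1 of 2 rules, still buffering
        task.onRule("b", s -> s + "!");         // rules complete, buffer is replayed
        task.onEvent("b", "y");                 // processed immediately
        System.out.println(task.output);
    }
}
```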

— Ken

> On Apr 12, 2018, at 2:44 AM, Christophe Jolif <cjo...@gmail.com> wrote:
> Sihua,
> On Thu, Apr 12, 2018 at 10:04 AM, 周思华 <summerle...@163.com 
> <mailto:summerle...@163.com>> wrote:
> Hi Christophe,
> I think what you want to do is a "stream join", and I'm a bit confused: if 
> you already know there are only 8 keys, why would you still want to use 16 
> parallelisms? 8 of them will be idle (a waste of CPU). In a KeyedStream, the 
> tuples with the same key are sent to the same parallelism. 
> First my 8 keys, 16 parallelisms is just an example. Real life it is a bit 
> more complicated. But basically the idea is that I have a certain number of 
> task slots, and I want to get them busy so that my processing is as fast as 
> possible. Even if I have less keys that slots, I wants each slot to take his 
> share in the work.
> And I'm also a bit confused about the pseudo code: it looks like you assume 
> that the tuple with a given key in stream A will always arrive before the 
> matching tuple in stream B? I don't think that can be guaranteed... you may 
> need to store the tuple from stream B in case it arrives before the one from 
> A, and do the "analysis logic" in both flatMap1() and flatMap2().
> You are right. I just wanted to focus on my issue which is :
> 1/ having a co-processing that is considering only stuff of the same key and 
> that can store in the key-state the "rules" (and as you said I might have to 
> store other things for ordering reasons)
> 2/ but being able to parallelize a given key, to use as much parallelism as 
> my cluster allows.
> Regards,
> Sihua Zhou
> On 04/12/2018 15:44,Christophe Jolif<cjo...@gmail.com> 
> <mailto:cjo...@gmail.com> wrote:
> Thanks Chesnay (and others).
> That's what I was figuring out. Now let's go onto the follow up with my exact 
> use-case.
> I have two streams A and B. A basically receives "rules" that the processing 
> of B should follow.
> There is a "key" that allows me to know that a rule x coming in A is for 
> events with the same key coming in B.
> I was planning to do (pseudo code):
> A.connect(B).keyBy("thekey", "thekey").flatMap(
>    // on connected streams, keyBy takes one key per input
>    flatMap1()
>       -> store the rule in a ValueState
>    flatMap2()
>       -> use the state to get the rule, transform the element according to
>          the rule, collect it
> )
> I think it should work, right, because the ValueState will be "per key" and 
> contain the rule for this key and so on?
> Now, what I really care about is not having all the elements of key1 in the 
> same parallelism; I just want to make sure key1 and key2 are isolated, so I 
> can use the key state to store the corresponding rule, and key2's rules are 
> not used for key1 and vice versa.
> So ideally instead of using 8 parallelisms, in order to use the full power of 
> my system, even with 8 keys I would like to use 16 parallelisms, as I don't 
> care about all elements of key1 being in the same parallelism. All I care 
> about is that the state contains the rule corresponding to this key.
> What would be the recommended approach here?
> Thanks again for your help,
> --
> Christophe
> On Thu, Apr 12, 2018 at 9:31 AM, Chesnay Schepler <ches...@apache.org 
> <mailto:ches...@apache.org>> wrote:
> You will get 16 parallel executions since you specify a parallelism of 16; 
> however, 8 of these will not get any data.
> On 11.04.2018 23:29, Hao Sun wrote:
>> From what I learnt, you have to control parallelism yourself. You can set 
>> parallelism on an operator, or set the default one (parallelism.default) 
>> through flink-conf.yaml.
>> I might be wrong.
>> On Wed, Apr 11, 2018 at 2:16 PM Christophe Jolif <cjo...@gmail.com 
>> <mailto:cjo...@gmail.com>> wrote:
>> Hi all,
>> Imagine I have a default parallelism of 16 and I do something like
>> stream.keyBy("something").flatMap()
>> Now let's imagine I have less than 16 keys, maybe 8.
>> How many parallel executions of the flatMap function will I get? 8 because I 
>> have 8 keys, or 16 because I have default parallelism at 16?
>> (and I will have follow up questions depending on the answer I suspect ;))
>> Thanks,
>> -- 
>> Christophe

