Re: keyBy and parallelism

2018-04-12 Thread Ken Krugler
I’m not sure I understand the actual use case, but …

Using a rebalance() to distribute records round-robin across operator subtasks is 
what I think you'd need to do to support "even if I have fewer keys than slots, I 
want each slot to take its share of the work".

So it sounds like you want to (a) broadcast all rules (so every operator task 
has all of the rules), and then (b) randomly distribute the keyed records across 
the operator's subtasks.

Then have a custom function that examines the keys to figure out what rule(s) 
to apply.

There are often timing issues here, where you have to buffer keys while waiting 
for all (to some definition of “all”) the rules to arrive before you start 
processing the keys.
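That buffering pattern can be sketched in plain Java. This is a simplified stand-alone model, not Flink's CoFlatMapFunction API; all names here are illustrative:

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of a two-input operator that buffers events
 *  until the rule for their key has arrived (illustrative names,
 *  not Flink API). */
class RuleBuffer {
    private String rule;                        // latest rule, null until one arrives
    private final List<String> pending = new ArrayList<>();
    private final List<String> output = new ArrayList<>();

    /** Input 1: a new rule arrives; flush anything we buffered. */
    void onRule(String newRule) {
        this.rule = newRule;
        for (String event : pending) {
            output.add(apply(rule, event));
        }
        pending.clear();
    }

    /** Input 2: an event arrives; process it if we have a rule, else buffer it. */
    void onEvent(String event) {
        if (rule == null) {
            pending.add(event);                 // rule not seen yet: buffer
        } else {
            output.add(apply(rule, event));
        }
    }

    private static String apply(String rule, String event) {
        return rule + "(" + event + ")";        // stand-in for the real transformation
    }

    List<String> results() { return output; }
}
```

Events arriving before the rule are held and then flushed in arrival order once the rule shows up, which is the "to some definition of all" buffering above.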

— Ken



http://about.me/kkrugler
+1 530-210-6378



Re: keyBy and parallelism

2018-04-12 Thread Christophe Jolif
Sihua,

On Thu, Apr 12, 2018 at 10:04 AM, 周思华  wrote:

> Hi Christophe,
> I think what you want to do is a "stream join", and I'm a bit confused:
> if you know there are only 8 keys, then why would you still want to use
> a parallelism of 16? 8 of the subtasks will be idle (a waste of CPU). In a
> KeyedStream, the tuples with the same key are sent to the same parallel
> subtask.
>


First, my 8 keys / 16 parallelism is just an example. Real life is a bit
more complicated. But basically the idea is that I have a certain number of
task slots, and I want to keep them busy so that my processing is as fast as
possible. Even if I have fewer keys than slots, I want each slot to take
its share of the work.


>
> And I'm also a bit confused about the pseudo code; it looks like you assume
> that the tuple with the same key in stream A will always arrive before the
> tuple in stream B? I think that can't be guaranteed... you may need to store
> the tuples from stream B in case they arrive before the ones from A, and
> do the "analysis logic" in both flatMap1() and flatMap2().
>


You are right. I just wanted to focus on my issue, which is:

1/ having a co-processing function that considers only elements of the same
key and that can store the "rules" in keyed state (and as you said I might
have to store other things for ordering reasons)
2/ but being able to parallelize a given key to use as much parallelism as
my cluster allows me.
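For point 2, one common trick (not mentioned in this thread, so treat it as an assumption) is to "salt" the key: append a random sub-key so one logical key spreads over several subtasks, while the rules are broadcast so every subtask can still apply them. A minimal stand-alone illustration of the salting idea in plain Java, where `hashCode() % parallelism` is a stand-in for Flink's real key-group assignment:

```java
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

/** Illustration of key "salting": one logical key becomes
 *  logicalKey + "#" + salt, so its records can occupy several
 *  parallel subtasks instead of one. */
class KeySalting {
    static String salt(String logicalKey, int fanOut, Random rnd) {
        return logicalKey + "#" + rnd.nextInt(fanOut);
    }

    /** Stand-in for partitioning: hash of the (salted) key modulo parallelism. */
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    static Set<Integer> subtasksUsed(String logicalKey, int fanOut,
                                     int parallelism, int records) {
        Random rnd = new Random(42);            // fixed seed for a repeatable demo
        Set<Integer> used = new HashSet<>();
        for (int i = 0; i < records; i++) {
            used.add(subtaskFor(salt(logicalKey, fanOut, rnd), parallelism));
        }
        return used;
    }
}
```

With fanOut = 1 (no salting) every record of the key lands on one subtask; with fanOut = 16 the same logical key occupies several subtasks. The cost is that per-key state and ordering are no longer shared across the salted sub-keys.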




Re: keyBy and parallelism

2018-04-12 Thread 周思华
Hi Christophe,
I think what you want to do is a "stream join", and I'm a bit confused: if you
know there are only 8 keys, then why would you still want to use a parallelism
of 16? 8 of the subtasks will be idle (a waste of CPU). In a KeyedStream, the
tuples with the same key are sent to the same parallel subtask.


And I'm also a bit confused about the pseudo code; it looks like you assume that
the tuple with the same key in stream A will always arrive before the tuple in
stream B? I think that can't be guaranteed... you may need to store the tuples
from stream B in case they arrive before the ones from A, and do the "analysis
logic" in both flatMap1() and flatMap2().


Regards,
Sihua Zhou




Re: keyBy and parallelism

2018-04-12 Thread Christophe Jolif
Thanks Chesnay (and others).

That's what I was figuring out. Now let's go on to the follow-up with my
exact use-case.

I have two streams A and B. A basically receives "rules" that the
processing of B should observe.

There is a "key" that lets me know that a rule x coming in on A is meant for
events with the same key coming in on B.

I was planning to do (pseudo code):

A.connect(B).keyBy("thekey", "thekey").flatMap(
   flatMap1(rule)
      -> store the rule in a ValueState
   flatMap2(event)
      -> read the rule from the state, transform the element according to
         the rule, collect it
)


I think it should work, right? Because the ValueState will be "per key" and
will contain the rule for this key, and so on.
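The "per key" behaviour can be pictured as a map from key to its own state slot. A toy stand-alone model in plain Java (not Flink's actual state backend) of why a rule stored under key2 can never be read while processing key1:

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of keyed ValueState: each key gets its own slot,
 *  so rules stored under key2 are invisible when processing key1. */
class KeyedRuleState {
    private final Map<String, String> stateByKey = new HashMap<>();

    /** flatMap1-style path: store the rule under its key. */
    void storeRule(String key, String rule) {
        stateByKey.put(key, rule);
    }

    /** flatMap2-style path: transform an event using only its own key's rule. */
    String processEvent(String key, String event) {
        String rule = stateByKey.get(key);      // null if no rule for this key yet
        return rule == null ? null : rule + ":" + event;
    }
}
```

The null case is exactly the ordering problem raised earlier in the thread: an event whose rule has not arrived yet has nothing to read from the state.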

Now, what I really care about is not having all the elements of key1 in the same
parallel subtask; I just want to make sure key1 and key2 are isolated, so I can
use the keyed state to store the corresponding rule and key2's rules are not
used for key1, and conversely.

So ideally, instead of using a parallelism of 8, in order to use the full power
of my system, even with 8 keys I would like to use a parallelism of 16, as I
don't care about all elements of key1 being in the same subtask. All I care
about is that the state contains the rule corresponding to this key.

What would be the recommended approach here?

Thanks again for your help,
--
Christophe




Re: keyBy and parallelism

2018-04-12 Thread Chesnay Schepler
You will get 16 parallel executions since you specified a parallelism of 
16; however, at least 8 of these will not get any data.
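The reason those subtasks stay idle: keyBy routes each key, via a hash, to exactly one of the 16 subtasks, so 8 distinct keys can occupy at most 8 subtasks, and hash collisions can make it even fewer. A simplified stand-alone model (plain `hashCode` modulo, not Flink's exact key-group logic):

```java
import java.util.HashSet;
import java.util.Set;

/** Simplified model of keyBy partitioning: 8 keys hashed onto 16 subtasks
 *  can occupy at most 8 subtasks (fewer if hashes collide). */
class KeyDistribution {
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    static Set<Integer> busySubtasks(String[] keys, int parallelism) {
        Set<Integer> busy = new HashSet<>();
        for (String key : keys) {
            busy.add(subtaskFor(key, parallelism));
        }
        return busy;
    }
}
```

With 8 keys and parallelism 16, `busySubtasks` can never return more than 8 entries, so at least 8 subtasks receive nothing.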







Re: keyBy and parallelism

2018-04-11 Thread Hao Sun
From what I learned, you have to control parallelism yourself. You can set
parallelism on an operator or set the default one through flink-conf.yaml.
I might be wrong.



keyBy and parallelism

2018-04-11 Thread Christophe Jolif
Hi all,

Imagine I have a default parallelism of 16 and I do something like

stream.keyBy("something").flatMap()

Now let's imagine I have fewer than 16 keys, maybe 8.

How many parallel executions of the flatMap function will I get? 8 because
I have 8 keys, or 16 because I have default parallelism at 16?

(and I will have follow up questions depending on the answer I suspect ;))

Thanks,
-- 
Christophe