I am afraid that this does not work for the scenario in which I am
trying to deploy the QEP. Let me explain.

I have 4 machines with 8 cores each. I want to set the parallelism for
all operators to 16. My QEP is:
source(1)->map(16)->flatmap(16)->keyBy->reduce(16)

So I would like to have 8 maps and 8 flatmaps in the first machine, 8
maps and 8 flatmaps in the second machine, 8 reducers in the third
machine, and 8 reducers in the fourth machine.
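
As a minimal sketch of the layout described above, assuming each
operator's subtasks are spread evenly over the machines intended for it,
the per-machine counts can be worked out in plain Java. The class
DesiredLayout and its method are hypothetical names used only for
illustration; this is arithmetic on the numbers above and does not ask
Flink to place anything.

```java
// Hypothetical helper (not a Flink API): arithmetic for the intended layout.
final class DesiredLayout {

    /** Subtasks of one operator per machine when spread evenly. */
    static int subtasksPerMachine(int parallelism, int machines) {
        if (machines <= 0 || parallelism % machines != 0) {
            throw new IllegalArgumentException(
                "parallelism must divide evenly across the machines");
        }
        return parallelism / machines;
    }

    public static void main(String[] args) {
        int parallelism = 16;
        // map and flatmap are meant for machines 1 and 2,
        // reduce is meant for machines 3 and 4.
        int mapsPerMachine = subtasksPerMachine(parallelism, 2);
        int flatmapsPerMachine = subtasksPerMachine(parallelism, 2);
        int reducersPerMachine = subtasksPerMachine(parallelism, 2);

        System.out.println("maps per machine:     " + mapsPerMachine);
        System.out.println("flatmaps per machine: " + flatmapsPerMachine);
        System.out.println("reducers per machine: " + reducersPerMachine);
    }
}
```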

When I set taskmanager.numberOfTaskSlots = 3, I cannot reach the
parallelism I want (16), because each slot runs one parallel pipeline
(according to the comment in flink-conf.yaml). So I need 8 slots in
each TM. When I use one slotSharingGroup for the source, map, and
flatmap, another slotSharingGroup for the reducer, and a parallelism of
16, Grafana somehow shows more than 16 parallel instances of the
operators.
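
The slot arithmetic behind this can be sketched in plain Java. The
sketch assumes Flink's documented slot sharing behaviour: a slot can
hold one subtask of each operator in the same slot sharing group, so a
group needs as many slots as its highest parallelism, and different
groups do not share slots. SlotDemand, Op, and the group names
"upstream" and "reducers" are hypothetical names for illustration, not
Flink APIs. The sketch does not try to explain what Grafana reports.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not a Flink API): arithmetic on slot demand.
final class SlotDemand {

    /** One operator: its name, slot sharing group, and parallelism. */
    static final class Op {
        final String name;
        final String group;
        final int parallelism;

        Op(String name, String group, int parallelism) {
            this.name = name;
            this.group = group;
            this.parallelism = parallelism;
        }
    }

    /** Sum over all groups of the highest parallelism within each group. */
    static int requiredSlots(Op... ops) {
        Map<String, Integer> maxPerGroup = new LinkedHashMap<>();
        for (Op op : ops) {
            maxPerGroup.merge(op.group, op.parallelism, Math::max);
        }
        int total = 0;
        for (int slots : maxPerGroup.values()) {
            total += slots;
        }
        return total;
    }

    /** Slots a cluster offers: task managers times slots per task manager. */
    static int availableSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    public static void main(String[] args) {
        Op[] job = {
            new Op("source", "upstream", 1),
            new Op("map", "upstream", 16),
            new Op("flatmap", "upstream", 16),
            new Op("reduce", "reducers", 16),
        };
        int needed = requiredSlots(job);
        System.out.println("slots needed:    " + needed);
        System.out.println("4 TMs x 3 slots: " + availableSlots(4, 3));
        System.out.println("4 TMs x 8 slots: " + availableSlots(4, 8));
    }
}
```

Under these assumptions the two groups need 16 + 16 slots, which 4 TMs
with 3 slots each cannot provide, while 4 TMs with 8 slots each can.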

\Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Wed, Jun 3, 2020 at 2:42 PM Weihua Hu <huweihua....@gmail.com> wrote:
>
> Hi, Felipe
>
> Sorry for the late reply.
> You can try setting taskmanager.numberOfTaskSlots = 1 and using different
> slotSharingGroups to make sure tasks are not placed in the same TM.
>
> Best
> Weihua Hu
>
> On May 29, 2020, at 17:07, Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>
> Using slotSharingGroup I can do some placement. However, although I am
> using two different slotSharingGroups for the two sources, they are
> still placed in the same TM. This also starts splitting the
> downstream operators across different TMs.
>
> stream01 = source01.slot1 -> map01(4).slot1 -> flatmap01(4).slot1 \
> stream02 = source02.slot2 -> map02(4).slot2 -> flatmap02(4).slot2 /
>  |-> stream01.union(stream02) -> keyBy -> reducer(8).slot3
>
> I am not sure which configuration I can adjust in the
> conf/flink-conf.yaml file to make it work. Currently, my
> configuration on the four TMs is as shown below.
>
> taskmanager.numberOfTaskSlots: 4
> parallelism.default: 4
>
> Would it work if I used a different numberOfTaskSlots on different TMs?
>
>
> On Fri, May 29, 2020 at 9:00 AM Felipe Gutierrez
> <felipe.o.gutier...@gmail.com> wrote:
>
>
> Because I am measuring one operator (all of its instances), and I
> want to place its downstream operators on another machine so that
> they communicate over network channels.
>
>
> On Fri, May 29, 2020 at 4:59 AM Weihua Hu <huweihua....@gmail.com> wrote:
>
>
> Hi, Felipe
>
> Flink does not support running tasks on a specified TM.
> You can use slotSharingGroup to keep tasks out of the same slot, but you
> cannot specify which TM they run on.
>
> Can you please explain why you need to specify the TM?
>
>
> Best
> Weihua Hu
>
> On May 28, 2020, at 21:37, Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>
> For instance, if I have the following DAG with the respective
> parallelism in parentheses (I hope the DAG renders properly after all):
>
> source01 -> map01(4) -> flatmap01(4) \
>                                       |-> keyBy -> reducer(8)
> source02 -> map02(4) -> flatmap02(4) /
>
> And I have 4 TMs on 4 machines with 4 cores each. I would like to
> place source01, map01, and flatmap01 in TM-01; source02, map02, and
> flatmap02 in TM-02; reducers 1 to 4 in TM-03; and reducers 5 to 8 in
> TM-04. I am using "disableChaining()" on the flatMap operator in
> order to measure it.
>
> I am using the methods "setParallelism()" and "slotSharingGroup()" to
> define this, but both source01 and source02 are placed in TM-01, and
> map01 is split across 2 TMs. The same happens with map02.
>
> Thanks,
> Felipe
>
>
>
