Re: Better way to share large data across task managers

2020-09-25 Thread Kostas Kloudas
Hi Dongwon,

Yes, you are right that I assume that broadcasting occurs once. This
is what I meant by "If you know the data in advance". Sorry for not
being clear. If you need to periodically broadcast new versions of the
data, then I cannot find a better solution than the one you propose
with the static var.

Cheers,
Kostas

On Wed, Sep 23, 2020 at 11:49 AM Dongwon Kim  wrote:
>
> Hi Kostas,
>
> Thanks for the input!
>
> BTW, I guess you assume that the broadcasting occurs just once for
> bootstrapping, huh?
> My job needs not only bootstrapping but also periodically fetching a
> new version of data from some external storage.
>
> Thanks,
>
> Dongwon
>
> > 2020. 9. 23. 오전 4:59, Kostas Kloudas  작성:
> >
> > Hi Dongwon,
>
>
>
>
>
> >
> > If you know the data in advance, you can always use the Yarn options
> > in [1] (e.g. the "yarn.ship-directories") to ship the directories with
> > the data you want only once to each Yarn container (i.e. TM) and then
> > write a udf which reads them in the open() method. This will allow the
> > data to be shipped only once per TM but then each of the tasks will
> > have its own copy in memory of course. By default the visibility of
> > the files that you ship is set to APPLICATION [2], if I am not
> > mistaken so if more than one TMs go to the same node, then you will
> > have even less copies shipped.
> >
> > Does this help with your usecase?
> >
> > Cheers,
> > Kostas
> >
> > [1] 
> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html
> > [2] 
> > https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/yarn/api/records/LocalResourceVisibility.html
> >
> >> On Sun, Sep 20, 2020 at 6:05 PM Dongwon Kim  wrote:
> >> Hi,
> >> I'm using Flink broadcast state similar to what Fabian explained in [1]. 
> >> One difference might be the size of the broadcasted data; the size is 
> >> around 150MB.
> >> I've launched 32 TMs by setting
> >> - taskmanager.numberOfTaskSlots : 6
> >> - parallelism of the non-broadcast side : 192
> >> Here's some questions:
> >> 1) AFAIK, the broadcasted data (150MB) is sent to all 192 tasks. Is it 
> >> right?
> >> 2) Any recommended way to broadcast data only to 32 TMs so that 6 tasks in 
> >> each TM can read the broadcasted data? I'm considering implementing a 
> >> static class for the non-broadcast side to directly load data only once on 
> >> each TaskManager instead of the broadcast state (FYI, I'm using per-job 
> >> clusters on YARN, so each TM is only for a single job). However, I'd like 
> >> to use Flink native facilities if possible.
> >> The type of broadcasted data is Map with around 600K entries, 
> >> so every time the data is broadcasted a lot of GC is inevitable on each TM 
> >> due to the (de)serialization cost.
> >> Any advice would be much appreciated.
> >> Best,
> >> Dongwon
> >> [1] https://flink.apache.org/2019/06/26/broadcast-state.html


Re: Better way to share large data across task managers

2020-09-23 Thread Dongwon Kim
Hi Kostas,

Thanks for the input!

BTW, I guess you assume that the broadcasting occurs just once for
bootstrapping, huh?
My job needs not only bootstrapping but also periodically fetching a
new version of data from some external storage.

Thanks,

Dongwon

> 2020. 9. 23. 오전 4:59, Kostas Kloudas  작성:
>
> Hi Dongwon,





>
> If you know the data in advance, you can always use the Yarn options
> in [1] (e.g. the "yarn.ship-directories") to ship the directories with
> the data you want only once to each Yarn container (i.e. TM) and then
> write a udf which reads them in the open() method. This will allow the
> data to be shipped only once per TM but then each of the tasks will
> have its own copy in memory of course. By default the visibility of
> the files that you ship is set to APPLICATION [2], if I am not
> mistaken so if more than one TMs go to the same node, then you will
> have even less copies shipped.
>
> Does this help with your usecase?
>
> Cheers,
> Kostas
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html
> [2] 
> https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/yarn/api/records/LocalResourceVisibility.html
>
>> On Sun, Sep 20, 2020 at 6:05 PM Dongwon Kim  wrote:
>> Hi,
>> I'm using Flink broadcast state similar to what Fabian explained in [1]. One 
>> difference might be the size of the broadcasted data; the size is around 
>> 150MB.
>> I've launched 32 TMs by setting
>> - taskmanager.numberOfTaskSlots : 6
>> - parallelism of the non-broadcast side : 192
>> Here's some questions:
>> 1) AFAIK, the broadcasted data (150MB) is sent to all 192 tasks. Is it right?
>> 2) Any recommended way to broadcast data only to 32 TMs so that 6 tasks in 
>> each TM can read the broadcasted data? I'm considering implementing a static 
>> class for the non-broadcast side to directly load data only once on each 
>> TaskManager instead of the broadcast state (FYI, I'm using per-job clusters 
>> on YARN, so each TM is only for a single job). However, I'd like to use 
>> Flink native facilities if possible.
>> The type of broadcasted data is Map with around 600K entries, so 
>> every time the data is broadcasted a lot of GC is inevitable on each TM due 
>> to the (de)serialization cost.
>> Any advice would be much appreciated.
>> Best,
>> Dongwon
>> [1] https://flink.apache.org/2019/06/26/broadcast-state.html


Re: Better way to share large data across task managers

2020-09-22 Thread Kostas Kloudas
Hi Dongwon,

If you know the data in advance, you can always use the Yarn options
in [1] (e.g. the "yarn.ship-directories") to ship the directories with
the data you want only once to each Yarn container (i.e. TM) and then
write a udf which reads them in the open() method. This will allow the
data to be shipped only once per TM but then each of the tasks will
have its own copy in memory of course. By default the visibility of
the files that you ship is set to APPLICATION [2], if I am not
mistaken so if more than one TMs go to the same node, then you will
have even less copies shipped.

Does this help with your usecase?

Cheers,
Kostas

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html
[2] 
https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/yarn/api/records/LocalResourceVisibility.html

On Sun, Sep 20, 2020 at 6:05 PM Dongwon Kim  wrote:
>
> Hi,
>
> I'm using Flink broadcast state similar to what Fabian explained in [1]. One 
> difference might be the size of the broadcasted data; the size is around 
> 150MB.
>
> I've launched 32 TMs by setting
> - taskmanager.numberOfTaskSlots : 6
> - parallelism of the non-broadcast side : 192
>
> Here's some questions:
> 1) AFAIK, the broadcasted data (150MB) is sent to all 192 tasks. Is it right?
> 2) Any recommended way to broadcast data only to 32 TMs so that 6 tasks in 
> each TM can read the broadcasted data? I'm considering implementing a static 
> class for the non-broadcast side to directly load data only once on each 
> TaskManager instead of the broadcast state (FYI, I'm using per-job clusters 
> on YARN, so each TM is only for a single job). However, I'd like to use Flink 
> native facilities if possible.
>
> The type of broadcasted data is Map with around 600K entries, so 
> every time the data is broadcasted a lot of GC is inevitable on each TM due 
> to the (de)serialization cost.
>
> Any advice would be much appreciated.
>
> Best,
>
> Dongwon
>
> [1] https://flink.apache.org/2019/06/26/broadcast-state.html


Better way to share large data across task managers

2020-09-20 Thread Dongwon Kim
Hi,

I'm using Flink broadcast state similar to what Fabian explained in [1].
One difference might be the size of the broadcasted data; the size is
around 150MB.

I've launched 32 TMs by setting
- taskmanager.numberOfTaskSlots : 6
- parallelism of the non-broadcast side : 192

Here's some questions:
1) AFAIK, the broadcasted data (150MB) is sent to all 192 tasks. Is it
right?
2) Any recommended way to broadcast data only to 32 TMs so that 6 tasks in
each TM can read the broadcasted data? I'm considering implementing a
static class for the non-broadcast side to directly load data only once on
each TaskManager instead of the broadcast state (FYI, I'm using per-job
clusters on YARN, so each TM is only for a single job). However, I'd like
to use Flink native facilities if possible.

The type of broadcasted data is Map with around 600K entries, so
every time the data is broadcasted a lot of GC is inevitable on each TM due
to the (de)serialization cost.

Any advice would be much appreciated.

Best,

Dongwon

[1] https://flink.apache.org/2019/06/26/broadcast-state.html