Re: HDFS data locality and distribution, Flink

2018-03-19 Thread Reinier Kip
Hi Aljoscha,


Thanks for responding.


I managed to resolve the problem last Friday; I had been creating a single datasource for 
each file, instead of one big datasource for all the files. The reading of the 
one or two HDFS blocks within each datasource was then distributed to only a small 
percentage of slots (let's say ~10%). This was runner-specific behaviour of Beam 
on Flink that I was not yet aware of.


Using a single TextIO for all the files allowed Flink to use all available 
parallelism. It did require jumping through some hoops: each file (each PCollection) 
had metadata associated with it, and that metadata was harder to match back to 
individual files once everything was read into one big PCollection.
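
For reference, a minimal sketch of the difference, using the Beam Java SDK (the HDFS paths and transform names are made up for illustration):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class ReadExample {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    // What I had before: one datasource per file. Each read covers only one or
    // two HDFS blocks, so only a small fraction of slots gets any reading work.
    for (String file : new String[] {"hdfs:///data/part-000", "hdfs:///data/part-001"}) {
      PCollection<String> lines = p.apply("Read " + file, TextIO.read().from(file));
      // ... per-file processing; the per-file metadata is trivially known here ...
    }

    // What works: one big datasource over all files, which Flink can split
    // across all available parallelism.
    PCollection<String> allLines =
        p.apply("Read all", TextIO.read().from("hdfs:///data/part-*"));
    // ... metadata now has to be matched back to records inside the one PCollection ...

    p.run().waitUntilFinish();
  }
}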


Reinier



From: Aljoscha Krettek <aljos...@apache.org>
Sent: 13 March 2018 18:29:52
To: user@beam.apache.org
Subject: Re: HDFS data locality and distribution, Flink

Hi,

There should be no data-locality awareness with Beam on Flink, because Beam has no 
APIs that Flink could use to schedule tasks with locality awareness. It seems the 
readers just happen to end up distributed the way they are.

Are the files roughly of equal size?

Best,
Aljoscha

On 12. Mar 2018, at 05:50, Reinier Kip <r...@bol.com> wrote:

Relevant versions: Beam 2.1, Flink 1.3.

From: Reinier Kip <r...@bol.com>
Sent: 12 March 2018 13:46:24
To: user@beam.apache.org
Subject: HDFS data locality and distribution, Flink


Hey all,

I'm trying to batch-process 30-ish files from HDFS, but the data is distributed 
very badly across slots: 4 out of 32 slots get 4/5ths of the data, another 3 slots 
get about 1/5th, and one last slot gets just a few records. This probably triggers 
disk spillover on those slots and slows down the job immensely. The data has many, 
many unique keys, so processing could be done in a highly parallel manner. From 
what I understand, HDFS data locality governs which splits are assigned to which 
subtask.


  *   I'm running a Beam on Flink on YARN pipeline.
  *   I'm reading 30-ish files, whose records are later grouped by their 
millions of unique keys.
  *   For now, I have 8 task managers with 4 slots each; Beam sets all subtasks to 
a parallelism of 32 (see the sketch after this list).
  *   Data seems to be localised to 9 out of the 32 slots, on 3 out of the 8 task 
managers.
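
For context, this is roughly how the pipeline is handed to the Flink runner (a sketch; the class name is made up, and the option setters are from the Beam Flink runner as I understand it — the same values can also be passed as --runner=FlinkRunner --parallelism=32 on the command line):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunOnFlink {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setParallelism(32);  // 8 task managers x 4 slots

    Pipeline p = Pipeline.create(options);
    // ... read the 30-ish files, parse, group by key, process ...
    p.run().waitUntilFinish();
  }
}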


Does this statement about input split assignment ring true? Is the fact that data 
isn't redistributed an effort by Flink to keep data locality high, even if this 
means disk spillover for a few slots/task managers and idleness for others? Is there 
any use for parallelism if the work isn't distributed anyway?

Thanks for your time, Reinier


