Re: [DISCUSS] Performance of write() in file based IO

2018-08-23 Thread Reuven Lax
Tim, thanks for digging into this! There are some complexities in fixing the
bug (i.e. Beam currently allows the temp directory to be different from the
target directory), but let's continue the discussion on that JIRA.

Reuven

On Thu, Aug 23, 2018 at 6:05 AM Tim Robertson 
wrote:

> Thanks for linking this discussion with BEAM-5036 (and transitively to
> BEAM-4861, which also comes into play), Jozef.
>
> What Reuven speculated and Jozef had previously observed is indeed the
> major cause. Today I've been testing the effect of a "move" using rename()
> instead of a copy() and delete().
>
> My test environment is different today but still using 1.5TB input data
> and the code I linked earlier in GH [1]:
>
>   - Spark API: 35 minutes
>   - Beam AvroIO (2.6.0): 1.7hrs
>   - Beam AvroIO with rename() patch: 42 minutes
>
> On the DAG linked in the GH repo [1], stages 3 & 4 are reduced to seconds,
> saving 53 minutes compared with the Beam 2.6.0 version, which is the
> predominant gain here.
>
> Unless new comments come in I propose fixing BEAM-5036 and BEAM-4861 and
> continuing discussion on those Jiras.
> This requires a bit of exploration and a decision around expectations (e.g.
> what should happen when the target directory does not exist), and also
> correcting the incorrect use of the HDFS API (today it ignores the return
> value, which can indicate errors such as a missing directory).
>
> Thank you all for contributing to this discussion.
>
> [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>
>
>
> On Thu, Aug 23, 2018 at 11:55 AM Jozef Vilcek 
> wrote:
>
>> Just for reference, there is a JIRA open for
>> FileBasedSink.moveToOutputFiles()  and filesystem move behavior
>>
>> https://issues.apache.org/jira/browse/BEAM-5036
>>
>>
>> On Wed, Aug 22, 2018 at 9:15 PM Tim Robertson 
>> wrote:
>>
>>> Reuven, I think you might be on to something
>>>
>>> The Beam HadoopFileSystem copy() does indeed stream through the driver
>>> [1], and the FileBasedSink.moveToOutputFiles() seemingly uses that method
>>> [2].
>>> I'll cobble together a patched version to test using a rename() rather
>>> than a copy() and report back findings before we consider the implications.
>>>
>>> Thanks
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L124
>>> [2]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L288
>>>
>>> On Wed, Aug 22, 2018 at 8:52 PM Tim Robertson 
>>> wrote:
>>>
 > Does HDFS support a fast rename operation?

 Yes. From the shell it is “mv” and in the Java API it is “rename(Path
 src, Path dst)”.
 I am not aware of a fast copy though. I think an HDFS copy streams the
 bytes through the driver (unless a distcp is issued, which is an MR job).

 (Thanks for engaging in this discussion folks)


 On Wed, Aug 22, 2018 at 6:29 PM Reuven Lax  wrote:

> I have another theory: in FileBasedSink.moveToOutputFiles we copy the
> temporary files to the final destination and then delete the temp files.
> Does HDFS support a fast rename operation? If so, I bet Spark is using 
> that
> instead of paying the cost of copying the files.
>
> On Wed, Aug 22, 2018 at 8:59 AM Reuven Lax  wrote:
>
>> Ismael, that should already be true. If not using dynamic
>> destinations there might be some edges in the graph that are never used
>> (i.e. no records are ever published on them), but that should not affect
>> performance. If this is not the case we should fix it.
>>
>> Reuven
>>
>> On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía 
>> wrote:
>>
>>> Spark runner uses the Spark broadcast mechanism to materialize the
>>> side input PCollections in the workers, not sure exactly if this is
>>> efficient or assigned in an optimal way, but it seems logical at least.
>>>
>>> Just wondering if we shouldn't better first tackle the fact that if
>>> the pipeline does not have dynamic destinations (this case)
>>> WriteFiles
>>> should not be doing so much extra magic?
>>>
>>> On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax  wrote:
>>> >
>>> > Often only the metadata (i.e. temp file names) are shuffled,
>>> except in the "spilling" case (which should only happen when using 
>>> dynamic
>>> destinations).
>>> >
>>> > WriteFiles depends heavily on side inputs. How are side inputs
>>> implemented in the Spark runner?
>>> >
>>> > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw <
>>> rober...@google.com> wrote:
>>> >>
>>> >> Yes, I stand corrected, dynamic writes is now much more than the
>>> >> primitive window-based naming we used to have.
>>> >>
>>> >> It would be interesting to visualize how much of this codepath is
>>> >> metadata vs. the actual data.
>>> >>
>>> >> In the case of file writing, it seems one could (maybe?) avoid
>>> >> requiring a stable input, as shards are accepted as a whole (unlike,
>>> >> say, sinks where a deterministic uid is needed for deduplication on
>>> >> retry).

Re: [DISCUSS] Performance of write() in file based IO

2018-08-23 Thread Tim Robertson
Thanks for linking this discussion with BEAM-5036 (and transitively to
BEAM-4861, which also comes into play), Jozef.

What Reuven speculated and Jozef had previously observed is indeed the
major cause. Today I've been testing the effect of a "move" using rename()
instead of a copy() and delete().

My test environment is different today but still using 1.5TB input data and
the code I linked earlier in GH [1]:

  - Spark API: 35 minutes
  - Beam AvroIO (2.6.0): 1.7hrs
  - Beam AvroIO with rename() patch: 42 minutes

On the DAG linked in the GH repo [1], stages 3 & 4 are reduced to seconds,
saving 53 minutes compared with the Beam 2.6.0 version, which is the
predominant gain here.

Unless new comments come in I propose fixing BEAM-5036 and BEAM-4861 and
continuing discussion on those Jiras.
This requires a bit of exploration and a decision around expectations (e.g.
what should happen when the target directory does not exist), and also
correcting the incorrect use of the HDFS API (today it ignores the return
value, which can indicate errors such as a missing directory).
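
As a rough illustration of the return-value point, a checked rename against the
Hadoop FileSystem API could look something like this (a sketch only, with
hypothetical paths and class name, not the actual patch):

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  /** Sketch only: rename() signals failure via its boolean return, which must be checked. */
  public class CheckedRenameSketch {

    static void renameChecked(FileSystem fs, Path src, Path dst) throws IOException {
      // rename() does not create missing parent directories, so ensure the target exists first.
      Path parent = dst.getParent();
      if (!fs.exists(parent) && !fs.mkdirs(parent)) {
        throw new IOException("Could not create target directory " + parent);
      }
      // A false return can mean a missing source, an existing destination, etc.
      if (!fs.rename(src, dst)) {
        throw new IOException("rename failed: " + src + " -> " + dst);
      }
    }

    public static void main(String[] args) throws IOException {
      FileSystem fs = FileSystem.get(new Configuration());
      renameChecked(fs, new Path("/tmp/beam-temp/part-0.avro"), new Path("/data/out/part-0.avro"));
    }
  }

Whether the move should create a missing target directory or fail is exactly
the expectation question above.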

Thank you all for contributing to this discussion.

[1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro



On Thu, Aug 23, 2018 at 11:55 AM Jozef Vilcek  wrote:

> Just for reference, there is a JIRA open for
> FileBasedSink.moveToOutputFiles()  and filesystem move behavior
>
> https://issues.apache.org/jira/browse/BEAM-5036
>
>
> On Wed, Aug 22, 2018 at 9:15 PM Tim Robertson 
> wrote:
>
>> Reuven, I think you might be on to something
>>
>> The Beam HadoopFileSystem copy() does indeed stream through the driver
>> [1], and the FileBasedSink.moveToOutputFiles() seemingly uses that method
>> [2].
>> I'll cobble together a patched version to test using a rename() rather
>> than a copy() and report back findings before we consider the implications.
>>
>> Thanks
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L124
>> [2]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L288
>>
>> On Wed, Aug 22, 2018 at 8:52 PM Tim Robertson 
>> wrote:
>>
>>> > Does HDFS support a fast rename operation?
>>>
>>> Yes. From the shell it is “mv” and in the Java API it is “rename(Path
>>> src, Path dst)”.
>>> I am not aware of a fast copy though. I think an HDFS copy streams the
>>> bytes through the driver (unless a distcp is issued, which is an MR job).
>>>
>>> (Thanks for engaging in this discussion folks)
>>>
>>>
>>> On Wed, Aug 22, 2018 at 6:29 PM Reuven Lax  wrote:
>>>
 I have another theory: in FileBasedSink.moveToOutputFiles we copy the
 temporary files to the final destination and then delete the temp files.
 Does HDFS support a fast rename operation? If so, I bet Spark is using that
 instead of paying the cost of copying the files.

 On Wed, Aug 22, 2018 at 8:59 AM Reuven Lax  wrote:

> Ismael, that should already be true. If not using dynamic destinations
> there might be some edges in the graph that are never used (i.e. no 
> records
> are ever published on them), but that should not affect performance. If
> this is not the case we should fix it.
>
> Reuven
>
> On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía 
> wrote:
>
>> Spark runner uses the Spark broadcast mechanism to materialize the
>> side input PCollections in the workers, not sure exactly if this is
>> efficient or assigned in an optimal way, but it seems logical at least.
>>
>> Just wondering if we shouldn't better first tackle the fact that if
>> the pipeline does not have dynamic destinations (this case) WriteFiles
>> should not be doing so much extra magic?
>>
>> On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax  wrote:
>> >
>> > Often only the metadata (i.e. temp file names) are shuffled, except
>> in the "spilling" case (which should only happen when using dynamic
>> destinations).
>> >
>> > WriteFiles depends heavily on side inputs. How are side inputs
>> implemented in the Spark runner?
>> >
>> > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw <
>> rober...@google.com> wrote:
>> >>
>> >> Yes, I stand corrected, dynamic writes is now much more than the
>> >> primitive window-based naming we used to have.
>> >>
>> >> It would be interesting to visualize how much of this codepath is
>> >> metadata vs. the actual data.
>> >>
>> >> In the case of file writing, it seems one could (maybe?) avoid
>> >> requiring a stable input, as shards are accepted as a whole
>> (unlike,
>> >> say, sinks where a deterministic uid is needed for deduplication on
>> >> retry).
>> >>
>> >> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax 
>> wrote:
>> >> >
>> >> > Robert - much of the complexity isn't due to streaming, but
> rather because WriteFiles supports "dynamic" output (where the user can
> choose a destination file based on the input record).

Re: [DISCUSS] Performance of write() in file based IO

2018-08-23 Thread Jozef Vilcek
Just for reference, there is a JIRA open for
FileBasedSink.moveToOutputFiles()  and filesystem move behavior

https://issues.apache.org/jira/browse/BEAM-5036


On Wed, Aug 22, 2018 at 9:15 PM Tim Robertson 
wrote:

> Reuven, I think you might be on to something
>
> The Beam HadoopFileSystem copy() does indeed stream through the driver
> [1], and the FileBasedSink.moveToOutputFiles() seemingly uses that method
> [2].
> I'll cobble together a patched version to test using a rename() rather
> than a copy() and report back findings before we consider the implications.
>
> Thanks
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L124
> [2]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L288
>
> On Wed, Aug 22, 2018 at 8:52 PM Tim Robertson 
> wrote:
>
>> > Does HDFS support a fast rename operation?
>>
>> Yes. From the shell it is “mv” and in the Java API it is “rename(Path
>> src, Path dst)”.
>> I am not aware of a fast copy though. I think an HDFS copy streams the
>> bytes through the driver (unless a distcp is issued, which is an MR job).
>>
>> (Thanks for engaging in this discussion folks)
>>
>>
>> On Wed, Aug 22, 2018 at 6:29 PM Reuven Lax  wrote:
>>
>>> I have another theory: in FileBasedSink.moveToOutputFiles we copy the
>>> temporary files to the final destination and then delete the temp files.
>>> Does HDFS support a fast rename operation? If so, I bet Spark is using that
>>> instead of paying the cost of copying the files.
>>>
>>> On Wed, Aug 22, 2018 at 8:59 AM Reuven Lax  wrote:
>>>
 Ismael, that should already be true. If not using dynamic destinations
 there might be some edges in the graph that are never used (i.e. no records
 are ever published on them), but that should not affect performance. If
 this is not the case we should fix it.

 Reuven

 On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía  wrote:

> Spark runner uses the Spark broadcast mechanism to materialize the
> side input PCollections in the workers, not sure exactly if this is
> efficient or assigned in an optimal way, but it seems logical at least.
>
> Just wondering if we shouldn't better first tackle the fact that if
> the pipeline does not have dynamic destinations (this case) WriteFiles
> should not be doing so much extra magic?
>
> On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax  wrote:
> >
> > Often only the metadata (i.e. temp file names) are shuffled, except
> in the "spilling" case (which should only happen when using dynamic
> destinations).
> >
> > WriteFiles depends heavily on side inputs. How are side inputs
> implemented in the Spark runner?
> >
> > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw 
> wrote:
> >>
> >> Yes, I stand corrected, dynamic writes is now much more than the
> >> primitive window-based naming we used to have.
> >>
> >> It would be interesting to visualize how much of this codepath is
> >> metadata vs. the actual data.
> >>
> >> In the case of file writing, it seems one could (maybe?) avoid
> >> requiring a stable input, as shards are accepted as a whole (unlike,
> >> say, sinks where a deterministic uid is needed for deduplication on
> >> retry).
> >>
> >> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax 
> wrote:
> >> >
> >> > Robert - much of the complexity isn't due to streaming, but
> rather because WriteFiles supports "dynamic" output (where the user can
> choose a destination file based on the input record). In practice if a
> pipeline is not using dynamic destinations the full graph is still
> generated, but much of that graph is never used (empty PCollections).
> >> >
> >> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw <
> rober...@google.com> wrote:
> >> >>
> >> >> I agree that this is concerning. Some of the complexity may have
> also
> >> >> been introduced to accommodate writing files in Streaming mode,
> but it
> >> >> seems we should be able to execute this as a single Map
> operation.
> >> >>
> >> >> Have you profiled to see which stages and/or operations are
> taking up the time?
> >> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
> >> >>  wrote:
> >> >> >
> >> >> > Hi folks,
> >> >> >
> >> >> > I've recently been involved in projects rewriting Avro files
> and have discovered a concerning performance trait in Beam.
> >> >> >
> >> >> > I have observed Beam between 6-20x slower than native Spark or
> MapReduce code for a simple pipeline of read Avro, modify, write Avro.
> >> >> >
> >> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using
> Beam/Spark, 40 minutes with a map-only MR job
 >> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using
 Beam/Spark, 18 minutes using vanilla Spark code. Test code available [1]

Re: [DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Tim Robertson
Reuven, I think you might be on to something

The Beam HadoopFileSystem copy() does indeed stream through the driver [1],
and the FileBasedSink.moveToOutputFiles() seemingly uses that method [2].
I'll cobble together a patched version to test using a rename() rather than
a copy() and report back findings before we consider the implications.

Thanks

[1]
https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L124
[2]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L288

On Wed, Aug 22, 2018 at 8:52 PM Tim Robertson 
wrote:

> > Does HDFS support a fast rename operation?
>
> Yes. From the shell it is “mv” and in the Java API it is “rename(Path src,
> Path dst)”.
> I am not aware of a fast copy though. I think an HDFS copy streams the
> bytes through the driver (unless a distcp is issued, which is an MR job).
>
> (Thanks for engaging in this discussion folks)
>
>
> On Wed, Aug 22, 2018 at 6:29 PM Reuven Lax  wrote:
>
>> I have another theory: in FileBasedSink.moveToOutputFiles we copy the
>> temporary files to the final destination and then delete the temp files.
>> Does HDFS support a fast rename operation? If so, I bet Spark is using that
>> instead of paying the cost of copying the files.
>>
>> On Wed, Aug 22, 2018 at 8:59 AM Reuven Lax  wrote:
>>
>>> Ismael, that should already be true. If not using dynamic destinations
>>> there might be some edges in the graph that are never used (i.e. no records
>>> are ever published on them), but that should not affect performance. If
>>> this is not the case we should fix it.
>>>
>>> Reuven
>>>
>>> On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía  wrote:
>>>
 Spark runner uses the Spark broadcast mechanism to materialize the
 side input PCollections in the workers, not sure exactly if this is
 efficient or assigned in an optimal way, but it seems logical at least.

 Just wondering if we shouldn't better first tackle the fact that if
 the pipeline does not have dynamic destinations (this case) WriteFiles
 should not be doing so much extra magic?

 On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax  wrote:
 >
 > Often only the metadata (i.e. temp file names) are shuffled, except
 in the "spilling" case (which should only happen when using dynamic
 destinations).
 >
 > WriteFiles depends heavily on side inputs. How are side inputs
 implemented in the Spark runner?
 >
 > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw 
 wrote:
 >>
 >> Yes, I stand corrected, dynamic writes is now much more than the
 >> primitive window-based naming we used to have.
 >>
 >> It would be interesting to visualize how much of this codepath is
 >> metadata vs. the actual data.
 >>
 >> In the case of file writing, it seems one could (maybe?) avoid
 >> requiring a stable input, as shards are accepted as a whole (unlike,
 >> say, sinks where a deterministic uid is needed for deduplication on
 >> retry).
 >>
 >> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax  wrote:
 >> >
 >> > Robert - much of the complexity isn't due to streaming, but rather
 because WriteFiles supports "dynamic" output (where the user can choose a
 destination file based on the input record). In practice if a pipeline is
 not using dynamic destinations the full graph is still generated, but much
 of that graph is never used (empty PCollections).
 >> >
 >> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw <
 rober...@google.com> wrote:
 >> >>
 >> >> I agree that this is concerning. Some of the complexity may have
 also
 >> >> been introduced to accommodate writing files in Streaming mode,
 but it
 >> >> seems we should be able to execute this as a single Map operation.
 >> >>
 >> >> Have you profiled to see which stages and/or operations are
 taking up the time?
 >> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
 >> >>  wrote:
 >> >> >
 >> >> > Hi folks,
 >> >> >
 >> >> > I've recently been involved in projects rewriting Avro files
 and have discovered a concerning performance trait in Beam.
 >> >> >
 >> >> > I have observed Beam between 6-20x slower than native Spark or
 MapReduce code for a simple pipeline of read Avro, modify, write Avro.
 >> >> >
 >> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using
 Beam/Spark, 40 minutes with a map-only MR job
 >> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using
 Beam/Spark, 18 minutes using vanilla Spark code. Test code available [1]
 >> >> >
 >> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters
 (Spark / YARN) on reference Dell / Cloudera hardware.
 >> >> >
 >> >> > I have only just started exploring but I believe the cause is
 rooted in the WriteFiles which is used by all our file based IO.

Re: [DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Tim Robertson
> Does HDFS support a fast rename operation?

Yes. From the shell it is “mv” and in the Java API it is “rename(Path src,
Path dst)”.
I am not aware of a fast copy though. I think an HDFS copy streams the
bytes through the driver (unless a distcp is issued, which is an MR job).
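
To illustrate the difference (a sketch with hypothetical paths, not Beam code):

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.FileUtil;
  import org.apache.hadoop.fs.Path;

  /** Sketch contrasting rename (metadata only) with copy + delete (streams bytes). */
  public class HdfsMoveSketch {

    /** rename() is a NameNode metadata operation; no file bytes are transferred. */
    static void moveByRename(FileSystem fs, Path src, Path dst) throws IOException {
      if (!fs.rename(src, dst)) {
        throw new IOException("rename failed: " + src + " -> " + dst);
      }
    }

    /** copy + delete streams every byte of the file through the client issuing the copy. */
    static void moveByCopyAndDelete(FileSystem fs, Path src, Path dst, Configuration conf)
        throws IOException {
      if (!FileUtil.copy(fs, src, fs, dst, /* deleteSource= */ true, conf)) {
        throw new IOException("copy failed: " + src + " -> " + dst);
      }
    }

    public static void main(String[] args) throws IOException {
      Configuration conf = new Configuration();
      FileSystem fs = FileSystem.get(conf);
      moveByRename(fs, new Path("/tmp/beam-temp/part-0.avro"), new Path("/data/out/part-0.avro"));
    }
  }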

(Thanks for engaging in this discussion folks)


On Wed, Aug 22, 2018 at 6:29 PM Reuven Lax  wrote:

> I have another theory: in FileBasedSink.moveToOutputFiles we copy the
> temporary files to the final destination and then delete the temp files.
> Does HDFS support a fast rename operation? If so, I bet Spark is using that
> instead of paying the cost of copying the files.
>
> On Wed, Aug 22, 2018 at 8:59 AM Reuven Lax  wrote:
>
>> Ismael, that should already be true. If not using dynamic destinations
>> there might be some edges in the graph that are never used (i.e. no records
>> are ever published on them), but that should not affect performance. If
>> this is not the case we should fix it.
>>
>> Reuven
>>
>> On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía  wrote:
>>
>>> Spark runner uses the Spark broadcast mechanism to materialize the
>>> side input PCollections in the workers, not sure exactly if this is
>>> efficient or assigned in an optimal way, but it seems logical at least.
>>>
>>> Just wondering if we shouldn't better first tackle the fact that if
>>> the pipeline does not have dynamic destinations (this case) WriteFiles
>>> should not be doing so much extra magic?
>>>
>>> On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax  wrote:
>>> >
>>> > Often only the metadata (i.e. temp file names) are shuffled, except in
>>> the "spilling" case (which should only happen when using dynamic
>>> destinations).
>>> >
>>> > WriteFiles depends heavily on side inputs. How are side inputs
>>> implemented in the Spark runner?
>>> >
>>> > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw 
>>> wrote:
>>> >>
>>> >> Yes, I stand corrected, dynamic writes is now much more than the
>>> >> primitive window-based naming we used to have.
>>> >>
>>> >> It would be interesting to visualize how much of this codepath is
 >>> >> metadata vs. the actual data.
>>> >>
>>> >> In the case of file writing, it seems one could (maybe?) avoid
>>> >> requiring a stable input, as shards are accepted as a whole (unlike,
>>> >> say, sinks where a deterministic uid is needed for deduplication on
>>> >> retry).
>>> >>
>>> >> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax  wrote:
>>> >> >
>>> >> > Robert - much of the complexity isn't due to streaming, but rather
>>> because WriteFiles supports "dynamic" output (where the user can choose a
>>> destination file based on the input record). In practice if a pipeline is
>>> not using dynamic destinations the full graph is still generated, but much
>>> of that graph is never used (empty PCollections).
>>> >> >
>>> >> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw <
>>> rober...@google.com> wrote:
>>> >> >>
>>> >> >> I agree that this is concerning. Some of the complexity may have
>>> also
>>> >> >> been introduced to accommodate writing files in Streaming mode,
>>> but it
>>> >> >> seems we should be able to execute this as a single Map operation.
>>> >> >>
>>> >> >> Have you profiled to see which stages and/or operations are taking
>>> up the time?
>>> >> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>>> >> >>  wrote:
>>> >> >> >
>>> >> >> > Hi folks,
>>> >> >> >
>>> >> >> > I've recently been involved in projects rewriting Avro files and
>>> have discovered a concerning performance trait in Beam.
>>> >> >> >
>>> >> >> > I have observed Beam between 6-20x slower than native Spark or
>>> MapReduce code for a simple pipeline of read Avro, modify, write Avro.
>>> >> >> >
>>> >> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using
>>> Beam/Spark, 40 minutes with a map-only MR job
>>> >> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using
>>> Beam/Spark, 18 minutes using vanilla Spark code. Test code available [1]
>>> >> >> >
>>> >> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters
>>> (Spark / YARN) on reference Dell / Cloudera hardware.
>>> >> >> >
>>> >> >> > I have only just started exploring but I believe the cause is
>>> rooted in the WriteFiles which is used by all our file based IO. WriteFiles
>>> is reasonably complex with reshuffles, spilling to temporary files
>>> (presumably to accommodate varying bundle sizes/avoid small files), a
>>> union, a GBK etc.
>>> >> >> >
>>> >> >> > Before I go too far with exploration I'd appreciate thoughts on
>>> whether we believe this is a concern (I do), if we should explore
>>> optimisations or any insight from previous work in this area.
>>> >> >> >
>>> >> >> > Thanks,
>>> >> >> > Tim
>>> >> >> >
>>> >> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>>>
>>


Re: [DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Reuven Lax
I have another theory: in FileBasedSink.moveToOutputFiles we copy the
temporary files to the final destination and then delete the temp files.
Does HDFS support a fast rename operation? If so, I bet Spark is using that
instead of paying the cost of copying the files.
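
For concreteness, the two strategies at the Beam FileSystems level look roughly
like this (a sketch with hypothetical paths and class name, not the actual
FileBasedSink code; it assumes the relevant filesystem, e.g. HDFS via
HadoopFileSystemOptions, is registered):

  import java.io.IOException;
  import java.util.Collections;
  import java.util.List;
  import org.apache.beam.sdk.io.FileSystems;
  import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
  import org.apache.beam.sdk.io.fs.ResourceId;

  /** Sketch: "move" as copy + delete versus "move" as a single rename. */
  public class FileSystemsMoveSketch {
    public static void main(String[] args) throws IOException {
      List<ResourceId> src = Collections.singletonList(
          FileSystems.matchNewResource("hdfs:///tmp/beam-temp/part-0.avro", false));
      List<ResourceId> dst = Collections.singletonList(
          FileSystems.matchNewResource("hdfs:///data/out/part-0.avro", false));

      // Copy then delete: every byte of the temporary file is read and rewritten.
      FileSystems.copy(src, dst, StandardMoveOptions.IGNORE_MISSING_FILES);
      FileSystems.delete(src, StandardMoveOptions.IGNORE_MISSING_FILES);

      // The alternative: a single rename, cheap on filesystems (like HDFS) where
      // rename is a metadata-only operation.
      // FileSystems.rename(src, dst, StandardMoveOptions.IGNORE_MISSING_FILES);
    }
  }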

On Wed, Aug 22, 2018 at 8:59 AM Reuven Lax  wrote:

> Ismael, that should already be true. If not using dynamic destinations
> there might be some edges in the graph that are never used (i.e. no records
> are ever published on them), but that should not affect performance. If
> this is not the case we should fix it.
>
> Reuven
>
> On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía  wrote:
>
>> Spark runner uses the Spark broadcast mechanism to materialize the
>> side input PCollections in the workers, not sure exactly if this is
>> efficient or assigned in an optimal way, but it seems logical at least.
>>
>> Just wondering if we shouldn't better first tackle the fact that if
>> the pipeline does not have dynamic destinations (this case) WriteFiles
>> should not be doing so much extra magic?
>>
>> On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax  wrote:
>> >
>> > Often only the metadata (i.e. temp file names) are shuffled, except in
>> the "spilling" case (which should only happen when using dynamic
>> destinations).
>> >
>> > WriteFiles depends heavily on side inputs. How are side inputs
>> implemented in the Spark runner?
>> >
>> > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw 
>> wrote:
>> >>
>> >> Yes, I stand corrected, dynamic writes is now much more than the
>> >> primitive window-based naming we used to have.
>> >>
>> >> It would be interesting to visualize how much of this codepath is
 >> >> metadata vs. the actual data.
>> >>
>> >> In the case of file writing, it seems one could (maybe?) avoid
>> >> requiring a stable input, as shards are accepted as a whole (unlike,
>> >> say, sinks where a deterministic uid is needed for deduplication on
>> >> retry).
>> >>
>> >> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax  wrote:
>> >> >
>> >> > Robert - much of the complexity isn't due to streaming, but rather
>> because WriteFiles supports "dynamic" output (where the user can choose a
>> destination file based on the input record). In practice if a pipeline is
>> not using dynamic destinations the full graph is still generated, but much
>> of that graph is never used (empty PCollections).
>> >> >
>> >> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw 
>> wrote:
>> >> >>
>> >> >> I agree that this is concerning. Some of the complexity may have
>> also
>> >> >> been introduced to accommodate writing files in Streaming mode, but
>> it
>> >> >> seems we should be able to execute this as a single Map operation.
>> >> >>
>> >> >> Have you profiled to see which stages and/or operations are taking
>> up the time?
>> >> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>> >> >>  wrote:
>> >> >> >
>> >> >> > Hi folks,
>> >> >> >
>> >> >> > I've recently been involved in projects rewriting Avro files and
>> have discovered a concerning performance trait in Beam.
>> >> >> >
>> >> >> > I have observed Beam between 6-20x slower than native Spark or
>> MapReduce code for a simple pipeline of read Avro, modify, write Avro.
>> >> >> >
>> >> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using
>> Beam/Spark, 40 minutes with a map-only MR job
>> >> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using
>> Beam/Spark, 18 minutes using vanilla Spark code. Test code available [1]
>> >> >> >
>> >> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters
>> (Spark / YARN) on reference Dell / Cloudera hardware.
>> >> >> >
>> >> >> > I have only just started exploring but I believe the cause is
>> rooted in the WriteFiles which is used by all our file based IO. WriteFiles
>> is reasonably complex with reshuffles, spilling to temporary files
>> (presumably to accommodate varying bundle sizes/avoid small files), a
>> union, a GBK etc.
>> >> >> >
>> >> >> > Before I go too far with exploration I'd appreciate thoughts on
>> whether we believe this is a concern (I do), if we should explore
>> optimisations or any insight from previous work in this area.
>> >> >> >
>> >> >> > Thanks,
>> >> >> > Tim
>> >> >> >
>> >> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>>
>


Re: [DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Reuven Lax
Ismael, that should already be true. If not using dynamic destinations
there might be some edges in the graph that are never used (i.e. no records
are ever published on them), but that should not affect performance. If
this is not the case we should fix it.

Reuven

On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía  wrote:

> Spark runner uses the Spark broadcast mechanism to materialize the
> side input PCollections in the workers, not sure exactly if this is
> efficient or assigned in an optimal way, but it seems logical at least.
>
> Just wondering if we shouldn't better first tackle the fact that if
> the pipeline does not have dynamic destinations (this case) WriteFiles
> should not be doing so much extra magic?
>
> On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax  wrote:
> >
> > Often only the metadata (i.e. temp file names) are shuffled, except in
> the "spilling" case (which should only happen when using dynamic
> destinations).
> >
> > WriteFiles depends heavily on side inputs. How are side inputs
> implemented in the Spark runner?
> >
> > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw 
> wrote:
> >>
> >> Yes, I stand corrected, dynamic writes is now much more than the
> >> primitive window-based naming we used to have.
> >>
> >> It would be interesting to visualize how much of this codepath is
> >> metadata vs. the actual data.
> >>
> >> In the case of file writing, it seems one could (maybe?) avoid
> >> requiring a stable input, as shards are accepted as a whole (unlike,
> >> say, sinks where a deterministic uid is needed for deduplication on
> >> retry).
> >>
> >> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax  wrote:
> >> >
> >> > Robert - much of the complexity isn't due to streaming, but rather
> because WriteFiles supports "dynamic" output (where the user can choose a
> destination file based on the input record). In practice if a pipeline is
> not using dynamic destinations the full graph is still generated, but much
> of that graph is never used (empty PCollections).
> >> >
> >> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw 
> wrote:
> >> >>
> >> >> I agree that this is concerning. Some of the complexity may have also
> >> >> been introduced to accommodate writing files in Streaming mode, but
> it
> >> >> seems we should be able to execute this as a single Map operation.
> >> >>
> >> >> Have you profiled to see which stages and/or operations are taking
> up the time?
> >> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
> >> >>  wrote:
> >> >> >
> >> >> > Hi folks,
> >> >> >
> >> >> > I've recently been involved in projects rewriting Avro files and
> have discovered a concerning performance trait in Beam.
> >> >> >
> >> >> > I have observed Beam between 6-20x slower than native Spark or
> MapReduce code for a simple pipeline of read Avro, modify, write Avro.
> >> >> >
> >> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using
> Beam/Spark, 40 minutes with a map-only MR job
> >> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using
> Beam/Spark, 18 minutes using vanilla Spark code. Test code available [1]
> >> >> >
> >> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters
> (Spark / YARN) on reference Dell / Cloudera hardware.
> >> >> >
> >> >> > I have only just started exploring but I believe the cause is
> rooted in the WriteFiles which is used by all our file based IO. WriteFiles
> is reasonably complex with reshuffles, spilling to temporary files
> (presumably to accommodate varying bundle sizes/avoid small files), a
> union, a GBK etc.
> >> >> >
> >> >> > Before I go too far with exploration I'd appreciate thoughts on
> whether we believe this is a concern (I do), if we should explore
> optimisations or any insight from previous work in this area.
> >> >> >
> >> >> > Thanks,
> >> >> > Tim
> >> >> >
> >> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>


Re: [DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Ismaël Mejía
Spark runner uses the Spark broadcast mechanism to materialize the
side input PCollections in the workers, not sure exactly if this is
efficient or assigned in an optimal way, but it seems logical at least.
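
For readers skimming the thread, the side-input pattern in question looks
roughly like this in user code (a generic sketch, not WriteFiles' actual
internals); the PCollectionView is what the runner materializes and, on Spark,
broadcasts:

  import java.util.List;
  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;
  import org.apache.beam.sdk.transforms.Create;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.View;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.beam.sdk.values.PCollectionView;

  /**
   * Generic side-input sketch: a small PCollection is turned into a view that
   * every worker processing the main input can read.
   */
  public class SideInputSketch {
    public static void main(String[] args) {
      Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
      PCollection<String> mainInput = p.apply("Main", Create.of("a", "b"));
      PCollectionView<List<String>> view =
          p.apply("Side", Create.of("x", "y")).apply(View.asList());

      mainInput.apply(ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void process(ProcessContext c) {
          // Each worker reads the materialized view; on the Spark runner this is
          // where the broadcast variable comes in.
          c.output(c.element() + ":" + c.sideInput(view).size());
        }
      }).withSideInputs(view));

      p.run().waitUntilFinish();
    }
  }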

Just wondering if we shouldn't better first tackle the fact that if
the pipeline does not have dynamic destinations (this case) WriteFiles
should not be doing so much extra magic?

On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax  wrote:
>
> Often only the metadata (i.e. temp file names) are shuffled, except in the 
> "spilling" case (which should only happen when using dynamic destinations).
>
> WriteFiles depends heavily on side inputs. How are side inputs implemented in 
> the Spark runner?
>
> On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw  wrote:
>>
>> Yes, I stand corrected, dynamic writes is now much more than the
>> primitive window-based naming we used to have.
>>
>> It would be interesting to visualize how much of this codepath is
>> metadata vs. the actual data.
>>
>> In the case of file writing, it seems one could (maybe?) avoid
>> requiring a stable input, as shards are accepted as a whole (unlike,
>> say, sinks where a deterministic uid is needed for deduplication on
>> retry).
>>
>> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax  wrote:
>> >
>> > Robert - much of the complexity isn't due to streaming, but rather because 
>> > WriteFiles supports "dynamic" output (where the user can choose a 
>> > destination file based on the input record). In practice if a pipeline is 
>> > not using dynamic destinations the full graph is still generated, but much 
>> > of that graph is never used (empty PCollections).
>> >
>> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw  
>> > wrote:
>> >>
>> >> I agree that this is concerning. Some of the complexity may have also
>> >> been introduced to accommodate writing files in Streaming mode, but it
>> >> seems we should be able to execute this as a single Map operation.
>> >>
>> >> Have you profiled to see which stages and/or operations are taking up the 
>> >> time?
>> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>> >>  wrote:
>> >> >
>> >> > Hi folks,
>> >> >
>> >> > I've recently been involved in projects rewriting Avro files and have 
>> >> > discovered a concerning performance trait in Beam.
>> >> >
>> >> > I have observed Beam between 6-20x slower than native Spark or 
>> >> > MapReduce code for a simple pipeline of read Avro, modify, write Avro.
>> >> >
>> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using 
>> >> > Beam/Spark, 40 minutes with a map-only MR job
>> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 
>> >> > 18 minutes using vanilla Spark code. Test code available [1]
>> >> >
>> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark 
>> >> > / YARN) on reference Dell / Cloudera hardware.
>> >> >
>> >> > I have only just started exploring but I believe the cause is rooted in 
>> >> > the WriteFiles which is used by all our file based IO. WriteFiles is 
>> >> > reasonably complex with reshuffles, spilling to temporary files 
>> >> > (presumably to accommodate varying bundle sizes/avoid small files), a 
>> >> > union, a GBK etc.
>> >> >
>> >> > Before I go too far with exploration I'd appreciate thoughts on whether 
>> >> > we believe this is a concern (I do), if we should explore optimisations 
>> >> > or any insight from previous work in this area.
>> >> >
>> >> > Thanks,
>> >> > Tim
>> >> >
>> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro


Re: [DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Reuven Lax
Often only the metadata (i.e. temp file names) are shuffled, except in the
"spilling" case (which should only happen when using dynamic destinations).

WriteFiles depends heavily on side inputs. How are side inputs implemented
in the Spark runner?

On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw  wrote:

> Yes, I stand corrected, dynamic writes is now much more than the
> primitive window-based naming we used to have.
>
> It would be interesting to visualize how much of this codepath is
> metadata vs. the actual data.
>
> In the case of file writing, it seems one could (maybe?) avoid
> requiring a stable input, as shards are accepted as a whole (unlike,
> say, sinks where a deterministic uid is needed for deduplication on
> retry).
>
> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax  wrote:
> >
> > Robert - much of the complexity isn't due to streaming, but rather
> because WriteFiles supports "dynamic" output (where the user can choose a
> destination file based on the input record). In practice if a pipeline is
> not using dynamic destinations the full graph is still generated, but much
> of that graph is never used (empty PCollections).
> >
> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw 
> wrote:
> >>
> >> I agree that this is concerning. Some of the complexity may have also
> >> been introduced to accommodate writing files in Streaming mode, but it
> >> seems we should be able to execute this as a single Map operation.
> >>
> >> Have you profiled to see which stages and/or operations are taking up
> the time?
> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
> >>  wrote:
> >> >
> >> > Hi folks,
> >> >
> >> > I've recently been involved in projects rewriting Avro files and have
> discovered a concerning performance trait in Beam.
> >> >
> >> > I have observed Beam between 6-20x slower than native Spark or
> MapReduce code for a simple pipeline of read Avro, modify, write Avro.
> >> >
> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using
> Beam/Spark, 40 minutes with a map-only MR job
> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark,
> 18 minutes using vanilla Spark code. Test code available [1]
> >> >
> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters
> (Spark / YARN) on reference Dell / Cloudera hardware.
> >> >
> >> > I have only just started exploring but I believe the cause is rooted
> in the WriteFiles which is used by all our file based IO. WriteFiles is
> reasonably complex with reshuffles, spilling to temporary files (presumably
> to accommodate varying bundle sizes/avoid small files), a union, a GBK etc.
> >> >
> >> > Before I go too far with exploration I'd appreciate thoughts on
> whether we believe this is a concern (I do), if we should explore
> optimisations or any insight from previous work in this area.
> >> >
> >> > Thanks,
> >> > Tim
> >> >
> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>


Re: [DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Robert Bradshaw
Yes, I stand corrected, dynamic writes is now much more than the
primitive window-based naming we used to have.

It would be interesting to visualize how much of this codepath is
metadata vs. the actual data.

In the case of file writing, it seems one could (maybe?) avoid
requiring a stable input, as shards are accepted as a whole (unlike,
say, sinks where a deterministic uid is needed for deduplication on
retry).

On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax  wrote:
>
> Robert - much of the complexity isn't due to streaming, but rather because 
> WriteFiles supports "dynamic" output (where the user can choose a destination 
> file based on the input record). In practice if a pipeline is not using 
> dynamic destinations the full graph is still generated, but much of that 
> graph is never used (empty PCollections).
>
> On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw  wrote:
>>
>> I agree that this is concerning. Some of the complexity may have also
>> been introduced to accommodate writing files in Streaming mode, but it
>> seems we should be able to execute this as a single Map operation.
>>
>> Have you profiled to see which stages and/or operations are taking up the 
>> time?
>> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>>  wrote:
>> >
>> > Hi folks,
>> >
>> > I've recently been involved in projects rewriting Avro files and have 
>> > discovered a concerning performance trait in Beam.
>> >
>> > I have observed Beam between 6-20x slower than native Spark or MapReduce 
>> > code for a simple pipeline of read Avro, modify, write Avro.
>> >
>> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark, 
>> > 40 minutes with a map-only MR job
>> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18 
>> > minutes using vanilla Spark code. Test code available [1]
>> >
>> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark / 
>> > YARN) on reference Dell / Cloudera hardware.
>> >
>> > I have only just started exploring but I believe the cause is rooted in 
>> > the WriteFiles which is used by all our file based IO. WriteFiles is 
>> > reasonably complex with reshuffles, spilling to temporary files 
>> > (presumably to accommodate varying bundle sizes/avoid small files), a 
>> > union, a GBK etc.
>> >
>> > Before I go too far with exploration I'd appreciate thoughts on whether we 
>> > believe this is a concern (I do), if we should explore optimisations or 
>> > any insight from previous work in this area.
>> >
>> > Thanks,
>> > Tim
>> >
>> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro


Re: [DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Reuven Lax
Robert - much of the complexity isn't due to streaming, but rather because
WriteFiles supports "dynamic" output (where the user can choose a
destination file based on the input record). In practice if a pipeline is
not using dynamic destinations the full graph is still generated, but much
of that graph is never used (empty PCollections).
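
To show what "dynamic" output means from the user's side, here is a sketch
using FileIO.writeDynamic (hypothetical data, destination names and output
directory; illustrative only, not the WriteFiles internals):

  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.coders.KvCoder;
  import org.apache.beam.sdk.coders.StringUtf8Coder;
  import org.apache.beam.sdk.io.FileIO;
  import org.apache.beam.sdk.io.TextIO;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;
  import org.apache.beam.sdk.transforms.Contextful;
  import org.apache.beam.sdk.transforms.Create;
  import org.apache.beam.sdk.values.KV;

  /** Sketch of dynamic destinations: the output file is chosen per record. */
  public class DynamicDestinationsSketch {
    public static void main(String[] args) {
      Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
      p.apply(Create.of(KV.of("error", "e1"), KV.of("info", "i1"))
              .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
       .apply(FileIO.<String, KV<String, String>>writeDynamic()
           .by(KV::getKey)                                   // destination = the record's key
           .withDestinationCoder(StringUtf8Coder.of())
           .via(Contextful.fn(KV::getValue), TextIO.sink())  // what to write for each record
           .to("/tmp/out")                                   // hypothetical base directory
           .withNaming(key -> FileIO.Write.defaultNaming(key + "-records", ".txt")));
      p.run().waitUntilFinish();
    }
  }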

On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw  wrote:

> I agree that this is concerning. Some of the complexity may have also
> been introduced to accommodate writing files in Streaming mode, but it
> seems we should be able to execute this as a single Map operation.
>
> Have you profiled to see which stages and/or operations are taking up the
> time?
> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>  wrote:
> >
> > Hi folks,
> >
> > I've recently been involved in projects rewriting Avro files and have
> discovered a concerning performance trait in Beam.
> >
> > I have observed Beam between 6-20x slower than native Spark or MapReduce
> code for a simple pipeline of read Avro, modify, write Avro.
> >
> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark,
> 40 minutes with a map-only MR job
> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18
> minutes using vanilla Spark code. Test code available [1]
> >
> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark /
> YARN) on reference Dell / Cloudera hardware.
> >
> > I have only just started exploring but I believe the cause is rooted in
> the WriteFiles which is used by all our file based IO. WriteFiles is
> reasonably complex with reshuffles, spilling to temporary files (presumably
> to accommodate varying bundle sizes/avoid small files), a union, a GBK etc.
> >
> > Before I go too far with exploration I'd appreciate thoughts on whether
> we believe this is a concern (I do), if we should explore optimisations or
> any insight from previous work in this area.
> >
> > Thanks,
> > Tim
> >
> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>


Re: [DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Reuven Lax
I think we need to dig in more to understand where the slowness is. Some
context (which might not be obvious from the code):

* Much of the complexity in WriteFiles is not always active. e.g. a lot of
it is there to support dynamic output (where the filename is dynamically
chosen based on the input record), and if you're not using dynamic output a
lot of those codepaths will not be used.

* There is some overhead because Beam does not assume that ParDos are
deterministic (by contrast, Spark often assumes that user code is
deterministic), and so inserts a shuffle to make sure that file writes are
deterministic. I believe that the current Spark runner might checkpoint the
entire RDD in such a case, which is quite inefficient. We should try on
other runners to make sure that this issue is not specific to the Spark
runner.

* Spilling to temporary files is done to avoid workers crashing with
out-of-memory errors. Beam attempts to write files straight out of the bundle
(to avoid shuffling all the data and instead shuffle just the filenames).
However, empirically, when there are too many files we get large bundles, and
all the file write buffers cause the workers to start running out of memory; a
solution is to reshuffle the data to distribute it. This will only happen if
you are using windowed writes or dynamic destinations to write to dynamic
locations; otherwise the spilling code path is never executed.
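
To illustrate the second point, the stabilizing shuffle looks roughly like this
in user code (a sketch with hypothetical names, not the actual WriteFiles
internals):

  import java.util.UUID;
  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;
  import org.apache.beam.sdk.transforms.Create;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.Reshuffle;
  import org.apache.beam.sdk.values.PCollection;

  /** Sketch: reshuffle to checkpoint the output of a nondeterministic step. */
  public class StableInputSketch {

    /** Nondeterministic: a retry would produce a different temp name for the same element. */
    static class AssignTempName extends DoFn<String, String> {
      @ProcessElement
      public void process(ProcessContext c) {
        c.output("/tmp/beam-temp/" + UUID.randomUUID() + "-" + c.element());
      }
    }

    public static void main(String[] args) {
      Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
      PCollection<String> tempFileNames =
          p.apply(Create.of("a", "b", "c"))
           .apply("AssignTempName", ParDo.of(new AssignTempName()))
           // The reshuffle materializes the names, so downstream steps (e.g. the
           // final copy/rename of temp files) see the same values even if the
           // upstream ParDo is retried.
           .apply(Reshuffle.viaRandomKey());
      p.run().waitUntilFinish();
    }
  }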

On Wed, Aug 22, 2018 at 2:29 AM Tim Robertson 
wrote:

> Hi folks,
>
> I've recently been involved in projects rewriting Avro files and have
> discovered a concerning performance trait in Beam.
>
> I have observed Beam between 6-20x slower than native Spark or MapReduce
> code for a simple pipeline of read Avro, modify, write Avro.
>
>  - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark,
> 40 minutes with a map-only MR job
>  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18
> minutes using vanilla Spark code. Test code available [1]
>
> These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark /
> YARN) on reference Dell / Cloudera hardware.
>
> I have only just started exploring but I believe the cause is rooted in
> the WriteFiles which is used by all our file based IO. WriteFiles is
> reasonably complex with reshuffles, spilling to temporary files (presumably
> to accommodate varying bundle sizes/avoid small files), a union, a GBK etc.
>
> Before I go too far with exploration I'd appreciate thoughts on whether we
> believe this is a concern (I do), if we should explore optimisations or any
> insight from previous work in this area.
>
> Thanks,
> Tim
>
> [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>


Re: [DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Tim Robertson
> Are we seeing similar discrepancies for Flink?

I am not sure, I'm afraid (no easy access to Flink right now). I tried
without success to get the Apex runner going on Cloudera YARN for this today;
I'll keep trying when time allows.

I've updated the DAGs to show more detail:
https://github.com/gbif/beam-perf/tree/master/avro-to-avro

On Wed, Aug 22, 2018 at 1:41 PM Robert Bradshaw  wrote:

> That is quite the DAG... Are we seeing similar discrepancies for
> Flink? (Trying to understand if this is Beam->Spark translation bloat,
> or inherent to the WriteFiles transform itself.)
> On Wed, Aug 22, 2018 at 1:35 PM Tim Robertson 
> wrote:
> >
> > Thanks Robert
> >
> > > Have you profiled to see which stages and/or operations are taking up
> the time?
> >
> > Not yet. I'm browsing through the spark DAG produced which I've
> committed [1] and reading the code.
> >
> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
> >
> > On Wed, Aug 22, 2018 at 12:12 PM Robert Bradshaw 
> wrote:
> >>
> >> I agree that this is concerning. Some of the complexity may have also
> >> been introduced to accommodate writing files in Streaming mode, but it
> >> seems we should be able to execute this as a single Map operation.
> >>
> >> Have you profiled to see which stages and/or operations are taking up
> the time?
> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
> >>  wrote:
> >> >
> >> > Hi folks,
> >> >
> >> > I've recently been involved in projects rewriting Avro files and have
> discovered a concerning performance trait in Beam.
> >> >
> >> > I have observed Beam between 6-20x slower than native Spark or
> MapReduce code for a simple pipeline of read Avro, modify, write Avro.
> >> >
> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using
> Beam/Spark, 40 minutes with a map-only MR job
> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark,
> 18 minutes using vanilla Spark code. Test code available [1]
> >> >
> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters
> (Spark / YARN) on reference Dell / Cloudera hardware.
> >> >
> >> > I have only just started exploring but I believe the cause is rooted
> in the WriteFiles which is used by all our file based IO. WriteFiles is
> reasonably complex with reshuffles, spilling to temporary files (presumably
> to accommodate varying bundle sizes/avoid small files), a union, a GBK etc.
> >> >
> >> > Before I go too far with exploration I'd appreciate thoughts on
> whether we believe this is a concern (I do), if we should explore
> optimisations or any insight from previous work in this area.
> >> >
> >> > Thanks,
> >> > Tim
> >> >
> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>


Re: [DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Robert Bradshaw
That is quite the DAG... Are we seeing similar discrepancies for
Flink? (Trying to understand if this is Beam->Spark translation bloat,
or inherent to the WriteFiles transform itself.)
On Wed, Aug 22, 2018 at 1:35 PM Tim Robertson  wrote:
>
> Thanks Robert
>
> > Have you profiled to see which stages and/or operations are taking up the 
> > time?
>
> Not yet. I'm browsing through the spark DAG produced which I've committed [1] 
> and reading the code.
>
> [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>
> On Wed, Aug 22, 2018 at 12:12 PM Robert Bradshaw  wrote:
>>
>> I agree that this is concerning. Some of the complexity may have also
>> been introduced to accommodate writing files in Streaming mode, but it
>> seems we should be able to execute this as a single Map operation.
>>
>> Have you profiled to see which stages and/or operations are taking up the 
>> time?
>> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>>  wrote:
>> >
>> > Hi folks,
>> >
>> > I've recently been involved in projects rewriting Avro files and have 
>> > discovered a concerning performance trait in Beam.
>> >
>> > I have observed Beam between 6-20x slower than native Spark or MapReduce 
>> > code for a simple pipeline of read Avro, modify, write Avro.
>> >
>> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark, 
>> > 40 minutes with a map-only MR job
>> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18 
>> > minutes using vanilla Spark code. Test code available [1]
>> >
>> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark / 
>> > YARN) on reference Dell / Cloudera hardware.
>> >
>> > I have only just started exploring but I believe the cause is rooted in 
>> > the WriteFiles which is used by all our file based IO. WriteFiles is 
>> > reasonably complex with reshuffles, spilling to temporary files 
>> > (presumably to accommodate varying bundle sizes/avoid small files), a 
>> > union, a GBK etc.
>> >
>> > Before I go too far with exploration I'd appreciate thoughts on whether we 
>> > believe this is a concern (I do), if we should explore optimisations or 
>> > any insight from previous work in this area.
>> >
>> > Thanks,
>> > Tim
>> >
>> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro


Re: [DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Tim Robertson
Thanks Robert

> Have you profiled to see which stages and/or operations are taking up the
time?

Not yet. I'm browsing through the Spark DAG produced, which I've committed
[1], and reading the code.

[1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro

On Wed, Aug 22, 2018 at 12:12 PM Robert Bradshaw 
wrote:

> I agree that this is concerning. Some of the complexity may have also
> been introduced to accommodate writing files in Streaming mode, but it
> seems we should be able to execute this as a single Map operation.
>
> Have you profiled to see which stages and/or operations are taking up the
> time?
> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>  wrote:
> >
> > Hi folks,
> >
> > I've recently been involved in projects rewriting Avro files and have
> discovered a concerning performance trait in Beam.
> >
> > I have observed Beam between 6-20x slower than native Spark or MapReduce
> code for a simple pipeline of read Avro, modify, write Avro.
> >
> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark,
> 40 minutes with a map-only MR job
> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18
> minutes using vanilla Spark code. Test code available [1]
> >
> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark /
> YARN) on reference Dell / Cloudera hardware.
> >
> > I have only just started exploring but I believe the cause is rooted in
> the WriteFiles which is used by all our file based IO. WriteFiles is
> reasonably complex with reshuffles, spilling to temporary files (presumably
> to accommodate varying bundle sizes/avoid small files), a union, a GBK etc.
> >
> > Before I go too far with exploration I'd appreciate thoughts on whether
> we believe this is a concern (I do), if we should explore optimisations or
> any insight from previous work in this area.
> >
> > Thanks,
> > Tim
> >
> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>


Re: [DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Robert Bradshaw
I agree that this is concerning. Some of the complexity may have also
been introduced to accommodate writing files in Streaming mode, but it
seems we should be able to execute this as a single Map operation.

Have you profiled to see which stages and/or operations are taking up the time?
On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
 wrote:
>
> Hi folks,
>
> I've recently been involved in projects rewriting Avro files and have 
> discovered a concerning performance trait in Beam.
>
> I have observed Beam between 6-20x slower than native Spark or MapReduce code 
> for a simple pipeline of read Avro, modify, write Avro.
>
>  - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark, 40 
> minutes with a map-only MR job
>  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18 
> minutes using vanilla Spark code. Test code available [1]
>
> These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark / 
> YARN) on reference Dell / Cloudera hardware.
>
> I have only just started exploring but I believe the cause is rooted in the 
> WriteFiles which is used by all our file based IO. WriteFiles is reasonably 
> complex with reshuffles, spilling to temporary files (presumably to 
> accommodate varying bundle sizes/avoid small files), a union, a GBK etc.
>
> Before I go too far with exploration I'd appreciate thoughts on whether we 
> believe this is a concern (I do), if we should explore optimisations or any 
> insight from previous work in this area.
>
> Thanks,
> Tim
>
> [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro


[DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Tim Robertson
Hi folks,

I've recently been involved in projects rewriting Avro files and have
discovered a concerning performance trait in Beam.

I have observed Beam between 6-20x slower than native Spark or MapReduce
code for a simple pipeline of read Avro, modify, write Avro.

 - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark, 40
minutes with a map-only MR job
 - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18
minutes using vanilla Spark code. Test code available [1]

These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark /
YARN) on reference Dell / Cloudera hardware.

I have only just started exploring but I believe the cause is rooted in the
WriteFiles which is used by all our file based IO. WriteFiles is reasonably
complex with reshuffles, spilling to temporary files (presumably to
accommodate varying bundle sizes/avoid small files), a union, a GBK etc.

Before I go too far with exploration I'd appreciate thoughts on whether we
believe this is a concern (I do), if we should explore optimisations or any
insight from previous work in this area.

Thanks,
Tim

[1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
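
For context, the pipeline shape being compared is essentially the following (a
minimal sketch with a hypothetical schema and paths; the real test code is in
[1], and on HDFS the Hadoop filesystem has to be registered via pipeline
options):

  import org.apache.avro.Schema;
  import org.apache.avro.generic.GenericRecord;
  import org.apache.avro.generic.GenericRecordBuilder;
  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.coders.AvroCoder;
  import org.apache.beam.sdk.io.AvroIO;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;
  import org.apache.beam.sdk.transforms.MapElements;
  import org.apache.beam.sdk.values.TypeDescriptor;

  /** Sketch of the benchmarked shape: read Avro, modify each record, write Avro. */
  public class AvroRewriteSketch {
    private static final Schema SCHEMA = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

    public static void main(String[] args) {
      Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
      p.apply(AvroIO.readGenericRecords(SCHEMA).from("/data/in/*.avro"))
       .apply(MapElements.into(TypeDescriptor.of(GenericRecord.class))
           .via((GenericRecord r) ->
               new GenericRecordBuilder(SCHEMA).set("id", r.get("id") + "-v2").build()))
       .setCoder(AvroCoder.of(GenericRecord.class, SCHEMA))
       .apply(AvroIO.writeGenericRecords(SCHEMA).to("/data/out/part"));
      p.run().waitUntilFinish();
    }
  }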