We tried that, Robert, on Flink and it’s pretty awful performance-wise. We 
doubled the speed by using a new HadoopOutputFormat which basically has N 
HadoopOutputFormats inside it; its writeRecord looks at the enum and writes 
the record to the appropriate “inner” output format.
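
Roughly, the wrapper has this shape (a minimal sketch, not our actual code: the class and member names are illustrative, and it assumes a Tuple2<Enum, GenericRecord> input and serializable inner formats):

    import java.io.IOException;
    import java.util.Map;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.io.OutputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;

    // Wraps one OutputFormat per enum value and dispatches each record
    // to the matching inner format based on the record's enum key.
    public class DemuxOutputFormat<E extends Enum<E>>
            implements OutputFormat<Tuple2<E, GenericRecord>> {

        // Must be serializable (e.g. an EnumMap of serializable formats),
        // since Flink ships the OutputFormat to the workers.
        private final Map<E, OutputFormat<GenericRecord>> inner;

        public DemuxOutputFormat(Map<E, OutputFormat<GenericRecord>> inner) {
            this.inner = inner;
        }

        @Override
        public void configure(Configuration parameters) {
            for (OutputFormat<GenericRecord> f : inner.values()) {
                f.configure(parameters);
            }
        }

        @Override
        public void open(int taskNumber, int numTasks) throws IOException {
            for (OutputFormat<GenericRecord> f : inner.values()) {
                f.open(taskNumber, numTasks);
            }
        }

        @Override
        public void writeRecord(Tuple2<E, GenericRecord> record) throws IOException {
            // Look at the enum and forward the record to the appropriate inner format.
            inner.get(record.f0).writeRecord(record.f1);
        }

        @Override
        public void close() throws IOException {
            for (OutputFormat<GenericRecord> f : inner.values()) {
                f.close();
            }
        }
    }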

Most of the performance problems we’ve had so far come from being forced to 
hack the data flows in unnatural ways to avoid serialization costs.

For example, read a file and split it into two datasets based on a column 
predicate. Approach 1:

READ FILE -> Filter 1 -> Dataset<LiveRecords>
          -> Filter 2 -> Dataset<DeadRecords>

is slower than:

READ FILE -> Filter 1 -> Dataset<LiveRecords>
READ FILE -> Filter 2 -> Dataset<DeadRecords>

Reading the file twice is faster! Go figure.
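
For concreteness, here is a minimal Flink DataSet sketch of the two shapes (the path and the column predicate are made up):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class SplitByFilter {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Approach 1: read once, fan out through two filters on the same DataSet.
            DataSet<String> lines = env.readTextFile("/data/records.csv");
            DataSet<String> live = lines.filter(l -> isLive(l));
            DataSet<String> dead = lines.filter(l -> !isLive(l));

            // Approach 2: read the file twice, one independent pipeline per output.
            DataSet<String> live2 = env.readTextFile("/data/records.csv").filter(l -> isLive(l));
            DataSet<String> dead2 = env.readTextFile("/data/records.csv").filter(l -> !isLive(l));

            // Sinks omitted; env.execute() would run whichever plan is wired up.
        }

        // Hypothetical column predicate.
        private static boolean isLive(String line) {
            return !line.endsWith(",DELETED");
        }
    }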



From: Robert Bradshaw [mailto:[email protected]]
Sent: Tuesday, March 28, 2017 4:25 AM
To: [email protected]
Subject: Re: Beam partitioned file reading and writing

I would simply use a Partition transform to convert the 
PCollection<KV<Enum,GenericRecord>> data to a list of 
PCollection<GenericRecord> and then write each one as desired. Either that or a 
DoFn with multiple side outputs (one for each enum value). There's no reason 
this should perform poorly on the runners I am aware of, and is the most 
straightforward (and least error-prone) solution. (I don't know what your 
original solution looked like, but if you used N filters rather than one DoFn 
with N outputs that would perform much worse.)

https://beam.apache.org/documentation/programming-guide/#transforms-flatten-partition
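
A minimal sketch of that Partition approach, assuming a hypothetical RecordKind enum and a Beam 2.x-style AvroIO.writeGenericRecords call (the exact AvroIO API differs between releases):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.io.AvroIO;
    import org.apache.beam.sdk.transforms.Partition;
    import org.apache.beam.sdk.transforms.Values;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionList;

    public class PartitionedAvroWrite {

        // Hypothetical enum standing in for the record kinds in the original pipeline.
        enum RecordKind { LIVE, DEAD }

        static void writeByKind(PCollection<KV<RecordKind, GenericRecord>> data, Schema schema) {
            // One partition per enum value; Partition makes a single pass over the
            // input rather than running N independent filters.
            PCollectionList<KV<RecordKind, GenericRecord>> parts = data.apply(
                Partition.of(RecordKind.values().length,
                    new Partition.PartitionFn<KV<RecordKind, GenericRecord>>() {
                        @Override
                        public int partitionFor(KV<RecordKind, GenericRecord> elem, int numPartitions) {
                            return elem.getKey().ordinal();
                        }
                    }));

            // Drop the key and write each partition to its own Avro destination.
            for (RecordKind kind : RecordKind.values()) {
                parts.get(kind.ordinal())
                    .apply("DropKey_" + kind, Values.<GenericRecord>create())
                    .apply("WriteAvro_" + kind,
                        AvroIO.writeGenericRecords(schema).to("/output/" + kind.name().toLowerCase()));
            }
        }
    }

Each partition is then an ordinary PCollection<GenericRecord> and can be written with whatever sink is appropriate.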

On Tue, Mar 28, 2017 at 1:04 AM, Narek Amirbekian 
<[email protected]> wrote:
We had to implement a custom sink in order to get partitioned destinations. This 
is the least hacky solution, but the down-sides of this approach are that it is a 
lot of code and that custom sinks only work in batch mode. If you need to do this 
in streaming mode you may be able to get away with writing as a side effect of 
a ParDo, but you have to account for concurrent writes, and chunks might be 
processed more than once, so you have to account for duplicate values in your 
output.

On Thu, Mar 23, 2017 at 9:46 AM Newport, Billy 
<[email protected]> wrote:
Is there built-in support for writing partitioned collections? For example:

PCollection<KV<Enum,GenericRecord>> data;

We want to write the GenericRecords in data to different files based on the 
enum. We did this in Flink by making a proxy HadoopOutputFormat which has 
the N real output formats; the write method checks the enum and forwards the 
write call for the GenericRecord to the correct output format.

Given the lack of Beam Parquet support, the final writer we want to use with 
Beam is Avro.

We used the proxy output-format trick in Flink because performance was very poor 
using a filter to split the data and then a map to convert from (Enum, GenericRecord) 
to just GenericRecord.

I’m nervous about using side outputs in Beam because I think they will be 
implemented as described here, which performs poorly.

So basically, has anyone implemented a demuxing AvroIO.Writer?

Thanks
