We tried that on Flink, Robert, and it’s pretty awful performance-wise. We doubled the speed by using a new HadoopOutputFormat which basically has N HadoopOutputFormats inside it, and whose writeRecord looks at the enum and writes the record to the appropriate “inner” output format.
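Roughly the shape of it (a sketch only, written against Flink’s generic OutputFormat interface rather than the Hadoop wrapper we actually use; the RecordKind enum and class names here are just illustrative):

// Sketch: one "inner" output format per enum value; writeRecord routes each record by its key.
import java.io.IOException;
import java.util.EnumMap;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

enum RecordKind { LIVE, DEAD }  // illustrative partitioning enum (assumption)

public class DemuxOutputFormat implements OutputFormat<Tuple2<RecordKind, GenericRecord>> {

    // One real output format per enum value.
    private final EnumMap<RecordKind, OutputFormat<GenericRecord>> inner;

    public DemuxOutputFormat(EnumMap<RecordKind, OutputFormat<GenericRecord>> inner) {
        this.inner = inner;
    }

    @Override
    public void configure(Configuration parameters) {
        inner.values().forEach(f -> f.configure(parameters));
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        for (OutputFormat<GenericRecord> f : inner.values()) {
            f.open(taskNumber, numTasks);
        }
    }

    @Override
    public void writeRecord(Tuple2<RecordKind, GenericRecord> record) throws IOException {
        // Route the value to the "inner" format that owns this enum value.
        inner.get(record.f0).writeRecord(record.f1);
    }

    @Override
    public void close() throws IOException {
        for (OutputFormat<GenericRecord> f : inner.values()) {
            f.close();
        }
    }
}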
Most of the performance problems we’ve had so far force us to hack the data flows in unnatural ways to avoid serialization costs. For example, read a file and split it into two datasets based on a column predicate.

Approach 1:

READ FILE -> Filter 1 -> Dataset<LiveRecords>
          -> Filter 2 -> Dataset<DeadRecords>

is slower than:

READ FILE -> Filter 1 -> Dataset<LiveRecords>
READ FILE -> Filter 2 -> Dataset<DeadRecords>

Reading the file twice is faster! Go figure.

From: Robert Bradshaw [mailto:[email protected]]
Sent: Tuesday, March 28, 2017 4:25 AM
To: [email protected]
Subject: Re: Beam partitioned file reading and writing

I would simply use a Partition transform to convert the PCollection<KV<Enum,GenericRecord>> data to a list of PCollection<GenericRecord> and then write each one as desired. Either that or a DoFn with multiple side outputs (one for each enum value). There's no reason this should perform poorly on the runners I am aware of, and it is the most straightforward (and least error-prone) solution. (I don't know what your original solution looked like, but if you used N filters rather than one DoFn with N outputs, that would perform much worse.)

https://beam.apache.org/documentation/programming-guide/#transforms-flatten-partition

On Tue, Mar 28, 2017 at 1:04 AM, Narek Amirbekian <[email protected]> wrote:

We had to implement a custom sink in order to get partitioned destinations. This is the least hacky solution, but the downsides of this approach are that it is a lot of code and that custom sinks only work in batch mode. If you need to do this in streaming mode, you may be able to get away with writing as a side effect of a ParDo, but you have to account for concurrent writes, and chunks might be processed more than once, so you have to account for duplicate values in your output.

On Thu, Mar 23, 2017 at 9:46 AM Newport, Billy <[email protected]> wrote:

Is there built-in support for writing partitioned collections? For example:

PCollection<KV<Enum,GenericRecord>> data;

We want to write the GenericRecords in data into different files based on the enum. We did this in Flink by making a proxy HadoopOutputFormat which has the N real output formats; its write method checks the enum and forwards the write call for the GenericRecord to the correct output format.

Given the lack of Beam Parquet support, the final writer we want to use with Beam is Avro. We used the proxy output format trick in Flink because performance was very poor using a filter to split the data and then a map to convert from Enum,GenericRecord to just GenericRecord. I’m nervous to use side outputs in Beam given I think they will be implemented as described here, which performs poorly.

So basically, has anyone implemented a demuxing AvroIO.Writer?

Thanks
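For reference, a minimal sketch of the single-DoFn-with-multiple-outputs approach Robert describes, using the Beam Java SDK (the RecordKind enum, coder setup, and output paths are assumptions for illustration, not tested code):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class DemuxWithTags {
  enum RecordKind { LIVE, DEAD }  // illustrative partitioning enum

  static void writePartitioned(PCollection<KV<RecordKind, GenericRecord>> data, Schema schema) {
    final TupleTag<GenericRecord> liveTag = new TupleTag<GenericRecord>() {};
    final TupleTag<GenericRecord> deadTag = new TupleTag<GenericRecord>() {};

    // One DoFn, N outputs: each element is inspected once and routed by its enum key.
    PCollectionTuple demuxed = data.apply("Demux", ParDo
        .of(new DoFn<KV<RecordKind, GenericRecord>, GenericRecord>() {
              @ProcessElement
              public void process(ProcessContext c) {
                if (c.element().getKey() == RecordKind.LIVE) {
                  c.output(c.element().getValue());           // main output
                } else {
                  c.output(deadTag, c.element().getValue());  // additional (side) output
                }
              }
            })
        .withOutputTags(liveTag, TupleTagList.of(deadTag)));

    // Each partition is then written with a plain AvroIO sink.
    demuxed.get(liveTag).setCoder(AvroCoder.of(schema))
        .apply("WriteLive", AvroIO.writeGenericRecords(schema).to("/tmp/live/part"));
    demuxed.get(deadTag).setCoder(AvroCoder.of(schema))
        .apply("WriteDead", AvroIO.writeGenericRecords(schema).to("/tmp/dead/part"));
  }
}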

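And a sketch of the Partition-based alternative Robert mentions (again, the enum, coder setup, and paths are illustrative assumptions):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.transforms.Partition;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

public class DemuxWithPartition {
  enum RecordKind { LIVE, DEAD }  // illustrative partitioning enum

  static void writePartitioned(PCollection<KV<RecordKind, GenericRecord>> data, Schema schema) {
    // One partition per enum value, chosen by the key's ordinal.
    PCollectionList<KV<RecordKind, GenericRecord>> parts = data.apply("PartitionByKind",
        Partition.of(RecordKind.values().length,
            new Partition.PartitionFn<KV<RecordKind, GenericRecord>>() {
              @Override
              public int partitionFor(KV<RecordKind, GenericRecord> kv, int numPartitions) {
                return kv.getKey().ordinal();
              }
            }));

    for (RecordKind kind : RecordKind.values()) {
      parts.get(kind.ordinal())
          .apply("StripKey" + kind, Values.<GenericRecord>create())  // drop the enum key
          .setCoder(AvroCoder.of(schema))
          .apply("Write" + kind,
              AvroIO.writeGenericRecords(schema).to("/tmp/" + kind.name().toLowerCase() + "/part"));
    }
  }
}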