Hi Billy,

Thanks for that feedback -- truly invaluable.
Would you mind filing a bug in the Flink runner component in JIRA with this information? There may be a simple fix :)

On Wed, Mar 29, 2017 at 8:06 AM, Newport, Billy <[email protected]> wrote:
> We tried that, Robert, on Flink and it’s pretty awful performance-wise. We doubled the speed by using a new hadoopoutputformat which basically has N hadoopoutputformats inside it, and the writeRecord looks at the enum and writes the record to the appropriate “inner” output format.
>
> Most of the performance problems we’ve had so far are forcing us to hack the data flows in unnatural ways to avoid serialization costs.
>
> For example, read a file and split it into two datasets based on a column predicate. Approach 1:
>
> READ FILE -> Filter 1 -> Dataset<LiveRecords>
>           -> Filter 2 -> Dataset<DeadRecords>
>
> is slower than:
>
> READ FILE -> Filter 1 -> Dataset<LiveRecords>
> READ FILE -> Filter 2 -> Dataset<DeadRecords>
>
> Reading the file twice is faster! Go figure.
>
> *From:* Robert Bradshaw [mailto:[email protected]]
> *Sent:* Tuesday, March 28, 2017 4:25 AM
> *To:* [email protected]
> *Subject:* Re: Beam partitioned file reading and writing
>
> I would simply use a Partition transform to convert the PCollection<KV<Enum,GenericRecord>> data to a list of PCollection<GenericRecord>s and then write each one as desired. Either that or a DoFn with multiple side outputs (one for each enum value). There's no reason this should perform poorly on the runners I am aware of, and it is the most straightforward (and least error-prone) solution. (I don't know what your original solution looked like, but if you used N filters rather than one DoFn with N outputs, that would perform much worse.)
>
> https://beam.apache.org/documentation/programming-guide/#transforms-flatten-partition
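A minimal sketch of the multi-output DoFn Robert describes, written against the Beam 2.x Java API (which post-dates this thread; earlier SDKs exposed the same idea as "side outputs" via sideOutput). The class name, the enum RecordState, the tag names, and the two-way split are placeholders for the real types, and a real pipeline would still need an AvroCoder set on the GenericRecord outputs:

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class SplitByEnum {
  // Placeholder for the real enum in PCollection<KV<Enum,GenericRecord>>.
  enum RecordState { LIVE, DEAD }

  static final TupleTag<GenericRecord> LIVE_TAG = new TupleTag<GenericRecord>() {};
  static final TupleTag<GenericRecord> DEAD_TAG = new TupleTag<GenericRecord>() {};

  // One ParDo, one pass over the input, one tagged output per enum value.
  static PCollectionTuple split(PCollection<KV<RecordState, GenericRecord>> data) {
    return data.apply("SplitByEnum",
        ParDo.of(new DoFn<KV<RecordState, GenericRecord>, GenericRecord>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            KV<RecordState, GenericRecord> kv = c.element();
            if (kv.getKey() == RecordState.LIVE) {
              c.output(kv.getValue());            // main output: live records
            } else {
              c.output(DEAD_TAG, kv.getValue());  // additional output: dead records
            }
          }
        }).withOutputTags(LIVE_TAG, TupleTagList.of(DEAD_TAG)));
  }
}

The caller takes the returned PCollectionTuple once (PCollectionTuple out = split(data)) and then out.get(SplitByEnum.LIVE_TAG) and out.get(SplitByEnum.DEAD_TAG) are ordinary PCollection<GenericRecord>s to write however it likes; unlike the two-filter layout above, the input is read and deserialized only once.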
> On Tue, Mar 28, 2017 at 1:04 AM, Narek Amirbekian <[email protected]> wrote:
> We had to implement a custom sink in order to get partitioned destinations. This is the least hacky solution, but the downsides of this approach are that it is a lot of code and that custom sinks only work in batch mode. If you need to do this in streaming mode you may be able to get away with writing as a side effect of a ParDo, but you have to account for concurrent writes, and chunks might be processed more than once, so you have to account for duplicate values in your output.
>
> On Thu, Mar 23, 2017 at 9:46 AM Newport, Billy <[email protected]> wrote:
> Is there built-in support for writing partitioned collections? For example:
>
> PCollection<KV<Enum,GenericRecord>> data;
>
> We want to write the GenericRecords in data into different files based on the enum. We did this in Flink by making a proxy hadoopoutputformat which has the N real outputformats, and the write method checks the enum and forwards the write call for the GenericRecord to the correct outputformat.
>
> Given the lack of Beam Parquet support, the final writer we want to use with Beam is Avro.
>
> We used the proxy outputformat trick in Flink because performance was very poor using a filter to split it and then a map to convert from Enum,GenericRecord to just GenericRecord.
>
> I’m nervous to use side outputs in Beam given I think they will be implemented as described here, which performs poorly.
>
> So basically, has anyone implemented a demuxing AvroIO.Writer?
>
> Thanks
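For the original question, a similarly hedged sketch of the Partition route from the linked programming-guide section: partition the keyed collection by enum ordinal, drop the keys, and attach one plain AvroIO write per enum value, with no custom sink or demuxing writer. It is again written against the Beam 2.x Java API rather than the SDK current at the time of this thread, and the class name, RecordState, the schema argument, and the output paths are placeholders:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.transforms.Partition;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

public class WritePartitionedAvro {
  // Placeholder for the real enum in PCollection<KV<Enum,GenericRecord>>.
  enum RecordState { LIVE, DEAD }

  // Assumes "data" already has a suitable KvCoder set upstream.
  static void writeByState(PCollection<KV<RecordState, GenericRecord>> data, Schema schema) {
    // One partition per enum value, chosen by ordinal().
    PCollectionList<KV<RecordState, GenericRecord>> parts = data.apply("PartitionByEnum",
        Partition.of(RecordState.values().length,
            new Partition.PartitionFn<KV<RecordState, GenericRecord>>() {
              @Override
              public int partitionFor(KV<RecordState, GenericRecord> kv, int numPartitions) {
                return kv.getKey().ordinal();
              }
            }));

    // One ordinary AvroIO write per enum value, each to its own path.
    for (RecordState state : RecordState.values()) {
      parts.get(state.ordinal())
          .apply("DropKey_" + state, Values.<GenericRecord>create())
          .setCoder(AvroCoder.of(schema))
          .apply("WriteAvro_" + state,
              AvroIO.writeGenericRecords(schema)
                  .to("/output/" + state.name().toLowerCase() + "/records")
                  .withSuffix(".avro"));
    }
  }
}

Whether this ends up faster than the proxy output format on the Flink runner is exactly the open question in this thread, but it keeps the pipeline in stock Beam primitives and gives a baseline to compare the runner's performance against.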
