Hi Billy,

Thanks for that feedback -- truly invaluable.

Would you mind filing a bug in the Flink runner component in JIRA with this
information? There may be a simple fix :)

On Wed, Mar 29, 2017 at 8:06 AM, Newport, Billy <[email protected]>
wrote:

> Robert, we tried that on Flink and the performance is pretty awful. We
> doubled the speed by using a new HadoopOutputFormat that basically has N
> HadoopOutputFormats inside it; its writeRecord looks at the enum and
> writes the record to the appropriate “inner” output format.
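
(Inline note: a minimal sketch of the proxy idea described above, in plain
Java rather than the actual Flink or Hadoop APIs. RecordWriter, ListWriter,
DemuxWriter and RecordState are hypothetical names made up for illustration;
the real code would wrap HadoopOutputFormat instances instead of in-memory
lists.)

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a real output format or file writer.
interface RecordWriter<V> {
    void write(V record);
}

// In-memory writer, used here only so the sketch runs without any I/O.
final class ListWriter<V> implements RecordWriter<V> {
    final List<V> written = new ArrayList<>();

    @Override
    public void write(V record) {
        written.add(record);
    }
}

// Proxy writer: holds one inner writer per enum value and forwards each
// record to the writer registered for its key.
final class DemuxWriter<K extends Enum<K>, V> {
    private final Map<K, RecordWriter<V>> inner;

    DemuxWriter(Class<K> keyType) {
        this.inner = new EnumMap<>(keyType);
    }

    void register(K key, RecordWriter<V> writer) {
        inner.put(key, writer);
    }

    void write(K key, V record) {
        RecordWriter<V> target = inner.get(key);
        if (target == null) {
            throw new IllegalStateException("No writer registered for " + key);
        }
        target.write(record);
    }
}

// Assumed two-valued enum, mirroring the live/dead split in this thread.
enum RecordState { LIVE, DEAD }

final class DemuxDemo {
    public static void main(String[] args) {
        ListWriter<String> live = new ListWriter<>();
        ListWriter<String> dead = new ListWriter<>();
        DemuxWriter<RecordState, String> demux = new DemuxWriter<>(RecordState.class);
        demux.register(RecordState.LIVE, live);
        demux.register(RecordState.DEAD, dead);

        demux.write(RecordState.LIVE, "r1");
        demux.write(RecordState.DEAD, "r2");
        System.out.println(live.written + " " + dead.written);
    }
}
```

The point of the design is that every record is handled once and never
re-serialized between a filter stage and a writer stage.
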
>
>
>
> Most of the performance problems we’ve had so far force us to hack the
> data flows in unnatural ways to avoid serialization costs.
>
>
>
> For example, read a file and split it into two datasets based on a column
> predicate. Approach 1:
>
>
>
> READ FILE -> Filter 1 -> Dataset<LiveRecords>
>
>           -> Filter 2 -> Dataset<DeadRecords>
>
>
>
> is slower than:
>
>
>
> READ FILE -> Filter 1 -> Dataset<LiveRecords>
>
> READ FILE -> Filter 2 -> Dataset<DeadRecords>
>
>
>
> Reading the file twice is faster! Go figure.
>
> *From:* Robert Bradshaw [mailto:[email protected]]
> *Sent:* Tuesday, March 28, 2017 4:25 AM
> *To:* [email protected]
> *Subject:* Re: Beam partitioned file reading and writing
>
>
>
> I would simply use a Partition transform to convert the
> PCollection<KV<Enum,GenericRecord>> data to a list
> of PCollection<GenericRecord> and then write each one as desired. Either
> that or a DoFn with multiple side outputs (one for each enum value).
> There's no reason this should perform poorly on the runners I am aware of,
> and is the most straightforward (and least error-prone) solution. (I don't
> know what your original solution looked like, but if you used N filters
> rather than one DoFn with N outputs that would perform much worse.)
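
(Inline note: a rough sketch of the difference between one routing pass and
N filter passes, in plain Java with no Beam dependency. SinglePassSplit and
Kind are hypothetical names; the assumption that the enum ordinal can serve
as the partition index is mine, not something stated in the thread. Beam's
Partition transform follows the same shape in that a function maps each
element to an index below the number of partitions.)

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Assumed enum for illustration, mirroring the live/dead example.
enum Kind { LIVE, DEAD }

final class SinglePassSplit {
    // One pass: each element is inspected once and routed by its key's ordinal.
    // Assumes numPartitions is at least the number of enum constants.
    static <K extends Enum<K>, V> List<List<V>> partition(
            List<Map.Entry<K, V>> input, int numPartitions) {
        List<List<V>> out = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            out.add(new ArrayList<>());
        }
        for (Map.Entry<K, V> e : input) {
            out.get(e.getKey().ordinal()).add(e.getValue());
        }
        return out;
    }

    // The N-filter alternative: calling this once per key scans the whole
    // input N times in total.
    static <K extends Enum<K>, V> List<V> filter(List<Map.Entry<K, V>> input, K wanted) {
        List<V> out = new ArrayList<>();
        for (Map.Entry<K, V> e : input) {
            if (e.getKey() == wanted) {
                out.add(e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<Kind, String>> input = new ArrayList<>();
        input.add(new SimpleEntry<>(Kind.LIVE, "r1"));
        input.add(new SimpleEntry<>(Kind.DEAD, "r2"));
        input.add(new SimpleEntry<>(Kind.LIVE, "r3"));

        List<List<String>> parts = partition(input, Kind.values().length);
        System.out.println(parts.get(Kind.LIVE.ordinal()));
        System.out.println(parts.get(Kind.DEAD.ordinal()));
    }
}
```

Both approaches produce the same per-key lists; the single pass just touches
each element once instead of once per key.
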
>
>
>
> https://beam.apache.org/documentation/programming-guide/#transforms-flatten-partition
>
>
>
> On Tue, Mar 28, 2017 at 1:04 AM, Narek Amirbekian <[email protected]>
> wrote:
>
> We had to implement a custom sink in order to get partitioned destinations.
> This is the least hacky solution, but the downsides of this approach are
> that it is a lot of code and that custom sinks only work in batch mode. If
> you need to do this in streaming mode you may be able to get away with
> writing as a side effect of a ParDo, but you have to account for concurrent
> writes, and chunks might be processed more than once, so you have to
> account for duplicate values in your output.
>
>
>
> On Thu, Mar 23, 2017 at 9:46 AM Newport, Billy <[email protected]>
> wrote:
>
> Is there built-in support for writing partitioned collections? For example:
>
>
>
> PCollection<KV<Enum,GenericRecord>> data;
>
>
>
> We want to write the GenericRecords in data into different files based on
> the enum. We did this in Flink by making a proxy HadoopOutputFormat
> which has the N real output formats; its write method checks the enum and
> forwards the write call for the GenericRecord to the correct output format.
>
>
>
> Given the lack of Beam Parquet support, the final writer we want to use
> with Beam is Avro.
>
>
>
> We used the proxy output format trick in Flink because performance was very
> poor using a filter to split the data and then a map to convert from
> Enum,GenericRecord to just GenericRecord.
>
>
>
> I’m nervous about using side outputs in Beam, since I think they will be
> implemented as described here, which performs poorly.
>
>
>
> So basically, has anyone implemented a demuxing AvroIO.Writer?
>
>
>
> Thanks
>
>
>
