The sorter extension might be helpful for your use case, though it is only meant for partitioned sorting (or global sorting for smaller data sets): https://github.com/apache/incubator-beam/tree/master/sdks/java/extensions/sorter
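Partitioned sorting here means assigning each record to a partition (for example, a time interval) and sorting only within each partition, so no single worker has to hold the whole data set. The plain-Java sketch below simulates that idea in memory and does not use the Beam API. `Event`, `dayOf`, and `partitionAndSort` are hypothetical names, and day-sized intervals are an assumption. In a Beam pipeline, the grouping step would be a GroupByKey and the per-group sort would be done by the sorter extension's `SortValues` transform.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PartitionedSortSketch {
  static final long DAY_MILLIS = 24L * 60 * 60 * 1000;

  // Hypothetical stand-in for the Avro record: only the time field matters here.
  static final class Event {
    final long timestampMillis;
    final String payload;

    Event(long timestampMillis, String payload) {
      this.timestampMillis = timestampMillis;
      this.payload = payload;
    }

    @Override
    public String toString() {
      return payload + "@" + timestampMillis;
    }
  }

  // Hypothetical interval function: whole UTC days since the epoch.
  static long dayOf(long timestampMillis) {
    return Math.floorDiv(timestampMillis, DAY_MILLIS);
  }

  // Step 1 mimics grouping by interval (a GroupByKey in Beam); step 2 sorts
  // each group independently, which is the "partitioned" part of the sort.
  static Map<Long, List<Event>> partitionAndSort(List<Event> events) {
    Map<Long, List<Event>> byDay = new TreeMap<>();
    for (Event e : events) {
      byDay.computeIfAbsent(dayOf(e.timestampMillis), day -> new ArrayList<>()).add(e);
    }
    for (List<Event> group : byDay.values()) {
      group.sort(Comparator.comparingLong(e -> e.timestampMillis));
    }
    return byDay;
  }

  public static void main(String[] args) {
    List<Event> events = List.of(
        new Event(2 * DAY_MILLIS + 500, "c"),
        new Event(DAY_MILLIS + 900, "b2"),
        new Event(DAY_MILLIS + 100, "b1"),
        new Event(10, "a"));
    // Each day would become one output file; records inside it are in time order.
    partitionAndSort(events).forEach((day, group) -> System.out.println("day " + day + ": " + group));
  }
}
```

Because each group is sorted independently, the intervals must be small enough for one group to fit on a single worker, which is the same constraint that applies to the sorter.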
It only supports local (not distributed) sorting. It sorts in memory and spills to disk if the data cannot fit in memory, but if the data set can't fit on the worker's disk, it will fail.

If I'm following your use case correctly and you want a complete sort of the records, it would probably work to first extract the timestamp (in milliseconds since the epoch) from each record to make a PCollection of KV<Long, Record>, then do what Lukasz suggested to partition the elements into a PCollection of KV<Interval, Iterable<KV<Long, Record>>>, at which point you can use this extension directly. This will work as long as the time intervals are fine-grained enough that the records associated with an interval can fit on a single worker.

Converting the timestamp to a long value is necessary because the extension currently only supports sorting lexicographically by the byte encoding of the keys. It would also work with certain string representations of a timestamp, for example fixed-width UTC strings such as 2016-11-22T07:55:00.000Z.

Feel free to ping me with any questions about the sorter.

Thanks,
Mitch

On Tue, Nov 22, 2016 at 7:55 AM, Lukasz Cwik <[email protected]> wrote:

> Since the only guarantee for a unit of atomicity is an individual element,
> you could group your records per time interval using a GroupByKey (GBK),
> thus producing KV<Interval, Iterable<Record>>. In a DoFn, for each
> KV<Interval, Iterable<Record>>, you would write out the Records to a file
> based upon the interval key.
>
> On Tue, Nov 22, 2016 at 10:18 AM, Bergmann, Rico (GfK External) <
> [email protected]> wrote:
>
>> The requirement is to have a set of files with Avro records written to
>> HDFS, where the Avro records are sorted by a time field of the record.
>>
>> It would also suffice if I could partition the output with a custom
>> partition function (for example, by day)…
>>
>> Thanks, Rico.
>>
>> *From:* Lukasz Cwik [mailto:[email protected]]
>> *Sent:* Tuesday, 22 November 2016 16:00
>> *To:* [email protected]
>> *Subject:* Re: Support for sorting output in Beam?
>>
>> There is no explicit support for sorting in the Beam model today because
>> the problem space is large, and the use cases people have are typically
>> served by a global combine followed by an in-memory sort, or by a combine
>> per key with a radix-like scheme where each radix is sorted individually.
>>
>> Can you give more details about your use case?
>> Maybe you don't need to do any sorting, or there is an alternative.
>>
>> On Tue, Nov 22, 2016 at 9:41 AM, Bergmann, Rico (GfK External) <
>> [email protected]> wrote:
>>
>> Hi!
>>
>> Looking at the Java API docs, I didn't find anything about sorting. How
>> would I do this with Beam?
>>
>> Best, Rico.
>>
>> ------------------------------
>>
>> GfK SE, Nuremberg, Germany, commercial register at the local court
>> Amtsgericht Nuremberg HRB 25014; Management Board: Dr. Gerhard
>> Hausruckinger (Speaker of the Management Board), Christian Diedrich (CFO),
>> Matthias Hartmann, David Krajicek, Alessandra Cama; Chairman of the
>> Supervisory Board: Ralf Klein-Bölting. This email and any attachments may
>> contain confidential or privileged information. Please note that
>> unauthorized copying, disclosure or distribution of the material in this
>> email is not permitted.
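The requirement that sort keys be longs (or suitable strings) follows from the sorter comparing encoded key bytes rather than decoded values. The following is a minimal plain-Java sketch, assuming unsigned lexicographic byte comparison and an 8-byte big-endian key encoding (the layout produced by Beam's `BigEndianLongCoder`). `KeyByteOrderSketch` and its methods are hypothetical. The sketch shows that byte order matches numeric order for non-negative millisecond timestamps and that fixed-width UTC strings also sort chronologically. It also shows that negative (pre-1970) values do not sort correctly.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;

public class KeyByteOrderSketch {
  // Fixed-width UTC format: every formatted value (years 0000-9999) has the same length.
  static final DateTimeFormatter FIXED_WIDTH_UTC =
      DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneOffset.UTC);

  // 8-byte big-endian encoding (ByteBuffer's default byte order).
  static byte[] encodeLong(long millis) {
    return ByteBuffer.allocate(Long.BYTES).putLong(millis).array();
  }

  // UTF-8 bytes of the fixed-width timestamp string.
  static byte[] encodeString(long millis) {
    return FIXED_WIDTH_UTC.format(Instant.ofEpochMilli(millis)).getBytes(StandardCharsets.UTF_8);
  }

  // Assumed comparison rule of a byte-oriented sorter: unsigned, lexicographic.
  static int compareKeys(byte[] a, byte[] b) {
    return Arrays.compareUnsigned(a, b);
  }

  public static void main(String[] args) {
    long earlier = 1479801300000L; // 2016-11-22T07:55:00Z
    long later = 1479813480000L;   // 2016-11-22T11:18:00Z
    System.out.println(compareKeys(encodeLong(earlier), encodeLong(later)) < 0);     // true
    System.out.println(compareKeys(encodeString(earlier), encodeString(later)) < 0); // true
    // -1 encodes as 0xFF..FF, so it sorts after 0 even though it is smaller.
    System.out.println(compareKeys(encodeLong(-1L), encodeLong(0L)) < 0);            // false
  }
}
```

Variable-length encodings (such as a varint) and variable-width strings can break this correspondence, so the key coder matters as much as the key type.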
