The BigEndianLongCoder has the property that, for non-negative numbers, its encoded byte values sort in the same order as the numbers themselves.
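To illustrate that property without pulling in Beam, here is a minimal sketch in plain Java. It assumes the encoding is the 8-byte big-endian form produced by `ByteBuffer.putLong` and that the sorter compares bytes as unsigned values; the class and method names (`BigEndianOrderDemo`, `encode`, `compareBytes`) are made up for this example and are not part of Beam.

```java
import java.nio.ByteBuffer;

public class BigEndianOrderDemo {
    // Encode a long as 8 bytes, most significant byte first
    // (ByteBuffer is big-endian by default).
    public static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Lexicographic comparison that treats every byte as unsigned.
    // Assumption: this mirrors how a byte-wise sorter orders keys.
    public static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        long earlier = 1479772800000L; // 2016-11-22T00:00:00Z in epoch millis
        long later = 1479859200000L;   // 2016-11-23T00:00:00Z in epoch millis

        // Non-negative values: byte order agrees with numeric order.
        System.out.println(compareBytes(encode(earlier), encode(later)) < 0);

        // Negative values break the agreement: the sign bit makes the first
        // byte 0xFF, so -1 sorts after 1 when bytes are compared as unsigned.
        System.out.println(compareBytes(encode(-1L), encode(1L)) > 0);
    }
}
```

Both lines print `true`. Timestamps in milliseconds since the epoch are non-negative for any date after 1970, so they fall on the safe side of this restriction.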
On Wed, Nov 23, 2016 at 1:06 PM, Mitch Shanklin <[email protected]> wrote:

> The sorter extension might be helpful for your use case, though it is only
> meant for partitioned sorting (or global sorting for smaller data sets).
> https://github.com/apache/incubator-beam/tree/master/sdks/java/extensions/sorter
>
> It only supports local (not distributed) sorting. It will sort in memory
> and spill to disk if the data cannot fit in memory, but if the data set
> can't fit in the worker's disk it will fail.
>
> If I'm following your use case correctly and you want complete sorting on
> the records, it would probably work to first extract the timestamp (in
> milliseconds since epoch) from the record to make a PCollection of
> KV<Long, Record> then do what Lukasz suggested to partition the elements
> to make a PCollection of KV<Interval, Iterable<KV<Long, Record>>> at which
> point you can use this extension directly. This will work as long as the
> time intervals are fine-grained enough that the records associated with an
> interval can fit on a single worker.
>
> The conversion of a timestamp to a long value would be necessary because
> currently the extension only supports sorting lexicographically by the byte
> encoding of the keys. It would also work with certain string
> representations of a timestamp.
>
> Feel free to ping me with any questions about the sorter.
>
> Thanks,
> Mitch
>
> On Tue, Nov 22, 2016 at 7:55 AM, Lukasz Cwik <[email protected]> wrote:
>
>> Since the only guarantee for a unit of atomicity is an individual
>> element, you could group your records per time interval using a GBK thus
>> producing KV<Interval, Iterable<Record>>. In a DoFn, for each
>> KV<Interval, Iterable<Record>> you would write out the Records to a file
>> based upon the interval key.
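The partition-then-sort idea in the quoted messages can be sketched in plain Java. This is only an in-memory illustration of the logic, not a Beam pipeline: the `TreeMap` stands in for the GBK, the per-list sort stands in for the sorter extension, and every name here (`IntervalSortSketch`, `TimedRecord`, `dayOf`, `groupAndSort`) is hypothetical. It assumes daily intervals and timestamps in milliseconds since the epoch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class IntervalSortSketch {
    public static final long DAY_MILLIS = 24L * 60 * 60 * 1000;

    // Hypothetical stand-in for the Avro record with its time field.
    public static final class TimedRecord {
        public final long timestampMillis;
        public final String payload;

        public TimedRecord(long timestampMillis, String payload) {
            this.timestampMillis = timestampMillis;
            this.payload = payload;
        }
    }

    // Interval key: number of whole days since the epoch.
    public static long dayOf(long timestampMillis) {
        return Math.floorDiv(timestampMillis, DAY_MILLIS);
    }

    // Group records by day (the role of the GBK), then sort each group by
    // timestamp (the role of the local sorter). Each group must fit in memory.
    public static Map<Long, List<TimedRecord>> groupAndSort(List<TimedRecord> records) {
        Map<Long, List<TimedRecord>> byDay = new TreeMap<>();
        for (TimedRecord r : records) {
            byDay.computeIfAbsent(dayOf(r.timestampMillis), d -> new ArrayList<>()).add(r);
        }
        for (List<TimedRecord> group : byDay.values()) {
            group.sort(Comparator.comparingLong(r -> r.timestampMillis));
        }
        return byDay;
    }

    public static void main(String[] args) {
        List<TimedRecord> input = Arrays.asList(
            new TimedRecord(1479859200005L, "c"),  // 2016-11-23
            new TimedRecord(1479772800009L, "b"),  // 2016-11-22
            new TimedRecord(1479772800001L, "a")); // 2016-11-22

        // One "file" per day, with records in timestamp order inside it.
        for (Map.Entry<Long, List<TimedRecord>> e : groupAndSort(input).entrySet()) {
            StringBuilder line = new StringBuilder("day " + e.getKey() + ":");
            for (TimedRecord r : e.getValue()) {
                line.append(' ').append(r.payload);
            }
            System.out.println(line);
        }
    }
}
```

The same constraint carries over from the thread: the interval has to be fine-grained enough that one interval's records fit on a single worker.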
>>
>> On Tue, Nov 22, 2016 at 10:18 AM, Bergmann, Rico (GfK External) <[email protected]> wrote:
>>
>>> The requirement is to have a set of files with avro records written to
>>> hdfs, where the avro records are sorted by a time field of this record.
>>>
>>> It would also suffice if I could partition the output with a custom
>>> partition function (for example daywise)…
>>>
>>> Thanks, Rico.
>>>
>>> *From:* Lukasz Cwik [mailto:[email protected]]
>>> *Sent:* Tuesday, 22 November 2016 16:00
>>> *To:* [email protected]
>>> *Subject:* Re: Support for sorting output in Beam?
>>>
>>> There is no explicit support for sorting in the Beam model today
>>> because the problem space is large, and for the use cases people
>>> typically have it generally suffices to do a global combine and sort in
>>> memory, or do a combine per key with a radix-like scheme and sort each
>>> radix individually.
>>>
>>> Can you give more details about your use case?
>>>
>>> Maybe you don't need to do any sorting or there is an alternative.
>>>
>>> On Tue, Nov 22, 2016 at 9:41 AM, Bergmann, Rico (GfK External) <[email protected]> wrote:
>>>
>>> Hi!
>>>
>>> Looking at the Java API Doc I didn’t find anything for sorting. How
>>> would I do this with Beam?
>>>
>>> Best, Rico.
>>>
>>> ------------------------------
>>>
>>> GfK SE, Nuremberg, Germany, commercial register at the local court
>>> Amtsgericht Nuremberg HRB 25014; Management Board: Dr. Gerhard
>>> Hausruckinger (Speaker of the Management Board), Christian Diedrich (CFO),
>>> Matthias Hartmann, David Krajicek, Alessandra Cama; Chairman of the
>>> Supervisory Board: Ralf Klein-Bölting This email and any attachments may
>>> contain confidential or privileged information. Please note that
>>> unauthorized copying, disclosure or distribution of the material in this
>>> email is not permitted.
