The sorter extension might be helpful for your use case, though it is only
meant for partitioned sorting (or global sorting for smaller data sets).
https://github.com/apache/incubator-beam/tree/master/sdks/java/extensions/sorter

It only supports local (not distributed) sorting, so each group is sorted on
a single worker. It sorts in memory and spills to disk if the data cannot fit
in memory, but if the data set cannot fit on the worker's disk, it will fail.

If I'm following your use case correctly and you want complete sorting of
the records, it would probably work to first extract the timestamp (in
milliseconds since the epoch) from each record to make a PCollection of
KV<Long, Record>. Then do what Lukasz suggested and group the elements by
interval to make a PCollection of KV<Interval, Iterable<KV<Long, Record>>>.
At that point you can use this extension directly. This works as long as the
time intervals are fine-grained enough that the records for any one interval
fit on a single worker.
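The shape of that pipeline can be sketched in plain Java, outside of Beam, to show what each step produces. This is a minimal in-memory analog under stated assumptions, not Beam code. `TimedRecord` is a hypothetical stand-in for the Avro record, and a UTC `LocalDate` plays the role of the `Interval` key (one interval per day). In a real pipeline, the grouping would be a GroupByKey. The per-group sort is the step the sorter extension performs; as far as I know, it exposes this step as a `SortValues` transform.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class IntervalSortSketch {

    // Hypothetical stand-in for the Avro record; only the time field matters here.
    static final class TimedRecord {
        final String id;
        final Instant time;

        TimedRecord(String id, Instant time) {
            this.id = id;
            this.time = time;
        }
    }

    // Assumed interval function: one interval per UTC day.
    static LocalDate dayOf(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC).toLocalDate();
    }

    // In-memory analog of: KV<Long, Record> -> group by interval -> sort each group by the Long key.
    static Map<LocalDate, List<TimedRecord>> groupAndSort(List<TimedRecord> records) {
        // Step 1 + 2: key each record by millis since epoch, then bucket by interval.
        Map<LocalDate, List<Map.Entry<Long, TimedRecord>>> groups = new TreeMap<>();
        for (TimedRecord r : records) {
            long key = r.time.toEpochMilli();
            groups.computeIfAbsent(dayOf(key), d -> new ArrayList<>()).add(Map.entry(key, r));
        }
        // Step 3: sort the values inside each group by their timestamp key.
        Map<LocalDate, List<TimedRecord>> result = new TreeMap<>();
        for (Map.Entry<LocalDate, List<Map.Entry<Long, TimedRecord>>> g : groups.entrySet()) {
            List<Map.Entry<Long, TimedRecord>> values = g.getValue();
            values.sort(Map.Entry.comparingByKey());
            List<TimedRecord> sorted = new ArrayList<>();
            for (Map.Entry<Long, TimedRecord> kv : values) {
                sorted.add(kv.getValue());
            }
            result.put(g.getKey(), sorted);
        }
        return result;
    }

    public static void main(String[] args) {
        List<TimedRecord> records = new ArrayList<>();
        records.add(new TimedRecord("b", Instant.parse("2016-11-22T10:18:00Z")));
        records.add(new TimedRecord("c", Instant.parse("2016-11-23T01:00:00Z")));
        records.add(new TimedRecord("a", Instant.parse("2016-11-22T09:41:00Z")));
        // Prints "2016-11-22: a b" then "2016-11-23: c"; each day would become one output file.
        groupAndSort(records).forEach((day, rs) -> {
            StringBuilder ids = new StringBuilder();
            for (TimedRecord r : rs) {
                ids.append(r.id).append(' ');
            }
            System.out.println(day + ": " + ids.toString().trim());
        });
    }
}
```

Each group is sorted independently, which is why an interval's records have to fit on one worker.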

Converting the timestamp to a long is necessary because the extension
currently only supports sorting lexicographically by the byte encoding of
the keys. Certain string representations of a timestamp would also work,
for example fixed-width formats that order fields from year down to
milliseconds.
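The effect of "lexicographic by byte encoding" can be checked directly. This sketch assumes that keys are compared as unsigned bytes. A fixed-width big-endian encoding (what `ByteBuffer.putLong` produces by default, and what Beam's `BigEndianLongCoder` is meant to produce) preserves numeric order for non-negative values such as post-1970 timestamps. Negative values sort after positive ones. The `varint` helper is a hypothetical LEB128-style encoder, shown to illustrate that a variable-length encoding (which I believe is what `VarLongCoder` uses) does not preserve order. The string cases contrast a zero-padded ISO-8601 form with an unpadded one.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class KeyEncodingOrder {

    // Fixed-width, big-endian 8-byte encoding (ByteBuffer's default byte order).
    static byte[] bigEndian(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
    }

    // Hypothetical LEB128-style varint: 7 bits per byte, low-order group first,
    // high bit set on every byte except the last. Intended for non-negative inputs.
    static byte[] varint(long v) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((v & ~0x7FL) != 0) {
            out.write((int) ((v & 0x7F) | 0x80));
            v >>>= 7;
        }
        out.write((int) v);
        return out.toByteArray();
    }

    // Unsigned lexicographic comparison of two byte arrays (Java 9+).
    static int compareBytes(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }

    // Compares two strings by their UTF-8 bytes, as a byte-based sorter would.
    static int compareUtf8(String a, String b) {
        return compareBytes(a.getBytes(StandardCharsets.UTF_8), b.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // true: big-endian bytes keep 129 before 256
        System.out.println(compareBytes(bigEndian(129), bigEndian(256)) < 0);
        // false: varint bytes put 129 (0x81 0x01) after 256 (0x80 0x02)
        System.out.println(compareBytes(varint(129), varint(256)) < 0);
        // true: zero-padded ISO-8601 strings sort chronologically
        System.out.println(compareUtf8("2016-11-22T09:41:00Z", "2016-11-22T10:18:00Z") < 0);
        // false: unpadded "2016-9-30" sorts after "2016-11-22" even though it is earlier
        System.out.println(compareUtf8("2016-9-30", "2016-11-22") < 0);
    }
}
```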

Feel free to ping me with any questions about the sorter.

Thanks,
Mitch

On Tue, Nov 22, 2016 at 7:55 AM, Lukasz Cwik <[email protected]> wrote:

> Since the only guarantee for a unit of atomicity is an individual element,
> you could group your records per time interval using a GBK thus producing
> KV<Interval, Iterable<Record>>. In a DoFn, for each KV<Interval,
> Iterable<Record>> you would write out the Records to a file based upon the
> interval key.
>
> On Tue, Nov 22, 2016 at 10:18 AM, Bergmann, Rico (GfK External) <
> [email protected]> wrote:
>
>> The requirement is to have a set of files with Avro records written to
>> HDFS, where the Avro records are sorted by a time field of the record.
>>
>> It would also suffice if I could partition the output with a custom
>> partition function (for example, by day)…
>>
>> Thanks, Rico.
>>
>> *From:* Lukasz Cwik [mailto:[email protected]]
>> *Sent:* Tuesday, November 22, 2016 16:00
>> *To:* [email protected]
>> *Subject:* Re: Support for sorting output in Beam?
>>
>> There is no explicit support for sorting in the Beam model today because
>> the problem space is large, and for the use cases people typically have it
>> generally suffices to do a global combine and sort in memory, or to do a
>> combine per key with a radix-like scheme and sort each radix individually.
>>
>> Can you give more details about your use case?
>>
>> Maybe you don't need to do any sorting or there is an alternative.
>>
>> On Tue, Nov 22, 2016 at 9:41 AM, Bergmann, Rico (GfK External) <
>> [email protected]> wrote:
>>
>> Hi!
>>
>> Looking at the Java API Doc I didn’t find anything for sorting. How would
>> I do this with Beam?
>>
>> Best, Rico.
>>
>> ------------------------------
>>
>> GfK SE, Nuremberg, Germany, commercial register at the local court
>> Amtsgericht Nuremberg HRB 25014; Management Board: Dr. Gerhard
>> Hausruckinger (Speaker of the Management Board), Christian Diedrich (CFO),
>> Matthias Hartmann, David Krajicek, Alessandra Cama; Chairman of the
>> Supervisory Board: Ralf Klein-Bölting This email and any attachments may
>> contain confidential or privileged information. Please note that
>> unauthorized copying, disclosure or distribution of the material in this
>> email is not permitted.
>>
>>
>
>
