The BigEndianLongCoder has the property that, for non-negative numbers, the
encoded bytes sort the same way as the numbers themselves.
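Here is a minimal, JDK-only sketch of that property. It does not call Beam. It assumes the coder writes the eight bytes most significant first, as `ByteBuffer` does by default, and `BigEndianOrderDemo`, `encode` and `compareEncoded` are hypothetical names. Bytes are compared as unsigned values, which is the usual lexicographic order for byte strings:

```java
import java.nio.ByteBuffer;

public class BigEndianOrderDemo {
    // Encode a long as 8 bytes, most significant byte first
    // (ByteBuffer's default byte order is big-endian).
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Lexicographic comparison of byte strings, treating each byte as unsigned (0..255).
    static int compareUnsignedLex(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    static int compareEncoded(long a, long b) {
        return compareUnsignedLex(encode(a), encode(b));
    }

    public static void main(String[] args) {
        long[] ascending = {0L, 1L, 255L, 256L, 1_000_000_000_000L, Long.MAX_VALUE};
        for (int i = 0; i + 1 < ascending.length; i++) {
            // For non-negative inputs the byte order agrees with numeric order: prints true.
            System.out.println(ascending[i] + " < " + ascending[i + 1] + ": "
                + (compareEncoded(ascending[i], ascending[i + 1]) < 0));
        }
        // -1 encodes as 0xFF...FF, so it sorts after 1 even though -1 < 1: prints true.
        System.out.println("-1 sorts after 1: " + (compareEncoded(-1L, 1L) > 0));
    }
}
```

The last line shows why the property is limited to non-negative numbers: the sign bit makes the first byte of every negative value 0x80 or higher.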

On Wed, Nov 23, 2016 at 1:06 PM, Mitch Shanklin <[email protected]>
wrote:

> The sorter extension might be helpful for your use case, though it is only
> meant for partitioned sorting (or global sorting for smaller data sets).
> https://github.com/apache/incubator-beam/tree/master/sdks/java/extensions/sorter
>
> It only supports local (not distributed) sorting. It will sort in memory
> and spill to disk if the data cannot fit in memory, but if the data set
> can't fit in the worker's disk it will fail.
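Sorting in memory and spilling to disk is the classic external merge sort. As a rough illustration of that pattern, here is a generic JDK-only sketch over `long` keys. It is not the sorter extension's actual code, and `SpillingSortSketch` and `maxInMemory` are hypothetical names. Full buffers are sorted and written to temporary files as runs, and the runs are then k-way merged:

```java
import java.io.*;
import java.nio.file.*;
import java.util.*;

public class SpillingSortSketch {
    // Buffers at most maxInMemory values; each full buffer is sorted and spilled
    // to a temporary file as a sorted "run". The runs are merged at the end.
    static List<Long> sort(Iterator<Long> input, int maxInMemory) throws IOException {
        List<Path> runs = new ArrayList<>();
        List<Long> buffer = new ArrayList<>();
        while (input.hasNext()) {
            buffer.add(input.next());
            if (buffer.size() >= maxInMemory) {
                runs.add(spill(buffer));
                buffer.clear();
            }
        }
        if (runs.isEmpty()) { // everything fit in memory: no disk needed
            Collections.sort(buffer);
            return buffer;
        }
        if (!buffer.isEmpty()) {
            runs.add(spill(buffer));
        }
        return merge(runs);
    }

    // Sort one buffer and write it to a temp file: a count followed by the values.
    private static Path spill(List<Long> buffer) throws IOException {
        Collections.sort(buffer);
        Path run = Files.createTempFile("sorted-run-", ".bin");
        try (DataOutputStream out = new DataOutputStream(
                new BufferedOutputStream(Files.newOutputStream(run)))) {
            out.writeInt(buffer.size());
            for (long v : buffer) {
                out.writeLong(v);
            }
        }
        return run;
    }

    // k-way merge: a min-heap holds the current head of each run as {value, runIndex}.
    // For brevity the merged output is collected into a List; a real sorter would stream it.
    private static List<Long> merge(List<Path> runs) throws IOException {
        List<DataInputStream> inputs = new ArrayList<>();
        int[] remaining = new int[runs.size()];
        PriorityQueue<long[]> heap = new PriorityQueue<>((a, b) -> Long.compare(a[0], b[0]));
        List<Long> result = new ArrayList<>();
        try {
            for (int i = 0; i < runs.size(); i++) {
                DataInputStream in = new DataInputStream(
                    new BufferedInputStream(Files.newInputStream(runs.get(i))));
                inputs.add(in);
                remaining[i] = in.readInt();
                if (remaining[i]-- > 0) {
                    heap.add(new long[] {in.readLong(), i});
                }
            }
            while (!heap.isEmpty()) {
                long[] head = heap.poll();
                result.add(head[0]);
                int i = (int) head[1];
                if (remaining[i]-- > 0) { // refill from the run the head came from
                    heap.add(new long[] {inputs.get(i).readLong(), i});
                }
            }
        } finally {
            for (DataInputStream in : inputs) {
                in.close();
            }
            for (Path run : runs) {
                Files.deleteIfExists(run);
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        List<Long> data = Arrays.asList(42L, 7L, 19L, 3L, 88L, 1L, 56L);
        // With maxInMemory = 3 this writes three runs to disk before merging.
        System.out.println(sort(data.iterator(), 3)); // [1, 3, 7, 19, 42, 56, 88]
    }
}
```

Because the runs live on the local disk, the total amount of data is bounded by the worker's disk, which is the failure mode described above.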
>
> If I'm following your use case correctly and you want complete sorting on
> the records, it would probably work to first extract the timestamp (in
> milliseconds since epoch) from the record to make a PCollection of
> KV<Long, Record>, then do what Lukasz suggested and partition the elements
> into a PCollection of KV<Interval, Iterable<KV<Long, Record>>>, at which
> point you can use this extension directly. This will work as long as the time
> intervals are fine-grained enough that the records associated with an
> interval can fit on a single worker.
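The approach above can be sketched with plain JDK collections. This is only an illustration under stated assumptions: `TimedRecord` is a hypothetical stand-in for the real record, the interval is assumed to be one day of epoch milliseconds, and a `TreeMap` plays the role of the grouped `PCollection`:

```java
import java.util.*;

public class IntervalSortSketch {
    static final long DAY_MILLIS = 24L * 60 * 60 * 1000;

    // Hypothetical stand-in for the real record.
    static final class TimedRecord {
        final long timestampMillis; // event time, milliseconds since epoch
        final String payload;

        TimedRecord(long timestampMillis, String payload) {
            this.timestampMillis = timestampMillis;
            this.payload = payload;
        }

        @Override
        public String toString() {
            return payload;
        }
    }

    // The timestamp is the Long key; the day index (floor of key / DAY_MILLIS) is the interval.
    // Grouping by interval mirrors KV<Interval, Iterable<KV<Long, Record>>>, and sorting each
    // group by its Long key is the step a per-interval sorter would perform.
    static SortedMap<Long, List<TimedRecord>> sortWithinDays(List<TimedRecord> records) {
        SortedMap<Long, List<TimedRecord>> byDay = new TreeMap<>();
        for (TimedRecord r : records) {
            long day = Math.floorDiv(r.timestampMillis, DAY_MILLIS);
            byDay.computeIfAbsent(day, d -> new ArrayList<>()).add(r);
        }
        for (List<TimedRecord> group : byDay.values()) {
            group.sort((a, b) -> Long.compare(a.timestampMillis, b.timestampMillis));
        }
        return byDay;
    }

    public static void main(String[] args) {
        List<TimedRecord> records = Arrays.asList(
            new TimedRecord(2 * DAY_MILLIS + 5, "c"),
            new TimedRecord(DAY_MILLIS + 9, "b2"),
            new TimedRecord(DAY_MILLIS + 1, "b1"),
            new TimedRecord(3, "a"));
        System.out.println(sortWithinDays(records)); // {0=[a], 1=[b1, b2], 2=[c]}
    }
}
```

Because the intervals themselves are ordered, reading the groups in interval order yields a complete sort, while each individual sort only ever sees one interval's records.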
>
> The conversion of a timestamp to a long value would be necessary because
> currently the extension only supports sorting lexicographically by the byte
> encoding of the keys. It would also work with certain string
> representations of a timestamp.
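To illustrate the point about string representations, here is a JDK-only sketch in which `TimestampKeySketch`, `stringKey` and `unpaddedDecimalKey` are hypothetical names. A fixed-width UTC format that puts the most significant field first sorts byte-wise in chronological order, while an unpadded decimal string does not:

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class TimestampKeySketch {
    // Fixed width, UTC, most significant field first: the UTF-8 bytes of these strings
    // sort in chronological order (for four-digit years).
    static final DateTimeFormatter FIXED_WIDTH_UTC =
        DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneOffset.UTC);

    static byte[] stringKey(long epochMillis) {
        return FIXED_WIDTH_UTC.format(Instant.ofEpochMilli(epochMillis))
            .getBytes(StandardCharsets.UTF_8);
    }

    // Counter-example: an unpadded decimal string is NOT order-preserving ("1000" < "999").
    static byte[] unpaddedDecimalKey(long epochMillis) {
        return Long.toString(epochMillis).getBytes(StandardCharsets.UTF_8);
    }

    // Lexicographic comparison of byte strings, treating each byte as unsigned (0..255).
    static int compareUnsignedLex(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        System.out.println(new String(stringKey(1000L), StandardCharsets.UTF_8)); // 1970-01-01T00:00:01.000Z
        System.out.println(compareUnsignedLex(stringKey(999L), stringKey(1000L)) < 0); // true
        System.out.println(compareUnsignedLex(unpaddedDecimalKey(999L), unpaddedDecimalKey(1000L)) < 0); // false
    }
}
```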
>
> Feel free to ping me with any questions about the sorter.
>
> Thanks,
> Mitch
>
> On Tue, Nov 22, 2016 at 7:55 AM, Lukasz Cwik <[email protected]> wrote:
>
>> Since the only unit of atomicity that is guaranteed is an individual
>> element, you could group your records per time interval using a GroupByKey
>> (GBK), producing KV<Interval, Iterable<Record>>. Then, in a DoFn, for each
>> KV<Interval, Iterable<Record>> you would write out the Records to a file
>> determined by the interval key.
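A JDK-only sketch of that grouping-then-writing pattern, assuming daily UTC intervals (the daywise partitioning asked about in this thread). It is not Beam code: a `TreeMap` stands in for the GroupByKey, the loop body stands in for the DoFn, plain text files in a local directory stand in for Avro files on HDFS, and `TimedRecord`, `writePerDay` and the `records-<day>.txt` naming are hypothetical:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.time.*;
import java.util.*;

public class PerIntervalFilesSketch {
    // Hypothetical record: event time in epoch millis plus one line of output text.
    static final class TimedRecord {
        final long timestampMillis;
        final String line;

        TimedRecord(long timestampMillis, String line) {
            this.timestampMillis = timestampMillis;
            this.line = line;
        }
    }

    // The interval key: the UTC calendar day of the record.
    static LocalDate dayOf(TimedRecord r) {
        return Instant.ofEpochMilli(r.timestampMillis).atZone(ZoneOffset.UTC).toLocalDate();
    }

    // Group by interval (the GroupByKey step), then write each group to its own file
    // named after the key (what the per-group DoFn would do).
    static Map<LocalDate, Path> writePerDay(List<TimedRecord> records, Path outDir) throws IOException {
        Map<LocalDate, List<TimedRecord>> groups = new TreeMap<>();
        for (TimedRecord r : records) {
            groups.computeIfAbsent(dayOf(r), d -> new ArrayList<>()).add(r);
        }
        Map<LocalDate, Path> written = new TreeMap<>();
        for (Map.Entry<LocalDate, List<TimedRecord>> e : groups.entrySet()) {
            List<TimedRecord> group = e.getValue();
            // One group is fully in memory here, so it can also be sorted by time before writing.
            group.sort((a, b) -> Long.compare(a.timestampMillis, b.timestampMillis));
            List<String> lines = new ArrayList<>();
            for (TimedRecord r : group) {
                lines.add(r.line);
            }
            Path file = outDir.resolve("records-" + e.getKey() + ".txt");
            Files.write(file, lines, StandardCharsets.UTF_8);
            written.put(e.getKey(), file);
        }
        return written;
    }

    public static void main(String[] args) throws IOException {
        long nov22 = LocalDate.of(2016, 11, 22).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
        long oneDay = 86_400_000L;
        List<TimedRecord> records = Arrays.asList(
            new TimedRecord(nov22 + oneDay + 10, "late"),
            new TimedRecord(nov22 + 20, "second"),
            new TimedRecord(nov22 + 10, "first"));
        Path dir = Files.createTempDirectory("per-day-");
        for (Path p : writePerDay(records, dir).values()) {
            // records-2016-11-22.txt: [first, second]
            // records-2016-11-23.txt: [late]
            System.out.println(p.getFileName() + ": " + Files.readAllLines(p, StandardCharsets.UTF_8));
        }
    }
}
```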
>>
>> On Tue, Nov 22, 2016 at 10:18 AM, Bergmann, Rico (GfK External) <
>> [email protected]> wrote:
>>
>>> The requirement is to have a set of files with Avro records written to
>>> HDFS, where the Avro records are sorted by a time field in the record.
>>>
>>> It would also suffice if I could partition the output with a custom
>>> partition function (for example daywise)…
>>>
>>> Thanks, Rico.
>>>
>>> *From:* Lukasz Cwik [mailto:[email protected]]
>>> *Sent:* Tuesday, 22 November 2016 16:00
>>> *To:* [email protected]
>>> *Subject:* Re: Support for sorting output in Beam?
>>>
>>> There is no explicit support for sorting in the Beam model today because
>>> the problem space is large, and the use cases people typically have can be
>>> handled either by a global combine followed by an in-memory sort, or by a
>>> combine per key with a radix-like scheme in which each radix is sorted
>>> individually.
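The two approaches can be contrasted in a small JDK-only sketch. The names are hypothetical, non-negative `long` keys are assumed, and the "radix" is assumed to be simply the key's high-order bits, `key >>> shiftBits`:

```java
import java.util.*;

public class RadixBucketSortSketch {
    // Approach 1: bring every key to one place and sort in memory
    // (what a global combine followed by an in-memory sort amounts to).
    static List<Long> globalSort(Collection<Long> keys) {
        List<Long> all = new ArrayList<>(keys);
        Collections.sort(all);
        return all;
    }

    // Approach 2: bucket by the high-order bits of each key (the radix), sort each
    // bucket on its own, then emit the buckets in radix order.
    static List<Long> radixBucketSort(Collection<Long> keys, int shiftBits) {
        SortedMap<Long, List<Long>> buckets = new TreeMap<>();
        for (long k : keys) {
            buckets.computeIfAbsent(k >>> shiftBits, r -> new ArrayList<>()).add(k);
        }
        List<Long> out = new ArrayList<>();
        for (List<Long> bucket : buckets.values()) {
            Collections.sort(bucket); // each bucket could be sorted by a different worker
            out.addAll(bucket);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> keys = Arrays.asList(700L, 3L, 260L, 1025L, 12L, 511L);
        System.out.println(globalSort(keys));         // [3, 12, 260, 511, 700, 1025]
        System.out.println(radixBucketSort(keys, 8)); // [3, 12, 260, 511, 700, 1025]
    }
}
```

For non-negative keys every key in a lower bucket is smaller than every key in a higher one, so the concatenated buckets are globally sorted even though no single sort sees all the keys.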
>>>
>>> Can you give more details about your use case?
>>>
>>> Maybe you don't need to do any sorting or there is an alternative.
>>>
>>> On Tue, Nov 22, 2016 at 9:41 AM, Bergmann, Rico (GfK External) <
>>> [email protected]> wrote:
>>>
>>> Hi!
>>>
>>> Looking at the Java API docs, I didn’t find anything for sorting. How
>>> would I do this with Beam?
>>>
>>> Best, Rico.
>>>
>>> ------------------------------
>>>
>>>
>>>
>>> GfK SE, Nuremberg, Germany, commercial register at the local court
>>> Amtsgericht Nuremberg HRB 25014; Management Board: Dr. Gerhard
>>> Hausruckinger (Speaker of the Management Board), Christian Diedrich (CFO),
>>> Matthias Hartmann, David Krajicek, Alessandra Cama; Chairman of the
>>> Supervisory Board: Ralf Klein-Bölting This email and any attachments may
>>> contain confidential or privileged information. Please note that
>>> unauthorized copying, disclosure or distribution of the material in this
>>> email is not permitted.
>>>
>>
>>
>
