You can get uniqueness by inserting a group-by-key on a set of
arbitrary keys, and then using this key plus the position in the value
list to create a unique index, e.g.

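import random
import apache_beam as beam

N = 10  # number of random keys (example value)
indexed_pcollection = (
    input_pcollection
    | beam.Map(lambda x: (random.randrange(N), x))
    | beam.GroupByKey()
    # Element ix of group k gets index k + N*ix, unique since 0 <= k < N.
    | beam.FlatMap(lambda kv: ((kv[0] + N * ix, x) for ix, x in enumerate(kv[1]))))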

This will be perfectly dense if N=1, but then all elements are processed
sequentially under a single key. Increasing N increases parallelism at
the expense of density (though the indexes are still pretty dense most
of the time, since random keys give roughly equal-sized groups).
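To see the density/parallelism trade-off without running a pipeline,
here is a minimal plain-Python sketch (not Beam code) that mimics the
Map -> GroupByKey -> FlatMap steps above, with a dict standing in for
GroupByKey. The helper names assign_indexes and density are
hypothetical, made up for illustration.

```python
import random
from collections import defaultdict


def assign_indexes(elements, n, seed=None):
    """Simulate the Map -> GroupByKey -> FlatMap steps in plain Python.

    Element ix of group k gets index k + n * ix. Because 0 <= k < n,
    distinct (k, ix) pairs always map to distinct indexes.
    """
    rng = random.Random(seed)
    groups = defaultdict(list)  # stands in for beam.GroupByKey()
    for x in elements:
        groups[rng.randrange(n)].append(x)  # stands in for the random-key Map
    return [(k + n * ix, x)
            for k, xs in groups.items()
            for ix, x in enumerate(xs)]


def density(indexed):
    """Fraction of the range [0, max_index] that is actually used."""
    max_index = max(i for i, _ in indexed)
    return len(indexed) / (max_index + 1)


if __name__ == "__main__":
    data = list(range(1000))
    for n in (1, 10, 100):
        out = assign_indexes(data, n, seed=0)
        assert len({i for i, _ in out}) == len(out)  # all indexes unique
        print(n, round(density(out), 3))
```

With n=1 the indexes are exactly 0..len-1; larger n leaves gaps
wherever one group ends up shorter than the largest group.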

Note that even with stateful DoFns one would need to introduce keys
(and, implicitly, a group-by-key), since state is scoped per key.

On Mon, Apr 10, 2017 at 1:36 PM, Ben Chambers wrote:
> There isn't currently a great way of doing this, since in general, it would
> require single-threaded processing. Further, PCollections don't really have
> a concept of order.
>
> Could you explain more about your use case? Why do you need to zip elements
> with their index?
>
>
> On Mon, Apr 10, 2017 at 1:28 PM Antony Mayi wrote:
>>
>> Hi,
>>
>> trying to implement a transformer PTransform<PCollection<T>,
>> PCollection<KV<Long, T>>> where the output keys would be generated element
>> indexes (increasing, unique). What's the best way to implement this before
>> the beam stateful DoFn is available?
>>
>> thx,
>> a.
