Can you share more about the end-to-end use case?

Kenn

On Wed, Feb 20, 2019 at 7:38 PM Daniel Erenrich <[email protected]> wrote:

> My workflow requires me to assign arbitrary, unique, sequential integers to
> every distinct element in a PCollection. In Spark I would do this with
> .zipWithIndex, but that doesn't seem to be possible in Beam (there was a
> previous conversation about this on this mailing list). I was able to get
> something working in Beam/Scio by doing the following:
>
>     val wordsSide = distinctWords.asListSideInput
>     val wordsWithWords = distinctWords.withSideInputs(wordsSide)
>     val indexedWords = wordsWithWords.keyBy((x, s) => s(wordsSide).indexOf(x))
>
> It works in my tests, but my inexperience with Beam makes me worried that it
> will not work in practice. Things I'm worried about: What if distinctWords is
> an unbounded stream? Will that invalidate all of my previous indices? How
> will this interact with windowing? Is there a more efficient way to do this?
>
> (And yes, I know indexOf will be very slow and that I should be using a
> better data structure.)
>
> Also, if I do get this to work, would a PR adding this functionality
> directly to Beam/Scio be accepted? I often need something like this.
>
> Thanks for any help,
> Daniel
>
>
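For context on the .zipWithIndex comparison above: Spark assigns sequential indices in two passes, first counting the elements in each partition and then giving each element the index "partition offset + position within partition". The sketch below models that idea in plain Scala, treating partitions as a hypothetical `Seq[Seq[T]]`; it is not Beam or Scio API, and `ZipWithIndexSketch` is an illustrative name.

```scala
// Plain-Scala model of two-pass sequential indexing over partitions.
// Partitions are modeled as Seq[Seq[T]]; this is NOT a Beam/Scio transform.
object ZipWithIndexSketch {
  def zipWithIndex[T](partitions: Seq[Seq[T]]): Seq[Seq[(T, Long)]] = {
    // Pass 1: count the elements in each partition.
    val counts: Seq[Long] = partitions.map(_.size.toLong)
    // Exclusive prefix sum: the first index assigned in each partition.
    val offsets: Seq[Long] = counts.scanLeft(0L)(_ + _).init
    // Pass 2: index = partition offset + position within that partition.
    partitions.zip(offsets).map { case (part, offset) =>
      part.zipWithIndex.map { case (elem, i) => (elem, offset + i) }
    }
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq("a", "b"), Seq.empty[String], Seq("c", "d", "e"))
    zipWithIndex(parts).flatten.foreach(println)
  }
}
```

The indices come out unique and contiguous only if the partitioning is stable between the two passes. Beam does not guarantee stable bundles across retries, so this illustrates the idea rather than offering a drop-in Beam transform.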

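On the indexOf concern raised in the quoted message: the linear scan can be replaced by a lookup table built once from the list of distinct elements, turning each lookup into an expected constant-time map access. The plain-Scala sketch below assumes the distinct elements are already available as a `Seq`; `IndexLookupSketch` and `buildIndex` are hypothetical names, not Scio API.

```scala
// Hypothetical helper: build an element -> index table once, then look
// elements up in it instead of scanning the list with indexOf each time.
object IndexLookupSketch {
  def buildIndex[T](distinct: Seq[T]): Map[T, Long] =
    distinct.iterator.zipWithIndex.map { case (elem, i) => elem -> i.toLong }.toMap

  def main(args: Array[String]): Unit = {
    val words = Seq("apple", "banana", "cherry")
    val index = buildIndex(words)
    // Each lookup is a hash-map access rather than an O(n) scan.
    words.foreach(w => println(s"$w -> ${index(w)}"))
  }
}
```

This addresses lookup cost only. In a pipeline the table would need to be built once and reused rather than rebuilt for every element, and it does not resolve the unbounded-input or windowing questions.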