My workflow requires me to assign arbitrary, unique, sequential integers to
every distinct element in a PCollection. In Spark I would do this with
.zipWithIndex, but it seems that isn't possible in Beam (there was a
previous conversation about this on this mailing list). I was able to get
something working in Beam/scio by doing the following:

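    // Materialize every distinct word as a list side input
    val wordsSide = distinctWords.asListSideInput
    val wordsWithWords = distinctWords.withSideInputs(wordsSide)
    // Key each word by its position in that list
    val indexedWords = wordsWithWords.keyBy((x, s) =>
      s(wordsSide).indexOf(x))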
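To make the semantics of that snippet concrete, here is a Beam-free sketch
in plain Scala. A local Seq stands in for the materialized side-input list;
the data and the names IndexOfSketch and indexByPosition are hypothetical.
Each word is keyed by its position in that list, so the indices are unique
and run from 0 to n-1 only because the list is distinct, and which word
gets which index depends entirely on the order in which the list happens
to be materialized (a PCollection itself has no defined order).

```scala
// Hypothetical, Beam-free simulation of the keyBy/indexOf snippet.
object IndexOfSketch {
  // Key each element by its position in sideList, mirroring
  // `s(wordsSide).indexOf(x)`. indexOf scans the list, so each lookup
  // is O(n) and keying n elements costs O(n^2) overall.
  def indexByPosition[T](elements: Seq[T], sideList: Seq[T]): Seq[(Int, T)] =
    elements.map(x => (sideList.indexOf(x), x))

  def main(args: Array[String]): Unit = {
    // Stand-in for the materialized side input (hypothetical data).
    val words = Seq("beam", "scio", "spark")
    // The main input may arrive in any order; keys still come from the list.
    indexByPosition(words.reverse, words).foreach(println)
  }
}
```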

It works in my tests, but my inexperience with Beam makes me worried this
will not work in practice. Things I'm worried about: What if distinctWords
is an unbounded stream? Will that invalidate all of my previous indices?
How will this interact with windowing? Is there a more efficient way to do
this?

(And yes, I know indexOf will be very slow and I should be using a better
data structure.)
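One possible "better data structure", sketched in plain Scala outside of
Beam: build a hash map from element to index once with zipWithIndex, so
each lookup is O(1) on average instead of the O(n) scan indexOf performs.
IndexMapSketch and buildIndex are hypothetical names, and the sketch
assumes all distinct elements fit in memory on one worker. In a pipeline
the table would need to be built once per side-input value rather than
once per element, otherwise each rebuild costs as much as indexOf.

```scala
// Hypothetical sketch: element -> index lookup table built once.
object IndexMapSketch {
  // O(n) to build; positions match what indexOf would return
  // for a list of distinct elements.
  def buildIndex[T](sideList: Seq[T]): Map[T, Int] =
    sideList.zipWithIndex.toMap

  def main(args: Array[String]): Unit = {
    val words = Seq("beam", "scio", "spark") // hypothetical data
    val index = buildIndex(words)
    // Each lookup is a hash-map access rather than a linear scan.
    Seq("spark", "beam").map(w => (index(w), w)).foreach(println)
  }
}
```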

Also, if I do get this to work, would a PR adding this functionality
directly to Beam/scio be accepted? I often need something like this.

Thanks for any help,
Daniel