One could group it by a random integer up to N, as Bradshaw suggests, and
then globally find an offset for each key such that the indices become dense.

The only non-distributed work is finding the offsets, which operates over at
most N rows; N, in turn, should be on the order of your parallelism, so the
cost should be limited.
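
A minimal Scio-style sketch of that two-pass idea, assuming the names from
the thread (distinctWords, N) and Scio's map side-input API; the per-bucket
sizes are the only data gathered in one place, and there are at most N of
them:

    import com.spotify.scio.values.SCollection

    // Sketch: bucket words randomly, then turn the per-bucket sizes into
    // dense offsets so that every index in [0, total) is used exactly once.
    def denseIndex(distinctWords: SCollection[String],
                   N: Int): SCollection[(Long, String)] = {
      val buckets = distinctWords
        .keyBy(_ => scala.util.Random.nextInt(N)) // random bucket in [0, N)
        .groupByKey

      // At most N entries, so this map is tiny on every worker.
      val sizes = buckets.mapValues(_.size.toLong).asMapSideInput

      buckets
        .withSideInputs(sizes)
        .flatMap { case ((k, ws), ctx) =>
          // Bucket k starts where the lower-numbered buckets end.
          val offset = ctx(sizes).collect { case (k2, n) if k2 < k => n }.sum
          ws.iterator.zipWithIndex.map { case (w, i) => (offset + i, w) }
        }
        .toSCollection
    }

Unlike the mostly-dense scheme quoted below, this assigns exactly the
indices 0 until count, at the cost of a second pass over the grouped data.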

On Thu, Feb 21, 2019 at 09:44, Robert Bradshaw <[email protected]> wrote:

> Your intuition is correct that this will not scale well. Here
> distinctWords will need to be read in its entirety into memory, and
> doing an indexOf lookup for each word adds up to O(n^2) work.
>
> If it is sufficient to get indices that are mostly, but not perfectly,
> dense, I would recommend choosing a degree of parallelism N and doing
>
> indexedWords = distinctWords
>     .keyBy(w => randomIntegerUpTo(N))
>     .groupByKey()
>     .flatMap { case (k, ws) =>
>       ws.zipWithIndex.map { case (w, i) => (k + N * i, w) }
>     }
>
> The larger N, the better the parallelism, but the more potential for
> gaps. To make things a little better, the first key assignment could
> also be done by a DoFn that assigns keys round-robin (starting at a
> random point) rather than purely at random (see the sketch after the
> quoted thread below).
>
> On Thu, Feb 21, 2019 at 7:11 AM Daniel Erenrich <[email protected]>
> wrote:
> >
> > Sure, I'm considering implementing distributed matrix factorization for
> > collaborative filtering in beam as a project to teach myself the
> > framework. One useful preprocessing step is to map the users and items
> > to rows and columns of the ratings matrix. To make the matrix as dense
> > as possible it is useful (though not strictly required) for those row
> > and column numbers to be sequential integers.
> >
> > All that said, I'm very new to the framework and I'm not 100% sure that
> > you can even naively express this algorithm in beam. Still, the
> > primitive I described seems generically useful when you want to build
> > dense matrices/vectors.
> >
> > Daniel
> >
> > On Wed, Feb 20, 2019 at 9:49 PM Kenneth Knowles <[email protected]> wrote:
> >>
> >> Can you share more about the end-to-end use case?
> >>
> >> Kenn
> >>
> >> On Wed, Feb 20, 2019 at 7:38 PM Daniel Erenrich <[email protected]>
> >> wrote:
> >>>
> >>> My workflow requires me to assign arbitrary unique sequential integers
> >>> to every distinct element in a PCollection. In spark I would do this
> >>> with .zipWithIndex but it seems that isn't possible in beam (there was
> >>> a previous conversation about this on this mailing list). I was able
> >>> to get something to work in beam/scio by doing the following
> >>>
> >>>     val wordsSide = distinctWords.asListSideInput
> >>>     val wordsWithWords = distinctWords.withSideInputs(wordsSide)
> >>>     val indexedWords = wordsWithWords.keyBy((x, s) =>
> >>>       s(wordsSide).indexOf(x))
> >>>
> >>> It works for my tests but my inexperience with beam makes me worried
> >>> this will not work in practice. Things I'm worried about: what if
> >>> distinctWords is an unbounded stream? Will that invalidate all of my
> >>> previous indices? How will this interact with windowing? Is there a
> >>> more efficient way to do this?
> >>>
> >>> (And yes I know indexOf will be very slow and I should be using a
> >>> better data structure)
> >>>
> >>> Also, if I do get this to work, would a PR adding this functionality
> >>> directly into beam/scio be accepted? I often need something like this.
> >>>
> >>> Thanks for any help,
> >>> Daniel
> >>>
>
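
A hypothetical sketch of the round-robin refinement mentioned above, written
as a raw Beam DoFn from Scala (the class RoundRobinKeyFn and its wiring are
invented for illustration):

    import org.apache.beam.sdk.transforms.DoFn
    import org.apache.beam.sdk.transforms.DoFn.{ProcessElement, Setup}
    import org.apache.beam.sdk.values.KV

    // Each DoFn instance starts at a random bucket and then cycles through
    // 0..n-1, so the buckets come out close to evenly sized.
    class RoundRobinKeyFn(n: Int) extends DoFn[String, KV[Integer, String]] {
      private var next: Int = _

      @Setup
      def setup(): Unit =
        next = scala.util.Random.nextInt(n) // random starting point

      @ProcessElement
      def process(c: ProcessContext): Unit = {
        c.output(KV.of(Int.box(next), c.element()))
        next = (next + 1) % n
      }
    }

From Scio this could be applied with something like
distinctWords.applyTransform(ParDo.of(new RoundRobinKeyFn(N))) in place of
the random keyBy; each worker's counter advances independently, so the
buckets are only approximately even, which is all the mostly-dense scheme
needs.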
