One could group it by a random integer up to N, as Bradshaw suggests, and then globally find an offset for each key so that the indices become dense. The only non-distributed work is finding the offsets, which runs over at most N rows; N should in turn be on the order of your parallelism, so that step stays small.
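In Scio that could look something like the sketch below (untested; denseIndex and n are names I am making up here, and it assumes a bounded, globally windowed input, since with an unbounded stream the offsets would shift as new elements arrive):

    import com.spotify.scio.values.{SCollection, SideInput}
    import scala.util.Random

    def denseIndex(words: SCollection[String], n: Int): SCollection[(Long, String)] = {
      // Stage 1: scatter the elements across n random keys, as in the
      // suggestion quoted below.
      val grouped: SCollection[(Int, Iterable[String])] =
        words.keyBy(_ => Random.nextInt(n)).groupByKey

      // Stage 2: a prefix sum over the (at most n) group sizes gives each
      // group's starting offset. This is the only non-distributed step.
      val offsets: SideInput[Map[Int, Long]] = grouped
        .map { case (k, ws) => (k, ws.size.toLong) }
        .groupBy(_ => ())            // funnel all n counts to one worker
        .values
        .map { counts =>
          val sorted = counts.toSeq.sortBy(_._1)     // fix a stable order
          val starts = sorted.scanLeft(0L)(_ + _._2) // running totals
          sorted.map(_._1).zip(starts).toMap         // key -> first index
        }
        .asSingletonSideInput

      // Final index = the group's starting offset plus the element's
      // position within the group: dense, with no gaps.
      grouped
        .withSideInputs(offsets)
        .flatMap { (kv, ctx) =>
          val (k, ws) = kv
          val base = ctx(offsets)(k)
          ws.zipWithIndex.map { case (w, i) => (base + i, w) }
        }
        .toSCollection
    }

The offsets map has at most n entries, so materializing it as a singleton side input is cheap; sorting by key just fixes a consistent order for the prefix sum.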
On Thu, Feb 21, 2019 at 9:44 AM Robert Bradshaw <[email protected]> wrote:

> Your intuition is correct that this will not scale well. Here
> distinctWords will need to be read in its entirety into memory, and in
> addition doing a lookup with indexOf for each word will be O(n^2)
> work.
>
> If it is sufficient to get indices that are mostly, but not perfectly,
> dense, I would recommend choosing a degree of parallelism N and doing
>
>   indexedWords = distinctWords
>     .keyBy(w => randomIntegerUpTo(N))
>     .groupByKey()
>     .flatMap((k, ws) => [(k + N * i, w) for (w, i) <- ws.zipWithIndex])
>
> The larger N, the better the parallelism, but the more potential for
> gaps. You could also use a DoFn that does round-robin assignment
> (starting at a random point) rather than purely random assignment for
> the first keying step, to make things a little better.
>
> On Thu, Feb 21, 2019 at 7:11 AM Daniel Erenrich <[email protected]> wrote:
> >
> > Sure, I'm considering implementing distributed matrix factorization
> > for collaborative filtering in Beam as a project to teach myself the
> > framework. One useful preprocessing step is to map the users and
> > items to rows and columns of the ratings matrix. To make the matrix
> > as dense as possible it is useful (though not strictly required) for
> > those row and column numbers to be sequential integers.
> >
> > All that said, I'm very new to the framework and I'm not 100% sure
> > that you can even naively express this algorithm in Beam. Still, the
> > primitive I described seems generically useful when you want to
> > build dense matrices/vectors.
> >
> > Daniel
> >
> > On Wed, Feb 20, 2019 at 9:49 PM Kenneth Knowles <[email protected]> wrote:
> >>
> >> Can you share more about the end-to-end use case?
> >>
> >> Kenn
> >>
> >> On Wed, Feb 20, 2019 at 7:38 PM Daniel Erenrich <[email protected]> wrote:
> >>>
> >>> My workflow requires me to assign arbitrary unique sequential
> >>> integers to every distinct element in a PCollection. In Spark I
> >>> would do this with .zipWithIndex, but it seems that isn't possible
> >>> in Beam (there was a previous conversation about this on this
> >>> mailing list). I was able to get something working in Beam/Scio by
> >>> doing the following:
> >>>
> >>>   val wordsSide = distinctWords.asListSideInput
> >>>   val wordsWithWords = distinctWords.withSideInputs(wordsSide)
> >>>   val indexedWords =
> >>>     wordsWithWords.keyBy((x, s) => s(wordsSide).indexOf(x))
> >>>
> >>> It works for my tests, but my inexperience with Beam makes me
> >>> worried this will not work in practice. Things I'm worried about:
> >>> what if distinctWords is an unbounded stream? Will that invalidate
> >>> all of my previous indices? How will this interact with windowing?
> >>> Is there a more efficient way to do this?
> >>>
> >>> (And yes, I know indexOf will be very slow and I should be using a
> >>> better data structure.)
> >>>
> >>> Also, if I do get this to work, would a PR adding this
> >>> functionality directly into Beam/Scio be accepted? I often need
> >>> something like this.
> >>>
> >>> Thanks for any help,
> >>> Daniel
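For completeness, the pseudocode above translates almost one-to-one into Scio. A rough sketch (untested; Random.nextInt stands in for randomIntegerUpTo, and mostlyDenseIndex is a name of my own):

    import com.spotify.scio.values.SCollection
    import scala.util.Random

    def mostlyDenseIndex(distinctWords: SCollection[String],
                         N: Int): SCollection[(Long, String)] =
      distinctWords
        .keyBy(_ => Random.nextInt(N))
        .groupByKey
        .flatMap { case (k, ws) =>
          // Element i of group k gets index k + N * i: unique across
          // groups, but with gaps wherever the group sizes differ.
          ws.zipWithIndex.map { case (w, i) => (k + N.toLong * i, w) }
        }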
