On Thu, Feb 21, 2019 at 11:34 PM Daniel Erenrich <[email protected]> wrote:
>
> Does this code snippet represent what you are talking about, Mike?
>
> val partitionedWords = distinctWords.keyBy(w => r.nextInt(N))
> val partitionCounts = partitionedWords.countByKey.asMapSideInput
> val out = partitionedWords.groupByKey
>   .withSideInputs(partitionCounts)
>   .flatMap((t, s) => {
>     var prefixCount = 0L
>     for (i <- 0 until t._1) prefixCount += s(partitionCounts).getOrElse(i, 0L)
>     for ((w, i) <- t._2.zipWithIndex) yield (i + prefixCount, w)
>   })
>
> I assume this still wouldn't work well with streaming distinctWords?

For streaming one could use a stateful DoFn rather than a global GBK.
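As a rough illustration of that idea (untested; AssignIndexFn and
numPartitions are made-up names): key each element by a random partition in
[0, numPartitions) and let per-key state hand out slots, so that partition k
emits indices k, k + numPartitions, k + 2 * numPartitions, ... as elements
arrive.

  import org.apache.beam.sdk.state.{StateSpec, StateSpecs, ValueState}
  import org.apache.beam.sdk.transforms.DoFn
  import org.apache.beam.sdk.transforms.DoFn.{ProcessElement, StateId}
  import org.apache.beam.sdk.values.KV

  // Applied with ParDo.of(new AssignIndexFn(N)) to a
  // PCollection[KV[java.lang.Integer, String]] of (partition, word) pairs.
  class AssignIndexFn(numPartitions: Int)
      extends DoFn[KV[java.lang.Integer, String], KV[java.lang.Long, String]] {

    // Per-key counter holding the next unused slot within this partition.
    @StateId("nextSlot")
    private val nextSlotSpec: StateSpec[ValueState[java.lang.Long]] =
      StateSpecs.value()

    @ProcessElement
    def processElement(
        context: ProcessContext,
        @StateId("nextSlot") nextSlot: ValueState[java.lang.Long]): Unit = {
      val slot: Long = Option(nextSlot.read()).map(_.longValue).getOrElse(0L)
      val partition = context.element().getKey.longValue
      // partition + numPartitions * slot never collides across partitions.
      context.output(
        KV.of(Long.box(partition + numPartitions * slot),
              context.element().getValue))
      nextSlot.write(Long.box(slot + 1))
    }
  }

Indices then stay valid as new elements arrive, at the cost of the same gaps
as in the batch version.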
> I'm guessing the arbitrary nature of the N param means this wouldn't be a
> good candidate for inclusion in the standard library?

N could be a parameter to this transform if included in the standard
library, possibly with a large default N (say 1000) that would work well in
most cases.

> On Thu, Feb 21, 2019 at 6:56 AM Mike Pedersen <[email protected]> wrote:
>>
>> One could group it by a random integer up to N, as Bradshaw suggests, and
>> then globally find an offset for each key such that the indices become
>> dense.
>>
>> The only non-distributed work is finding the offsets, which covers at
>> most N rows; N in turn should be on the order of your parallelism, so
>> this step stays cheap.
>>
>> On Thu, Feb 21, 2019 at 9:44 AM Robert Bradshaw <[email protected]> wrote:
>>>
>>> Your intuition is correct that this will not scale well. Here
>>> distinctWords will need to be read in its entirety into memory, and in
>>> addition doing a lookup with indexOf for each word will be O(n^2) work.
>>>
>>> If it is sufficient to get indices that are mostly, but not perfectly,
>>> dense, I would recommend choosing a degree of parallelism N and doing
>>>
>>>   indexedWords = distinctWords
>>>     .keyBy(w => randomIntegerUpTo(N))
>>>     .groupByKey()
>>>     .flatMap((k, ws) => [(k + N * i, w) for (w, i) <- ws.zipWithIndex])
>>>
>>> The larger N, the better the parallelism but the more potential for
>>> gaps. You could also use a DoFn that does round-robin assignment of the
>>> first key (starting at a random point), rather than purely random
>>> assignment, to make things a little better.
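Rendered as actual scio code, that sketch might look like the following
(untested; assumes distinctWords: SCollection[String], and N and the use of
scala.util.Random are illustrative):

  import scala.util.Random

  val N = 1000 // larger N: better parallelism, more potential for gaps

  // Partition k contributes indices k, k + N, k + 2N, ..., so indices are
  // unique across partitions; gaps appear only where partition sizes differ.
  val indexedWords = distinctWords
    .keyBy(_ => Random.nextInt(N))
    .groupByKey
    .flatMap { case (k, ws) =>
      ws.iterator.zipWithIndex.map { case (w, i) => (k + N.toLong * i, w) }
    }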
>>> On Thu, Feb 21, 2019 at 7:11 AM Daniel Erenrich <[email protected]> wrote:
>>> >
>>> > Sure, I'm considering implementing distributed matrix factorization
>>> > for collaborative filtering in Beam as a project to teach myself the
>>> > framework. One useful preprocessing step is to map the users and items
>>> > to rows and columns of the ratings matrix. To make the matrix as dense
>>> > as possible, it is useful (though not strictly required) for those row
>>> > and column numbers to be sequential integers.
>>> >
>>> > All that said, I'm very new to the framework and I'm not 100% sure
>>> > that you can even naively express this algorithm in Beam. Still, the
>>> > primitive I described seems generically useful when you want to build
>>> > dense matrices/vectors.
>>> >
>>> > Daniel
>>> >
>>> > On Wed, Feb 20, 2019 at 9:49 PM Kenneth Knowles <[email protected]> wrote:
>>> >>
>>> >> Can you share more about the end-to-end use case?
>>> >>
>>> >> Kenn
>>> >>
>>> >> On Wed, Feb 20, 2019 at 7:38 PM Daniel Erenrich <[email protected]>
>>> >> wrote:
>>> >>>
>>> >>> My workflow requires me to assign arbitrary unique sequential
>>> >>> integers to every distinct element in a PCollection. In Spark I
>>> >>> would do this with .zipWithIndex, but it seems that isn't possible
>>> >>> in Beam (there was a previous conversation about this on this
>>> >>> mailing list). I was able to get something working in Beam/scio by
>>> >>> doing the following:
>>> >>>
>>> >>> val wordsSide = distinctWords.asListSideInput
>>> >>> val wordsWithWords = distinctWords.withSideInputs(wordsSide)
>>> >>> val indexedWords = wordsWithWords.keyBy((x, s) => s(wordsSide).indexOf(x))
>>> >>>
>>> >>> It works for my tests, but my inexperience with Beam makes me
>>> >>> worried this will not work in practice. Things I'm worried about:
>>> >>> what if distinctWords is an unbounded stream? Will that invalidate
>>> >>> all of my previous indices? How will this interact with windowing?
>>> >>> Is there a more efficient way to do this?
>>> >>>
>>> >>> (And yes, I know indexOf will be very slow and I should be using a
>>> >>> better data structure.)
>>> >>>
>>> >>> Also, if I do get this to work, would a PR adding this functionality
>>> >>> directly to Beam/scio be accepted? I often need something like this.
>>> >>>
>>> >>> Thanks for any help,
>>> >>> Daniel
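For reference, one way to keep that side-input approach but swap the O(n^2)
indexOf for an O(1) map lookup (untested; it still funnels all of
distinctWords through a single worker, so it only suits bounded inputs that
fit in memory):

  // Group everything under one key, index within that single group, and
  // use the resulting word -> index map as a side input.
  val indexSide = distinctWords
    .groupBy(_ => ())
    .flatMap { case (_, ws) => ws.iterator.zipWithIndex }
    .map { case (w, i) => (w, i.toLong) }
    .asMapSideInput

  val indexedWords = distinctWords
    .withSideInputs(indexSide)
    .map((w, ctx) => (ctx(indexSide)(w), w))
    .toSCollection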
