Does this code snippet represent what you are talking about, Mike?
val partitionedWords = distinctWords.keyBy(w => r.nextInt(N))
val partitionCounts = partitionedWords.countByKey.asMapSideInput
val out = partitionedWords.groupByKey
  .withSideInputs(partitionCounts)
  .flatMap { (t, s) =>
    val counts = s(partitionCounts)
    // Sum the sizes of all partitions with a smaller key. A partition that
    // received no words has no entry in the map, so default to 0.
    val prefixCount = (0 until t._1).map(i => counts.getOrElse(i, 0L)).sum
    for ((w, i) <- t._2.zipWithIndex) yield (i + prefixCount, w)
  }
I assume this still wouldn't work well with streaming distinctWords?
I'm guessing the arbitrary nature of the N param means this wouldn't be a
good candidate for inclusion in the standard library?
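
To sanity-check the offset logic outside of Beam, here is a minimal sketch in
plain Scala that uses in-memory collections instead of scio. The names
denseIndex, n and seed are hypothetical and only for illustration. It assumes
the whole input fits in memory, so it says nothing about how the real pipeline
scales. It only shows that a prefix sum over at most n partition counts yields
gap-free indices.

```scala
object DenseIndexSketch {
  // Hypothetical helper: assigns dense indices 0 until items.size.
  // In-memory stand-in for the pipeline above, not scio code.
  def denseIndex[A](items: Seq[A], n: Int, seed: Long = 42L): Seq[(Long, A)] = {
    val rnd = new scala.util.Random(seed)

    // Step 1: assign each item to a random partition in [0, n).
    val partitioned: Map[Int, Seq[A]] =
      items
        .map(w => (rnd.nextInt(n), w))
        .groupBy(_._1)
        .map { case (k, kvs) => k -> kvs.map(_._2) }

    // Step 2: the only non-distributed step, a prefix sum over n counts.
    // Partitions that received nothing count as 0.
    val counts: Seq[Long] =
      (0 until n).map(k => partitioned.get(k).map(_.size.toLong).getOrElse(0L))
    // offsets(k) = number of items in partitions with a key smaller than k.
    val offsets: Seq[Long] = counts.scanLeft(0L)(_ + _)

    // Step 3: each partition numbers its own items starting at its offset.
    partitioned.toSeq.flatMap { case (k, ws) =>
      ws.zipWithIndex.map { case (w, i) => (offsets(k) + i, w) }
    }
  }
}
```

Calling DenseIndexSketch.denseIndex(Seq("a", "b", "c", "d", "e", "f", "g"), 3)
should give each word exactly one of the indices 0 through 6, whichever way the
random assignment falls.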
Daniel
On Thu, Feb 21, 2019 at 6:56 AM Mike Pedersen <[email protected]> wrote:
> One could group it by a random integer up to N like Bradshaw suggests, and
> then globally find an offset for each key so that the indices become dense.
>
> The only non-distributed work is finding the offsets, which should be over
> at most N rows, which in turn should be on the order of your parallelism,
> so it should be limited.
>
> On Thu, Feb 21, 2019 at 9:44 AM Robert Bradshaw <
> [email protected]> wrote:
>
>> Your intuition is correct that this will not scale well. Here
>> distinctWords will need to be read in its entirety into memory, and in
>> addition doing a lookup with indexOf for each word will be O(n^2)
>> work.
>>
>> If it is sufficient to get indices that are mostly, but not perfectly,
>> dense, I would recommend choosing a degree of parallelism N and doing
>>
>> val indexedWords = distinctWords
>>   .keyBy(w => randomIntegerUpTo(N))  // placeholder: random Int in [0, N)
>>   .groupByKey
>>   .flatMap { case (k, ws) =>
>>     ws.zipWithIndex.map { case (w, i) => (k + N * i, w) }
>>   }
>>
>> The larger N, the better the parallelism but the greater the potential
>> for gaps. You could use a DoFn that does round-robin (starting at a random
>> point) assignment rather than being purely random for the first key
>> assignment to make things a little better as well.
>>
>> On Thu, Feb 21, 2019 at 7:11 AM Daniel Erenrich <[email protected]>
>> wrote:
>> >
>> > Sure, I'm considering implementing distributed matrix factorization for
>> > collaborative filtering in beam as a project to teach myself the framework.
>> > One useful preprocessing step is to map the users and items to rows and
>> > columns of the ratings matrix. To make the matrix as dense as possible it
>> > is useful (though not strictly required) for those row and column numbers
>> > to be sequential integers.
>> >
>> > All that said, I'm very new to the framework and I'm not 100% sure that
>> > you can even naively express this algorithm in beam. Still the primitive I
>> > described seems generically useful when you want to build dense
>> > matrices/vectors.
>> >
>> > Daniel
>> >
>> > On Wed, Feb 20, 2019 at 9:49 PM Kenneth Knowles <[email protected]>
>> > wrote:
>> >>
>> >> Can you share more about the end-to-end use case?
>> >>
>> >> Kenn
>> >>
>> >> On Wed, Feb 20, 2019 at 7:38 PM Daniel Erenrich <[email protected]>
>> >> wrote:
>> >>>
>> >>> My workflow requires me to assign arbitrary unique sequential
>> >>> integers to every distinct element in a PCollection. In spark I would do
>> >>> this with .zipWithIndex but it seems that isn't possible in beam (there was
>> >>> a previous conversation about this on this mailing list). I was able to get
>> >>> something to work in beam/scio by doing the following
>> >>>
>> >>> val wordsSide = distinctWords.asListSideInput
>> >>> val wordsWithWords = distinctWords.withSideInputs(wordsSide)
>> >>> val indexedWords = wordsWithWords.keyBy((x, s) =>
>> >>>   s(wordsSide).indexOf(x))
>> >>>
>> >>> It works for my tests but my inexperience with beam makes me worried
>> >>> this will not work in practice. Things I'm worried about: What if
>> >>> distinctWords is an unbounded stream? Will that invalidate all of my
>> >>> previous indices? How will this interact with windowing? Is there a more
>> >>> efficient way to do this?
>> >>>
>> >>> (And yes, I know indexOf will be very slow and I should be using a
>> >>> better data structure.)
>> >>>
>> >>> Also, if I do get this to work, would a PR adding this functionality
>> >>> directly to beam/scio be accepted? I often need something like this.
>> >>>
>> >>> Thanks for any help,
>> >>> Daniel
>> >>>
>>
>