Your intuition is correct that this will not scale well. Here
distinctWords will need to be read in its entirety into memory, and in
addition doing a lookup with indexOf for each word will be O(n^2)
work.

If it is sufficient to get indices that are mostly, but not perfectly,
dense, I would recommend choosing a degree of parallelism N and doing

indexedWords = distinctWords
    .keyBy(w => randomIntegerUpTo(N))
    .groupByKey()
    .flatMap((k, ws) => [(k + N * i, w) for (w, i) <- ws.zipWithIndex])

The larger N the better parallelism but the more potential for gaps.
You could use a DoFn that does round-robin (starting at a random
point) assignment rather than being purely random for the first key
assignment to make things a little better as well.

On Thu, Feb 21, 2019 at 7:11 AM Daniel Erenrich <[email protected]> wrote:
>
> Sure, I'm considering implementing distributed matrix factorization for 
> collaborative filtering in beam as a project to teach myself the framework. 
> One useful preprocessing step is to map the users and items to rows and 
> columns of the ratings matrix. To make the matrix as dense as possible it is 
> useful (though not strictly required) for those row and column numbers to be 
> sequential integers.
>
> All that said, I'm very new to the framework and I'm not 100% sure that you 
> can even naively express this algorithm in beam. Still the primitive I 
> described seems generically useful when you want to build dense 
> matrices/vectors.
>
> Daniel
>
> On Wed, Feb 20, 2019 at 9:49 PM Kenneth Knowles <[email protected]> wrote:
>>
>> Can you share more about the end-to-end use case?
>>
>> Kenn
>>
>> On Wed, Feb 20, 2019 at 7:38 PM Daniel Erenrich <[email protected]> wrote:
>>>
>>> My workflow requires me to assign arbitrary unique sequential integers to 
>>> every distinct element in a PCollection. In spark I would do this with 
>>> .zipWithIndex but it seems that isn't possible in beam (there was a 
>>> previous conversation about this on this mailing list). I was able to get 
>>> something to work in beam/scio by doing the following
>>>
>>>     val wordsSide = distinctWords.asListSideInput
>>>     val wordsWithWords = distinctWords.withSideInputs(wordsSide)
>>>     val indexedWords = wordsWithWords.keyBy((x, s) => 
>>> s(wordsSide).indexOf(x))
>>>
>>> It works for my tests but my inexperience with beam makes me worried this 
>>> will not work in practice. Things I'm worried about: what if distinctWords 
>>> is an unbounded stream? will that invalidate all of my previous indices? 
>>> how will this interact with windowing? Is there a more efficient way to do 
>>> this?
>>>
>>> (And yes I know indexOf will be very slow and I should be using a better 
>>> data-structure)
>>>
>>> Also, if I do get this to work would this be something a PR would be 
>>> accepted for to add this functionality directly into beam/scio? I often 
>>> need something like this.
>>>
>>> Thanks for any help,
>>> Daniel
>>>

Reply via email to