On Thu, Feb 21, 2019 at 11:34 PM Daniel Erenrich <[email protected]> wrote:
>
> Does this code snippet represent what you are talking about, Mike?
>
>     val partitionedWords = distinctWords.keyBy(w => r.nextInt(N))
>     val partitionCounts = partitionedWords.countByKey.asMapSideInput
>     val out = partitionedWords.groupByKey.withSideInputs(partitionCounts).flatMap((t, s) => {
>       var prefixCount = 0L
>       for (i <- 0 until t._1) prefixCount += s(partitionCounts)(i)
>       for ((w, i) <- t._2.zipWithIndex) yield (i + prefixCount, w)
>     })
>
> I assume this still wouldn't work well with streaming distinctWords?

For streaming one could use a stateful DoFn rather than a global GBK.
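
To make that concrete, here is a minimal plain-Scala simulation of one possible reading: keep one counter per shard key (the role per-key state would play in a stateful DoFn) and hand out k + N * i as in the earlier suggestion in this thread. ShardedIndexer and process are hypothetical names, and this is not the Beam state API; the indices come out unique and are never revised as more elements arrive, but they are not perfectly dense.

```scala
import scala.collection.mutable

// Hypothetical simulation of per-key state; NOT the Beam state API.
// n is the number of shards (the "N" discussed in this thread).
final class ShardedIndexer[T](n: Int) {
  require(n > 0, "n must be positive")

  // One counter per shard key, standing in for per-key state in a stateful DoFn.
  private val nextPosition = mutable.Map.empty[Int, Long].withDefaultValue(0L)

  // Assigns the next index for an element that was keyed to shard k.
  def process(k: Int, w: T): (Long, T) = {
    require(k >= 0 && k < n, "shard key out of range")
    val i = nextPosition(k)   // position of w within shard k
    nextPosition(k) = i + 1   // advance this shard's counter
    (k + n.toLong * i, w)     // unique across shards, with possible gaps
  }
}
```

Because each shard only ever touches its own counter, no global grouping is needed; the cost is that gaps remain wherever shards fill unevenly.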

> I'm guessing the arbitrary nature of the N param means this wouldn't be a 
> good candidate for inclusion in the standard library?

N could be a parameter to this transform if included in the standard
library, possibly with a large default N (say 1000) that would work
well in most cases.
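
As a rough sketch of what such a parameterized transform would compute, here is the shard, count, and prefix-sum idea written against plain Scala collections rather than Beam or Scio. zipWithDenseIndex, its default n = 1000, and the seed parameter are assumptions for illustration only, not an existing API.

```scala
import scala.util.Random

object DenseIndexSketch {
  // Hypothetical helper, not part of Beam or Scio.
  // Assigns every element a distinct index in [0, items.size).
  def zipWithDenseIndex[T](items: Seq[T], n: Int = 1000, seed: Long = 42L): Seq[(Long, T)] = {
    require(n > 0, "n must be positive")
    val rng = new Random(seed)

    // Step 1: key every element by a random shard in [0, n) and group by shard.
    val shards: Map[Int, Seq[T]] =
      items.map(w => (rng.nextInt(n), w))
        .groupBy(_._1)
        .map { case (k, kvs) => (k, kvs.map(_._2)) }

    // Step 2 (the only non-parallel part): exclusive prefix sums over n shard sizes.
    // Shards that received no elements are absent from the map and count as zero.
    val offsets: Array[Long] =
      (0 until n)
        .scanLeft(0L)((acc, k) => acc + shards.get(k).map(_.size.toLong).getOrElse(0L))
        .toArray

    // Step 3: index = offset of the shard + position within the shard.
    shards.toSeq.flatMap { case (k, ws) =>
      ws.zipWithIndex.map { case (w, i) => (offsets(k) + i, w) }
    }
  }
}
```

Treating a missing shard as size zero matters once N is large relative to the input, since many shards will then be empty.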

> On Thu, Feb 21, 2019 at 6:56 AM Mike Pedersen <[email protected]> wrote:
>>
>> One could group it by a random integer up to N, as Bradshaw suggests, and
>> then globally find an offset for each key such that the indices become
>> dense.
>>
>> The only non-distributed work is finding the offsets, which should run
>> over at most N rows. N in turn should be on the order of your
>> parallelism, so that work should be limited.
>>
>> On Thu, Feb 21, 2019 at 09:44, Robert Bradshaw <[email protected]> wrote:
>>>
>>> Your intuition is correct that this will not scale well. Here
>>> distinctWords will need to be read in its entirety into memory, and in
>>> addition doing a lookup with indexOf for each word will be O(n^2)
>>> work.
>>>
>>> If it is sufficient to get indices that are mostly, but not perfectly,
>>> dense, I would recommend choosing a degree of parallelism N and doing
>>>
>>> val indexedWords = distinctWords
>>>     .keyBy(w => randomIntegerUpTo(N))
>>>     .groupByKey
>>>     .flatMap { case (k, ws) => ws.zipWithIndex.map { case (w, i) => (k + N * i, w) } }
>>>
>>> The larger N is, the better the parallelism but the greater the
>>> potential for gaps. You could also make things a little better by using
>>> a DoFn that assigns the first key round-robin (starting at a random
>>> point) rather than purely at random.
>>>
>>> On Thu, Feb 21, 2019 at 7:11 AM Daniel Erenrich <[email protected]> wrote:
>>> >
>>> > Sure, I'm considering implementing distributed matrix factorization for 
>>> > collaborative filtering in beam as a project to teach myself the 
>>> > framework. One useful preprocessing step is to map the users and items to 
>>> > rows and columns of the ratings matrix. To make the matrix as dense as 
>>> > possible it is useful (though not strictly required) for those row and 
>>> > column numbers to be sequential integers.
>>> >
>>> > All that said, I'm very new to the framework and I'm not 100% sure that 
>>> > you can even naively express this algorithm in beam. Still the primitive 
>>> > I described seems generically useful when you want to build dense 
>>> > matrices/vectors.
>>> >
>>> > Daniel
>>> >
>>> > On Wed, Feb 20, 2019 at 9:49 PM Kenneth Knowles <[email protected]> wrote:
>>> >>
>>> >> Can you share more about the end-to-end use case?
>>> >>
>>> >> Kenn
>>> >>
>>> >> On Wed, Feb 20, 2019 at 7:38 PM Daniel Erenrich <[email protected]> wrote:
>>> >>>
>>> >>> My workflow requires me to assign arbitrary unique sequential integers 
>>> >>> to every distinct element in a PCollection. In spark I would do this 
>>> >>> with .zipWithIndex but it seems that isn't possible in beam (there was 
>>> >>> a previous conversation about this on this mailing list). I was able to 
>>> >>> get something to work in beam/scio by doing the following
>>> >>>
>>> >>>     val wordsSide = distinctWords.asListSideInput
>>> >>>     val wordsWithWords = distinctWords.withSideInputs(wordsSide)
>>> >>>     val indexedWords = wordsWithWords.keyBy((x, s) => s(wordsSide).indexOf(x))
>>> >>>
>>> >>> It works for my tests, but my inexperience with beam makes me worried 
>>> >>> this will not work in practice. Things I'm worried about: What if 
>>> >>> distinctWords is an unbounded stream? Will that invalidate all of my 
>>> >>> previous indices? How will this interact with windowing? Is there a 
>>> >>> more efficient way to do this?
>>> >>>
>>> >>> (And yes I know indexOf will be very slow and I should be using a 
>>> >>> better data-structure)
>>> >>>
>>> >>> Also, if I do get this to work, would a PR adding this functionality 
>>> >>> directly to beam/scio be accepted? I often need something like this.
>>> >>>
>>> >>> Thanks for any help,
>>> >>> Daniel
>>> >>>
