Are we over-thinking the problem here? Since the per-window compute task is
hugely expensive, stateless from window to window, and the original big
matrix is just 1GB, the primary gain from using a parallel engine is in
distributing and scheduling these (long-running, isolated) tasks. My
reading is that data loading and distribution will be a tiny fraction of
the overall compute time.

If that's the case, it would make sense simply to start with the 1GB
Array[Double] on the driver, create from it an RDD comprising 20,000 rows
of 5,000 doubles each, map them out to the workers, and have them interpret
what the 5,000 doubles mean in terms of a [100 x 50] sub-matrix. They each
have a good fraction of several days to figure it out :)
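
A rough, untested sketch of that approach (here bigMatrix stands for the
1GB Array[Double] on the driver, and processSubMatrix for whatever the
expensive per-window computation is; how the 5,000 doubles are carved out
of bigMatrix depends on the actual layout):

// One Array[Double] of 5,000 values per window, built on the driver.
val windows: Seq[Array[Double]] =
  (0 until 20000).map(i => bigMatrix.slice(i * 5000, (i + 1) * 5000))

// One partition per window, so each long-running task is scheduled and
// retried independently.
val windowRDD = sc.parallelize(windows, 20000)

val results = windowRDD.map { flat =>
  // Reinterpret the 5,000 doubles as a [100 x 50] sub-matrix (row-major).
  val subMatrix = Array.tabulate(100, 50)((r, c) => flat(r * 50 + c))
  processSubMatrix(subMatrix)
}.collect()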

This would be a great load test for Spark's resiliency over long-running
computations.

--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen



On Fri, Dec 20, 2013 at 11:36 AM, Michael (Bach) Bui <[email protected]> wrote:

> Hmm, I misread that you need a sliding window.
> I am thinking out loud here: one way of dealing with this is to improve
> NLineInputFormat so that partitions have a small overlapping portion; in
> this case the overlapping portion is 50 columns.
> So let's say the matrix is divided into overlapping partitions like
> [100 x col[1, n*50]], [100 x col[(n-1)*50+1, (2n-1)*50]], … Then we can
> assign each partition to a mapper and run mapPartitions on it.
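>
> Just to make the overlap concrete, an illustrative sketch of the partition
> boundaries only (not real InputFormat code; totalCols and n > 1 are
> placeholders):
>
> val width  = n * 50          // columns per partition
> val stride = (n - 1) * 50    // shift between partitions, leaving 50 overlap
> val ranges = (0 until totalCols by stride)
>   .map(start => (start, math.min(start + width, totalCols)))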
>
>
> --------------------------------------------
> Michael (Bach) Bui, PhD,
> Senior Staff Architect, ADATAO Inc.
> www.adatao.com
>
>
>
>
> On Dec 20, 2013, at 1:11 PM, Michael (Bach) Bui <[email protected]>
> wrote:
>
> Here, Tom assumed that you have your big matrix already loaded on one
> machine. Now if you want to distribute it to the slave nodes, you will
> need to broadcast it. I would expect this broadcast to be done once at the
> beginning of your algorithm, and the computation time will dominate the
> overall execution time.
>
> On the other hand, a better way to deal with a huge matrix is to store the
> data in HDFS and load it into the slaves partition-by-partition. This is a
> fundamental data-processing pattern in the Spark/Hadoop world.
> If you opt to do this, you will have to use a suitable InputFormat to make
> sure each partition has the right number of rows.
> For example, if you are lucky and each HDFS partition has exactly n*50
> rows, then you can use rdd.mapPartitions(func), where func takes care of
> splitting the n*50-row partition into n sub-matrices.
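>
> Something like this untested sketch, assuming each row has already been
> parsed into an Array[Double] and processSubMatrix stands in for the real
> per-window computation:
>
> val results = rowRDD.mapPartitions { rows =>
>   // Each partition holds exactly n*50 rows; cut it into n blocks of 50.
>   rows.grouped(50).map(block => processSubMatrix(block.toArray))
> }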
>
> However, HDFS TextInputFormat or SequenceFileInputFormat will not
> guarantee that each partition has a certain number of rows. What you want
> is NLineInputFormat, which I think currently has not been pulled into
> Spark yet.
> If everyone thinks this is needed, I can implement it quickly; it should
> be pretty easy.
>
>
> --------------------------------------------
> Michael (Bach) Bui, PhD,
> Senior Staff Architect, ADATAO Inc.
> www.adatao.com
>
>
>
>
> On Dec 20, 2013, at 12:38 PM, Aureliano Buendia <[email protected]>
> wrote:
>
>
>
>
> On Fri, Dec 20, 2013 at 6:00 PM, Tom Vacek <[email protected]> wrote:
>
>> Oh, I see.  I was thinking that there was a computational dependency from
>> one window to the next.  If the computations are independent, then I think
>> Spark can help you out quite a bit.
>>
>> I think you would want an RDD where each element is a window of your
>> dense matrix.  I'm not aware of a way to distribute the windows of the big
>> matrix in a way that doesn't involve broadcasting the whole thing.  You
>> might have to tweak some config options, but I think it would work
>> straightaway.  I would initialize the data structure like this:
>> val matB = sc.broadcast(myBigDenseMatrix)
>> val distributedChunks = sc.parallelize(0 until numWindows)
>>   .mapPartitions(it => it.map(windowID => getWindow(matB.value, windowID)))
>>
>
> Here broadcast is used instead of calling parallelize on myBigDenseMatrix.
> Is it okay to broadcast a huge amount of data? Does sharing big data mean
> a big network IO overhead compared to calling parallelize, or is this
> overhead optimized thanks to the partitioning?
>
>
>>
>> Then just apply your matrix ops as a map on the RDD.
>>
>> You may have your own tool for dense matrix ops, but I would suggest
>> Scala Breeze.  You'll have to use an old version of Breeze (current
>> builds are for Scala 2.10), since Spark with Scala 2.10 is still a little
>> way off.
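>>
>> For what it's worth, a minimal sketch of wrapping one window in a Breeze
>> matrix (flat is the window's 5,000 doubles; note Breeze's DenseMatrix
>> stores data column-major):
>>
>> import breeze.linalg._
>>
>> val m = new DenseMatrix(100, 50, flat)  // 100 rows, 50 columns
>> val gram = m.t * m                      // example dense op on the window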
>>
>>
>> On Fri, Dec 20, 2013 at 11:40 AM, Aureliano Buendia <[email protected]>
>> wrote:
>>
>>>
>>>
>>>
>>> On Fri, Dec 20, 2013 at 5:21 PM, Tom Vacek <[email protected]> wrote:
>>>
>>>> If you use an RDD[Array[Double]] with a row decomposition of the
>>>> matrix, you can index windows of the rows all you want, but you're
>>>> limited to 100 concurrent tasks.  You could use a column decomposition
>>>> and access subsets of the columns with a PartitionPruningRDD.  I have to
>>>> say, though, if you're doing dense matrix operations, they will be
>>>> hundreds of times faster on a shared-memory platform.  This particular
>>>> matrix, at 800 MB, could be a Breeze on a single node.
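>>>>
>>>> For the column-decomposition route, a rough sketch (colBlockRDD would
>>>> hold one block of columns per partition, and blocksForWindow would map a
>>>> window to the partition indices it needs; both are placeholders):
>>>>
>>>> import org.apache.spark.rdd.PartitionPruningRDD
>>>>
>>>> // Only the partitions covering this window are computed.
>>>> val needed = blocksForWindow(windowID)   // e.g. a Set[Int]
>>>> val pruned = PartitionPruningRDD.create(colBlockRDD, needed.contains)
>>>> val window = pruned.collect()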
>>>>
>>>
>>> The computation for every sub-matrix is very expensive; it takes days on
>>> a single node. I was hoping this could be reduced to hours or minutes
>>> with Spark.
>>>
>>> Are you saying that Spark is not suitable for this type of job?
>>>
>>
>>
>
>
>
