Here, Tom assumed that you have your big matrix already loaded on one machine. 
If you want to distribute it to the slave nodes, you will need to broadcast it. 
I would expect this broadcast to be done once at the beginning of your 
algorithm, and the computation time to dominate the overall execution time.
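
For illustration, a minimal sketch of that broadcast-once pattern, reusing 
Tom's getWindow and a hypothetical processWindow for the expensive per-window 
computation; only the (small) per-window results travel back to the driver:

val matB = sc.broadcast(myBigDenseMatrix)          // shipped to each worker once
val results = sc.parallelize(0 until numWindows)
  .map(windowID => processWindow(getWindow(matB.value, windowID)))
  .collect()                                       // only the results come back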

On the other hand, a better way to deal with a huge matrix is to store the data 
in HDFS and load it into each slave partition-by-partition. This is the 
fundamental data-processing pattern in the Spark/Hadoop world.
If you opt to do this, you will have to use a suitable InputFormat to make sure 
each partition has the number of rows that you want.
For example, if you are lucky and each HDFS partition has exactly n*50 rows, 
then you can use rdd.mapPartitions(func), where func takes care of splitting 
the n*50-row partition into n sub-matrices.
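
A sketch along those lines, assuming whitespace-separated rows in a text file 
(the path is illustrative) and partitions that really do hold a multiple of 50 
rows:

// Turn an RDD of matrix rows into an RDD of 50-row sub-matrices.
val rows = sc.textFile("hdfs:///path/to/matrix")
  .map(line => line.split("\\s+").map(_.toDouble))
val subMatrices = rows.mapPartitions(it => it.grouped(50).map(_.toArray))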

However, HDFS TextInputFormat or SequenceFileInputFormat will not guarantee 
that each partition has a certain number of rows. What you want is 
NLineInputFormat, which I think has not been pulled into Spark yet.
If everyone thinks this is needed, I can implement it quickly; it should be 
pretty easy.
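
For reference, one could already experiment with NLineInputFormat through 
Spark's generic Hadoop-input hook. A rough sketch, assuming the Hadoop 
mapreduce classes are on the classpath and using the Hadoop 2 name of the 
lines-per-split key (the key name, path, and 50-row setting are illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat

val conf = new Configuration()
conf.setInt("mapreduce.input.lineinputformat.linespermap", 50)  // lines per split

val rows = sc.newAPIHadoopFile("hdfs:///path/to/matrix",
    classOf[NLineInputFormat], classOf[LongWritable], classOf[Text], conf)
  .map { case (_, line) => line.toString.split("\\s+").map(_.toDouble) }
// Each partition now holds 50 rows (the last split of a file may hold fewer).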


--------------------------------------------
Michael (Bach) Bui, PhD,
Senior Staff Architect, ADATAO Inc.
www.adatao.com




On Dec 20, 2013, at 12:38 PM, Aureliano Buendia <[email protected]> wrote:

> 
> 
> 
> On Fri, Dec 20, 2013 at 6:00 PM, Tom Vacek <[email protected]> wrote:
> Oh, I see.  I was thinking that there was a computational dependency on one 
> window to the next.  If the computations are independent, then I think Spark 
> can help you out quite a bit.
> 
> I think you would want an RDD where each element is a window of your dense 
> matrix.  I'm not aware of a way to distribute the windows of the big matrix 
> in a way that doesn't involve broadcasting the whole thing.  You might have 
> to tweak some config options, but I think it would work straightaway.  I 
> would initialize the data structure like this:
> val matB = sc.broadcast(myBigDenseMatrix)
> val distributedChunks = sc.parallelize(0 until numWindows)
>   .mapPartitions(it => it.map(windowID => getWindow(matB.value, windowID)))
> 
> Here broadcast is used instead of calling parallelize on myBigDenseMatrix. Is 
> it okay to broadcast a huge amount of data? Does sharing big data mean a big 
> network I/O overhead compared to calling parallelize, or is this overhead 
> optimized thanks to the partitioning?
>  
> 
> Then just apply your matrix ops as a map on the resulting RDD.
> 
> You may have your own tool for dense matrix ops, but I would suggest Scala 
> Breeze.  You'll have to use an old version of Breeze (current builds are for 
> Scala 2.10), since Spark with Scala 2.10 is a little way off.
> 
> 
> On Fri, Dec 20, 2013 at 11:40 AM, Aureliano Buendia <[email protected]> 
> wrote:
> 
> 
> 
> On Fri, Dec 20, 2013 at 5:21 PM, Tom Vacek <[email protected]> wrote:
> If you use an RDD[Array[Double]] with a row decomposition of the matrix, you 
> can index windows of the rows all you want, but you're limited to 100 
> concurrent tasks.  You could use a column decomposition and access subsets of 
> the columns with a PartitionPruningRDD.  I have to say, though, if you're 
> doing dense matrix operations, they will be hundreds of times faster on a 
> shared-memory platform.  This particular matrix, at 800 MB, could be a Breeze 
> on a single node.
>  
> The computation for every submatrix is very expensive; it takes days on a 
> single node. I was hoping this could be reduced to hours or minutes with Spark.
> 
> Are you saying that Spark is not suitable for this type of job?
> 
> 
