OK, broadcasting is probably out.  Let's assume that you have n nodes with
k cores per node.  Each node will process k windows, and the columns those
windows need will mostly overlap (hopefully), so each node will need the
range of columns [node_id*k, (node_id+1)*k+50-1], and your data
over-replication will be 50*n.
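
Spelling that range out as a helper (hypothetical, just restating the
formula above):

  // columns node `nodeId` needs for its k windows, with 50-column-wide windows
  def nodeColumnRange(nodeId: Int, k: Int): (Int, Int) =
    (nodeId * k, (nodeId + 1) * k + 50 - 1)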

This is where my knowledge of Spark runs out.  It is certainly
syntactically possible to spin up threads inside a map, but whether those
threads will actually run in parallel (assuming there are free cores on the
node), I'm not really sure.  Leaving some cores free on each node is simply
a matter of configuration.  Anyone?

After that, you'd need to get the data into an RDD in the right form.  I
think Michael has the right idea, but the first question needs to be
answered.  You could also get really creative and put the matrix in a web
server and have each task download the columns it needs.
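
For what it's worth, the threaded version I have in mind would look roughly
like this (untested sketch: getColumns and processWindow are stand-ins for
however you fetch a window's columns and whatever your per-window
computation is, and whether the .par tasks really get their own cores is
exactly the open question above):

  val windowWidth   = 50
  val numWindows    = 20000
  val numPartitions = 400                 // tune so each partition holds a batch of windows

  // one element per window id; neighboring windows land in the same partition
  val windowIds = sc.parallelize(0 until numWindows, numPartitions)

  val results = windowIds.mapPartitions { ids =>
    // run this partition's windows concurrently with a Scala parallel collection
    ids.toVector.par.map { w =>
      val window = getColumns(w, w + windowWidth - 1)   // the 100 x 50 sub-matrix for window w
      (w, processWindow(window))                        // the expensive part
    }.seq.iterator
  }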





On Fri, Dec 20, 2013 at 3:07 PM, Aureliano Buendia <[email protected]>wrote:

> Also, overthinking is appreciated in this problem, as my production data
> is actually near 100 x 1,000,000,000, and data duplication could get
> messy at that scale.
>
> Sorry about the initial misinformation; I was thinking of my
> development/test data.
>
>
> On Fri, Dec 20, 2013 at 9:04 PM, Aureliano Buendia 
> <[email protected]>wrote:
>
>>
>>
>>
>> On Fri, Dec 20, 2013 at 9:00 PM, Tom Vacek <[email protected]>wrote:
>>
>>> Totally agree.  Even with 50x data replication, that's only 40 GB,
>>> which would be a fraction of a standard cluster's memory.  But since
>>> overthinking is a lot of fun, how about this: do a mapPartitions with a
>>> threaded subtask for each window.  Now you only need to replicate data
>>> across the boundaries of each partition of windows, rather than across
>>> each window.
>>>
>>
>> How can this be written in Spark Scala?
>>
>>
>>>
>>>
>>> On Fri, Dec 20, 2013 at 2:53 PM, Christopher Nguyen <[email protected]>wrote:
>>>
>>>> Are we over-thinking the problem here? Since the per-window compute
>>>> task is hugely expensive, stateless from window to window, and the original
>>>> big matrix is just 1GB, the primary gain in using a parallel engine is in
>>>> distributing and scheduling these (long-running, isolated) tasks. I'm
>>>> reading that data loading and distribution are going to be a tiny fraction
>>>> of the overall compute time.
>>>>
>>>> If that's the case, it would make sense simply to start with a 1GB
>>>> Array[Double] on the driver, from that create an RDD comprising 20,000 rows
>>>> of 5,000 doubles each, map them out to the workers and have them interpret
>>>> what the 5,000 doubles mean in terms of a [100 x 50] sub-matrix. They each
>>>> have a good fraction of several days to figure it out :)
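>>>>
>>>> As a rough sketch (untested; loadMatrix and compute are placeholders for
>>>> reading the array on the driver and for the per-window computation):
>>>>
>>>>   val big: Array[Double] = loadMatrix()          // ~1 GB on the driver
>>>>   val rows = big.grouped(5000).toIndexedSeq      // 20,000 chunks of 5,000 doubles
>>>>   val rdd  = sc.parallelize(rows, 20000)         // one task per chunk
>>>>   val out  = rdd.map(chunk => compute(chunk))    // chunk == one [100 x 50] sub-matrix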
>>>>
>>>> This would be a great load test for Spark's resiliency over
>>>> long-running computations.
>>>>
>>>> --
>>>> Christopher T. Nguyen
>>>> Co-founder & CEO, Adatao <http://adatao.com>
>>>> linkedin.com/in/ctnguyen
>>>>
>>>>
>>>>
>>>> On Fri, Dec 20, 2013 at 11:36 AM, Michael (Bach) Bui <
>>>> [email protected]> wrote:
>>>>
>>>>> Hmm, I misread that you need a sliding window.
>>>>> I am thinking out loud here: one way of dealing with this is to
>>>>> improve NLineInputFormat so that partitions have a small overlapping
>>>>> portion; in this case the overlapping portion is 50 columns.
>>>>> So let's say the matrix is divided into overlapping partitions like this:
>>>>> [100 x col[1, n*50]], [100 x col[(n-1)*50+1, (2n-1)*50]], … Then we can
>>>>> assign each partition to a mapper and run mapPartitions on it.
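>>>>>
>>>>> In index terms (just spelling out the pattern above, 1-based columns):
>>>>>
>>>>>   // overlapping partition p covers n*50 columns and shares 50 with its neighbor
>>>>>   def partitionCols(p: Int, n: Int): (Int, Int) = {
>>>>>     val start = p * (n - 1) * 50 + 1
>>>>>     (start, start + n * 50 - 1)
>>>>>   }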
>>>>>
>>>>>
>>>>> --------------------------------------------
>>>>> Michael (Bach) Bui, PhD,
>>>>> Senior Staff Architect, ADATAO Inc.
>>>>> www.adatao.com
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Dec 20, 2013, at 1:11 PM, Michael (Bach) Bui <[email protected]>
>>>>> wrote:
>>>>>
>>>>> Here, Tom assumed that you have your big matrix already loaded on one
>>>>> machine. Now if you want to distribute it to the slave nodes, you will
>>>>> need to broadcast it. I would expect this broadcast to be done once at
>>>>> the beginning of your algorithm, and the computation time will dominate
>>>>> the overall execution time.
>>>>>
>>>>> On the other hand, a better way to deal with a huge matrix is to store
>>>>> the data in HDFS and load it into each slave partition by partition.
>>>>> This is the fundamental data-processing pattern in the Spark/Hadoop world.
>>>>> If you opt to do this, you will have to use a suitable InputFormat to
>>>>> make sure each partition has the number of rows you want.
>>>>> For example, if you are lucky and each HDFS partition has exactly n*50
>>>>> rows, then you can use rdd.mapPartitions(func), where func takes care of
>>>>> splitting the n*50-row partition into n sub-matrices.
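>>>>>
>>>>> Roughly (sketch only, assuming each partition really does hold a multiple
>>>>> of 50 rows; processSubMatrix stands in for the per-window computation):
>>>>>
>>>>>   rdd.mapPartitions { rows =>
>>>>>     rows.grouped(50).map(subMatrix => processSubMatrix(subMatrix))
>>>>>   }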
>>>>>
>>>>> However, HDFS TextInputFormat or SequenceFileInputFormat will not
>>>>> guarantee that each partition has a certain number of rows. What you want
>>>>> is NLineInputFormat, which I think has not been pulled into Spark yet.
>>>>> If everyone thinks this is needed, I can implement it quickly; it
>>>>> should be pretty easy.
>>>>>
>>>>>
>>>>> --------------------------------------------
>>>>> Michael (Bach) Bui, PhD,
>>>>> Senior Staff Architect, ADATAO Inc.
>>>>> www.adatao.com
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Dec 20, 2013, at 12:38 PM, Aureliano Buendia <[email protected]>
>>>>> wrote:
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Dec 20, 2013 at 6:00 PM, Tom Vacek <[email protected]>wrote:
>>>>>
>>>>>> Oh, I see.  I was thinking that there was a computational dependency
>>>>>> from one window to the next.  If the computations are independent, then I
>>>>>> think Spark can help you out quite a bit.
>>>>>>
>>>>>> I think you would want an RDD where each element is a window of your
>>>>>> dense matrix.  I'm not aware of a way to distribute the windows of the 
>>>>>> big
>>>>>> matrix in a way that doesn't involve broadcasting the whole thing.  You
>>>>>> might have to tweak some config options, but I think it would work
>>>>>> straightaway.  I would initialize the data structure like this:
>>>>>> val matB = sc.broadcast(myBigDenseMatrix)
>>>>>> val distributedChunks = sc.parallelize(0 until numWindows)
>>>>>>   .mapPartitions(it => it.map(windowID => getWindow(matB.value, windowID)))
>>>>>>
>>>>>
>>>>> Here broadcast is used instead of calling parallelize on
>>>>> myBigDenseMatrix. Is it okay to broadcast a huge amount of data? Does
>>>>> sharing big data mean a big network IO overhead compared to calling
>>>>> parallelize, or is this overhead optimized thanks to partitioning?
>>>>>
>>>>>
>>>>>>
>>>>>> Then just apply your matrix ops with a map on the resulting RDD.
>>>>>>
>>>>>> You may have your own tool for dense matrix ops, but I would
>>>>>> suggest Scala Breeze.  You'll have to use an old version of Breeze
>>>>>> (current builds are for 2.10); Spark with Scala 2.10 is still a little
>>>>>> way off.
>>>>>>
>>>>>>
>>>>>> On Fri, Dec 20, 2013 at 11:40 AM, Aureliano Buendia <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Dec 20, 2013 at 5:21 PM, Tom Vacek 
>>>>>>> <[email protected]>wrote:
>>>>>>>
>>>>>>>> If you use an RDD[Array[Double]] with a row decomposition of the
>>>>>>>> matrix, you can index windows of the rows all you want, but you're
>>>>>>>> limited to 100 concurrent tasks.  You could use a column decomposition
>>>>>>>> and access subsets of the columns with a PartitionPruningRDD.  I have
>>>>>>>> to say, though, if you're doing dense matrix operations, they will be
>>>>>>>> hundreds of times faster on a shared-memory platform.  This particular
>>>>>>>> matrix, at 800 MB, could be a Breeze on a single node.
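>>>>>>>>
>>>>>>>> For the column decomposition, roughly (from memory, so double-check the
>>>>>>>> PartitionPruningRDD API; colRDD and neededPartitions are placeholders):
>>>>>>>>
>>>>>>>>   // colRDD: RDD[(Int, Array[Double])], one 100-element column per record,
>>>>>>>>   // partitioned so each partition holds a known, contiguous column range
>>>>>>>>   val pruned = PartitionPruningRDD.create(colRDD, partId => neededPartitions(partId))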
>>>>>>>>
>>>>>>>
>>>>>>> The computation for every submatrix is very expensive; it takes days
>>>>>>> on a single node. I was hoping this could be reduced to hours or
>>>>>>> minutes with Spark.
>>>>>>>
>>>>>>> Are you saying that Spark is not suitable for this type of job?
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
