On Fri, Dec 20, 2013 at 9:00 PM, Tom Vacek <[email protected]> wrote:

> Totally agree.  Even with 50x data replication, that's only 40 GB, which
> would be a fraction of a standard cluster.  But since overthinking is a lot
> of fun, how about this: do a mapPartitions with a threaded subtask for each
> window.  Now you only need to replicate data across the boundaries of each
> partition of windows, rather than for each window.
>

How can this be written in Spark Scala?
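
For illustration, a rough (untested) sketch of how that mapPartitions-plus-threads
idea could look in Spark Scala; bigMatrix, numWindows, windowsPerGroup,
sliceForGroup and computeWindow are all placeholders for your own data and logic:

// Group the windows so each partition handles several of them.
val windowGroups: Seq[Seq[Int]] =
  (0 until numWindows).grouped(windowsPerGroup).toSeq

// Each element carries only the sub-matrix its group of windows needs,
// so data is replicated only across group boundaries.
val groupedData = sc.parallelize(
  windowGroups.map(ids => (ids, sliceForGroup(bigMatrix, ids))),
  windowGroups.size)

val results = groupedData.mapPartitions { it =>
  it.flatMap { case (ids, slice) =>
    // One threaded subtask per window, via Scala parallel collections.
    ids.par.map(id => computeWindow(slice, id)).toList
  }
}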


>
>
> On Fri, Dec 20, 2013 at 2:53 PM, Christopher Nguyen <[email protected]> wrote:
>
>> Are we over-thinking the problem here? Since the per-window compute task
>> is hugely expensive, stateless from window to window, and the original big
>> matrix is just 1GB, the primary gain in using a parallel engine is in
>> distributing and scheduling these (long-running, isolated) tasks. I'm
>> reading that data loading and distribution are going to be a tiny fraction
>> of the overall compute time.
>>
>> If that's the case, it would make sense simply to start with a 1GB
>> Array[Double] on the driver, from that create an RDD comprising 20,000 rows
>> of 5,000 doubles each, map them out to the workers and have them interpret
>> what the 5,000 doubles mean in terms of a [100 x 50] sub-matrix. They each
>> have a good fraction of several days to figure it out :)
>>
>> This would be a great load test for Spark's resiliency over long-running
>> computations.
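
For what it's worth, a minimal sketch of that idea (loadBigArray and
expensiveWindowComputation are placeholders, and the sizes assume the
20,000 x 5,000 layout described above):

// Build the 1 GB array on the driver, cut it into 20,000 rows of 5,000 doubles.
val bigArray: Array[Double] = loadBigArray()
val rows: IndexedSeq[Array[Double]] = bigArray.grouped(5000).toIndexedSeq

val results = sc.parallelize(rows).map { flat =>
  // Each worker reinterprets its 5,000 doubles as a [100 x 50] sub-matrix.
  val subMatrix: Array[Array[Double]] = flat.grouped(50).toArray
  expensiveWindowComputation(subMatrix)
}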
>>
>> --
>> Christopher T. Nguyen
>> Co-founder & CEO, Adatao <http://adatao.com>
>> linkedin.com/in/ctnguyen
>>
>>
>>
>> On Fri, Dec 20, 2013 at 11:36 AM, Michael (Bach) Bui
>> <[email protected]> wrote:
>>
>>> Hmm, I misread that you need a sliding window.
>>> I am thinking out loud here: one way of dealing with this is to improve
>>> NLineInputFormat so that partitions have a small overlapping portion; in
>>> this case the overlapping portion is 50 columns.
>>> So let's say the matrix is divided into overlapping partitions like this:
>>> [100 x col[1, n*50]], [100 x col[(n-1)*50+1, (2n-1)*50]], … then we can
>>> assign each partition to a mapper and run mapPartitions on it.
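
To make the overlap concrete, the column ranges could be generated along these
lines (totalCols and n are assumed values, just for illustration):

// Each partition spans n*50 columns and shares 50 columns with its neighbor.
val totalCols = 5000
val n = 20
val step = (n - 1) * 50   // each partition starts (n-1)*50 columns after the previous one
val colRanges: Seq[(Int, Int)] =
  (1 to totalCols by step).map(start => (start, math.min(start + n * 50 - 1, totalCols)))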
>>>
>>>
>>> --------------------------------------------
>>> Michael (Bach) Bui, PhD,
>>> Senior Staff Architect, ADATAO Inc.
>>> www.adatao.com
>>>
>>>
>>>
>>>
>>> On Dec 20, 2013, at 1:11 PM, Michael (Bach) Bui <[email protected]>
>>> wrote:
>>>
>>> Here, Tom assumed that you already have your big matrix loaded on one
>>> machine. Now if you want to distribute it to the slave nodes you will need
>>> to broadcast it. I would expect this broadcast to be done once at the
>>> beginning of your algorithm, and the computation time will dominate the
>>> overall execution time.
>>>
>>> On the other hand, a better way to deal with a huge matrix is to store the
>>> data in HDFS and load it into each slave partition-by-partition. This is
>>> the fundamental data processing pattern in the Spark/Hadoop world.
>>> If you opt to do this, you will have to use a suitable InputFormat to make
>>> sure each partition has the number of rows that you want.
>>> For example, if you are lucky and each HDFS partition has exactly n*50
>>> rows, then you can use rdd.mapPartitions(func), where func takes care of
>>> splitting the n*50-row partition into n sub-matrices.
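
A rough sketch of that splitting step, assuming each partition really does hold
a multiple of 50 rows; the path, parseRow and processSubMatrix are placeholders:

val lines = sc.textFile("hdfs:///path/to/matrix")
val results = lines.mapPartitions { it =>
  // One matrix row per line; this partition is assumed to hold n*50 of them.
  val rows = it.map(parseRow).toArray        // parseRow: String => Array[Double]
  rows.grouped(50).map(processSubMatrix)     // n sub-matrices of 50 rows each
}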
>>>
>>> However, TextInputFormat or SequenceFileInputFormat will not guarantee
>>> that each partition has a certain number of rows. What you want is
>>> NLineInputFormat, which I think has not been pulled into Spark yet.
>>> If everyone thinks this is needed, I can implement it quickly; it should
>>> be pretty easy.
>>>
>>>
>>> --------------------------------------------
>>> Michael (Bach) Bui, PhD,
>>> Senior Staff Architect, ADATAO Inc.
>>> www.adatao.com
>>>
>>>
>>>
>>>
>>> On Dec 20, 2013, at 12:38 PM, Aureliano Buendia <[email protected]>
>>> wrote:
>>>
>>>
>>>
>>>
>>> On Fri, Dec 20, 2013 at 6:00 PM, Tom Vacek <[email protected]> wrote:
>>>
>>>> Oh, I see.  I was thinking that there was a computational dependency from
>>>> one window to the next.  If the computations are independent, then I think
>>>> Spark can help you out quite a bit.
>>>>
>>>> I think you would want an RDD where each element is a window of your
>>>> dense matrix.  I'm not aware of a way to distribute the windows of the big
>>>> matrix in a way that doesn't involve broadcasting the whole thing.  You
>>>> might have to tweak some config options, but I think it would work
>>>> straightaway.  I would initialize the data structure like this:
>>>> val matB = sc.broadcast(myBigDenseMatrix)
>>>> val distributedChunks = sc.parallelize(0 until numWindows)
>>>>   .mapPartitions(it => it.map(windowID => getWindow(matB.value, windowID)))
>>>>
>>>
>>> Here broadcast is used instead of calling parallelize on
>>> myBigDenseMatrix. Is it okay to broadcast a huge amount of data? Does
>>> sharing big data mean a big network IO overhead compared to calling
>>> parallelize, or is this overhead optimized by the partitioning?
>>>
>>>
>>>>
>>>> Then just apply your matrix ops as a map over the RDD.
>>>>
>>>> You may have your own tool for dense matrix ops, but I would suggest
>>>> Scala Breeze.  You'll have to use an old version of Breeze (current builds
>>>> are for 2.10), since Spark with Scala 2.10 is still a little way off.
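
For example, something along these lines (this assumes getWindow returns the
window's 5,000 doubles in column-major order, and yourMatrixOp stands in for
the expensive per-window computation):

import breeze.linalg.DenseMatrix

val resultRDD = distributedChunks.map { window =>
  val m = new DenseMatrix(100, 50, window)   // wrap as a 100 x 50 Breeze matrix
  yourMatrixOp(m)
}
resultRDD.collect()                          // or saveAsTextFile, if results are large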
>>>>
>>>>
>>>> On Fri, Dec 20, 2013 at 11:40 AM, Aureliano Buendia <
>>>> [email protected]> wrote:
>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Dec 20, 2013 at 5:21 PM, Tom Vacek <[email protected]> wrote:
>>>>>
>>>>>> If you use an RDD[Array[Double]] with a row decomposition of the
>>>>>> matrix, you can index windows of the rows all you want, but you're
>>>>>> limited to 100 concurrent tasks.  You could use a column decomposition
>>>>>> and access subsets of the columns with a PartitionPruningRDD.  I have
>>>>>> to say, though, if you're doing dense matrix operations, they will be
>>>>>> 100s of times faster on a shared-mem platform.  This particular matrix,
>>>>>> at 800 MB, could be a Breeze on a single node.
>>>>>>
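
As a rough illustration of the column-decomposition idea (columnBlocks is a
hypothetical Seq holding one 50-column block per element, and the import path
assumes a recent Spark build):

import org.apache.spark.rdd.PartitionPruningRDD

// One 50-column block per partition.
val colBlockRDD = sc.parallelize(columnBlocks, columnBlocks.size)

// Keep only the partitions that cover a given window of columns.
def windowRDD(firstBlock: Int, lastBlock: Int) =
  PartitionPruningRDD.create(colBlockRDD, idx => idx >= firstBlock && idx <= lastBlock)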
>>>>>
>>>>> The computation for every submatrix is very expensive; it takes days
>>>>> on a single node. I was hoping this could be reduced to hours or minutes
>>>>> with Spark.
>>>>>
>>>>> Are you saying that Spark is not suitable for this type of job?
>>>>>
>>>>
>>>>
>>>
>>>
>>>
>>
>
