On Fri, Dec 20, 2013 at 9:43 PM, Christopher Nguyen <[email protected]> wrote:

> Aureliano, how would your production data be coming in and accessed? It's
> possible that you can still think of that level as a serial operation
> (outer loop, large chunks) first before worrying about parallelizing the
> computation of the tiny chunks.
>

It's a batch processing of time series data. Perhaps a serial processing
where each serial item is a set of parallel processes could be an option.
Does spark have such option?


>
> And I'm reading that when you refer to "data duplication", you're worried
> about that as a side-effect problem, not as a requirement, correct?
>

That's right. Data duplication is certainly not a requirement, we are not
trying to avoid it, but if it's a side effect that leads to some
considerable io overhead, it's not going to be good.


> And if the former, I don't see that data duplication is a necessary side
> effect. Unless I missed something in the thread, don't use broadcast.
>

I take it that overlapped partitions does not mean data duplication. I
wasn't sure if partitions hold a copy, or a reference.


>
> Put another way, I see the scale of this challenge as far more operational
> than logical (when squinted at from the right angle :)
>
> --
> Christopher T. Nguyen
> Co-founder & CEO, Adatao <http://adatao.com>
> linkedin.com/in/ctnguyen
>
>
>
> On Fri, Dec 20, 2013 at 1:07 PM, Aureliano Buendia 
> <[email protected]>wrote:
>
>> Also over thinking is appreciated in this problem, as my production data
>> is actually near 100 x 1000,000,000 and data duplication could get messy
>> with this.
>>
>> Sorry about the initial misinformation, I was thinking about my
>> development/test data.
>>
>>
>> On Fri, Dec 20, 2013 at 9:04 PM, Aureliano Buendia 
>> <[email protected]>wrote:
>>
>>>
>>>
>>>
>>> On Fri, Dec 20, 2013 at 9:00 PM, Tom Vacek <[email protected]>wrote:
>>>
>>>> Totally agree.  Even with a 50x data replication, that's only 40 GB,
>>>> which would be a fraction of standard cluster.  But since overthinking is a
>>>> lot of fun, how about this: do a mapPartitions with a threaded subtask for
>>>> each window.  Now you only need to replicate data across the boundaries of
>>>> each partition of windows, rather than each window.
>>>>
>>>
>>> How can this be written in spark scala?
>>>
>>>
>>>>
>>>>
>>>> On Fri, Dec 20, 2013 at 2:53 PM, Christopher Nguyen <[email protected]>wrote:
>>>>
>>>>> Are we over-thinking the problem here? Since the per-window compute
>>>>> task is hugely expensive, stateless from window to window, and the 
>>>>> original
>>>>> big matrix is just 1GB, the primary gain in using a parallel engine is in
>>>>> distributing and scheduling these (long-running, isolated) tasks. I'm
>>>>> reading that data loading and distribution are going to be a tiny fraction
>>>>> of the overall compute time.
>>>>>
>>>>> If that's the case, it would make sense simply to start with a 1GB
>>>>> Array[Double] on the driver, from that create an RDD comprising 20,000 
>>>>> rows
>>>>> of 5,000 doubles each, map them out to the workers and have them interpret
>>>>> what the 5,000 doubles mean in terms of a [100 x 50] sub-matrix. They each
>>>>> have a good fraction of several days to figure it out :)
>>>>>
>>>>> This would be a great load test for Spark's resiliency over
>>>>> long-running computations.
>>>>>
>>>>> --
>>>>> Christopher T. Nguyen
>>>>> Co-founder & CEO, Adatao <http://adatao.com>
>>>>> linkedin.com/in/ctnguyen
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Dec 20, 2013 at 11:36 AM, Michael (Bach) Bui <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hmm, I misread that you need a sliding window.
>>>>>> I am thinking out loud here: one way of dealing with this is to
>>>>>> improve NLineInputFormat so that partitions will have a small overlapping
>>>>>> portion in this case the overlapping portion is 50 columns
>>>>>> So let say the matrix is divided into overlapping partitions like
>>>>>> this [100 x col[1, n*50] ] , [100 x col[(n-1)*50+1, (2n-1)*50] ] … then 
>>>>>> we
>>>>>> can assign each partition to a mapper to do mapPartition on it.
>>>>>>
>>>>>>
>>>>>> --------------------------------------------
>>>>>> Michael (Bach) Bui, PhD,
>>>>>> Senior Staff Architect, ADATAO Inc.
>>>>>> www.adatao.com
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Dec 20, 2013, at 1:11 PM, Michael (Bach) Bui <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>> Here, Tom assumed that you have your big matrix already being loaded
>>>>>> in one machine. Now if you want to distribute it to slave nodes you will
>>>>>> need to broadcast it. I would expect this broadcasting will be done once 
>>>>>> at
>>>>>> the beginning of your algorithm and the computation time will dominate 
>>>>>> the
>>>>>> overall execution time.
>>>>>>
>>>>>> On the other hand, a better way to deal with huge matrix is to store
>>>>>> the data in hdfs and load data into each slaves partition-by-partition.
>>>>>> This is fundamental data processing pattern in Spark/Hadoop world.
>>>>>> If you opt to do this, you will have to use suitable InputFormat to
>>>>>> make sure each partition has the right amount of row that you want.
>>>>>> For example if you are lucky each HDFS partition have exact n*50
>>>>>> rows, then you can use rdd.mapPartition(func). Where func will take care 
>>>>>> of
>>>>>> splitting n*50-row partition into n sub matrix
>>>>>>
>>>>>> However, HDFS TextInput or SequnceInputFormat format will not
>>>>>> guarantee each partition has certain number of rows. What you want is
>>>>>> NLineInputFormat, which I think currently has not been pulled into Spark
>>>>>> yet.
>>>>>> If everyone think this is needed, I can implement it quickly, it
>>>>>> should be pretty easy.
>>>>>>
>>>>>>
>>>>>> --------------------------------------------
>>>>>> Michael (Bach) Bui, PhD,
>>>>>> Senior Staff Architect, ADATAO Inc.
>>>>>> www.adatao.com
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Dec 20, 2013, at 12:38 PM, Aureliano Buendia <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Dec 20, 2013 at 6:00 PM, Tom Vacek <[email protected]>wrote:
>>>>>>
>>>>>>> Oh, I see.  I was thinking that there was a computational dependency
>>>>>>> on one window to the next.  If the computations are independent, then I
>>>>>>> think Spark can help you out quite a bit.
>>>>>>>
>>>>>>> I think you would want an RDD where each element is a window of your
>>>>>>> dense matrix.  I'm not aware of a way to distribute the windows of the 
>>>>>>> big
>>>>>>> matrix in a way that doesn't involve broadcasting the whole thing.  You
>>>>>>> might have to tweak some config options, but I think it would work
>>>>>>> straightaway.  I would initialize the data structure like this:
>>>>>>> val matB = sc.broadcast(myBigDenseMatrix)
>>>>>>> val distributedChunks = sc.parallelize(0 until
>>>>>>> numWindows).mapPartitions(it => it.map(windowID => getWindow(matB.value,
>>>>>>> windowID) ) )
>>>>>>>
>>>>>>
>>>>>> Here broadcast is used instead of calling parallelize on
>>>>>> myBigDenseMatrix. Is it okay to broadcast a huge amount of data? Does
>>>>>> sharing a big data mean a big network io overhead comparing to calling
>>>>>> parallelize, or is this overhead optimized due to the of partitioning?
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> Then just apply your matrix ops as map on
>>>>>>>
>>>>>>> You maybe have your own tool for dense matrix ops, but I would
>>>>>>> suggest Scala Breeze.  You'll have to use an old version of Breeze 
>>>>>>> (current
>>>>>>> builds are for 2.10).  Spark with Scala-2.10 is a little way off.
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Dec 20, 2013 at 11:40 AM, Aureliano Buendia <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Dec 20, 2013 at 5:21 PM, Tom Vacek 
>>>>>>>> <[email protected]>wrote:
>>>>>>>>
>>>>>>>>> If you use an RDD[Array[Double]] with a row decomposition of the
>>>>>>>>> matrix, you can index windows of the rows all you want, but you're 
>>>>>>>>> limited
>>>>>>>>> to 100 concurrent tasks.  You could use a column decomposition and 
>>>>>>>>> access
>>>>>>>>> subsets of the columns with a PartitionPruningRDD.  I have to say, 
>>>>>>>>> though,
>>>>>>>>> if you're doing dense matrix operations, they will be 100s of times 
>>>>>>>>> faster
>>>>>>>>> on a shared mem platform.  This particular matrix, at 800 MB could be 
>>>>>>>>> a
>>>>>>>>> Breeze on a single node.
>>>>>>>>>
>>>>>>>>
>>>>>>>> The computation for every submatrix is very expensive, it takes
>>>>>>>> days on a single node. I was hoping this can be reduced to hours or 
>>>>>>>> minutes
>>>>>>>> with spark.
>>>>>>>>
>>>>>>>> Are you saying that spark is not suitable for this type of job?
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to