On Fri, Dec 20, 2013 at 10:52 PM, Christopher Nguyen <[email protected]> wrote:

> A couple of fixes inline.
>
> --
> Christopher T. Nguyen
> Co-founder & CEO, Adatao <http://adatao.com>
> linkedin.com/in/ctnguyen
>
>
>
> On Fri, Dec 20, 2013 at 2:34 PM, Christopher Nguyen <[email protected]>wrote:
>
>> Aureliano, would something like this work? The red code is the only
>> place where you have to think parallel here.
>>
>>
>> while (thereIsStillDataToWorkOn) {
>>   // N is a design variable
>>   val bigChunk: Array[Double] = readInTheNextNx100x50MatrixData()
>>   val bigChunkAsArraysOf5000Doubles: Array[Array[Double]] =
>>     restructureIntoArraysOf5000Doubles(bigChunk)
>>   val myRDD = sc
>>
>
> s/myRDD/myResult
>
>
>>     .parallelize(bigChunkAsArraysOf5000Doubles, 1)
>>
>
> That number of partitions shouldn't be 1, but some function of the size of
> your cluster, or you can just let Spark decide.
>
>
>>     .map(eachArrayOf5000Doubles =>
>>       someVeryLongRunningTransformer(eachArrayOf5000Doubles))
>>     .collect()
>>
>
> collect() or reduce() etc., whatever is appropriate for your
> transformation/operation.
>

Is this part async, or does the loop wait for the task to complete before
the next iteration?


>
>
>> }
>>
>>
>> Next, pardon me if this is too basic, but in case it is helpful: this
>> code first runs on a single machine, called a Driver, which must have
>> access to the source data. When we call parallelize(), Spark handles all
>> the partitioning of the data into the available Workers, including
>> serializing each data partition to the Workers, and collecting the results
>> back in one place. There is no data duplication other than the Worker's
>> copy of the data from the Driver.
>>
>> This indeed does not take advantage of all of the other available Spark
>> goodnesses that others have correctly pointed out on this thread, such as
>> broadcasting, mapPartitions() vs map(), parallel data loading across HDFS
>> partitions, etc. But it would be exactly the right thing to do if it best
>> fits your problem statement.
>> --
>> Christopher T. Nguyen
>> Co-founder & CEO, Adatao <http://adatao.com>
>> linkedin.com/in/ctnguyen
>>
>>
>>
>> On Fri, Dec 20, 2013 at 2:01 PM, Aureliano Buendia 
>> <[email protected]>wrote:
>>
>>>
>>>
>>>
>>> On Fri, Dec 20, 2013 at 9:43 PM, Christopher Nguyen <[email protected]>wrote:
>>>
>>>> Aureliano, how would your production data be coming in and accessed?
>>>> It's possible that you can still think of that level as a serial operation
>>>> (outer loop, large chunks) first before worrying about parallelizing the
>>>> computation of the tiny chunks.
>>>>
>>>
>>> It's batch processing of time series data. Perhaps serial processing
>>> where each serial item is a set of parallel processes could be an option.
>>> Does Spark have such an option?
>>>
>>>
>>>>
>>>> And I'm reading that when you refer to "data duplication", you're
>>>> worried about that as a side-effect problem, not as a requirement, correct?
>>>>
>>>
>>> That's right. Data duplication is certainly not a requirement. We are
>>> not trying to avoid it, but if it's a side effect that leads to
>>> considerable I/O overhead, it's not going to be good.
>>>
>>>
>>>> And if the former, I don't see that data duplication is a necessary
>>>> side effect. Unless I missed something in the thread, don't use broadcast.
>>>>
>>>
>>> I take it that overlapping partitions do not mean data duplication. I
>>> wasn't sure if partitions hold a copy or a reference.
>>>
>>>
>>>>
>>>> Put another way, I see the scale of this challenge as far more
>>>> operational than logical (when squinted at from the right angle :)
>>>>
>>>>  --
>>>> Christopher T. Nguyen
>>>> Co-founder & CEO, Adatao <http://adatao.com>
>>>> linkedin.com/in/ctnguyen
>>>>
>>>>
>>>>
>>>> On Fri, Dec 20, 2013 at 1:07 PM, Aureliano Buendia <
>>>> [email protected]> wrote:
>>>>
>>>>> Also, overthinking is appreciated for this problem, as my production
>>>>> data is actually close to 100 x 1,000,000,000, and data duplication
>>>>> could get messy at that scale.
>>>>>
>>>>> Sorry about the initial misinformation, I was thinking about my
>>>>> development/test data.
>>>>>
>>>>>
>>>>> On Fri, Dec 20, 2013 at 9:04 PM, Aureliano Buendia <
>>>>> [email protected]> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Dec 20, 2013 at 9:00 PM, Tom Vacek <[email protected]>wrote:
>>>>>>
>>>>>>> Totally agree.  Even with a 50x data replication, that's only 40 GB,
>>>>>>> which would be a fraction of a standard cluster.  But since
>>>>>>> overthinking is a lot of fun, how about this: do a mapPartitions with
>>>>>>> a threaded subtask for each window.  Now you only need to replicate
>>>>>>> data across the boundaries of each partition of windows, rather than
>>>>>>> each window.
>>>>>>>
>>>>>>
>>>>>> How can this be written in Spark with Scala?
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Dec 20, 2013 at 2:53 PM, Christopher Nguyen 
>>>>>>> <[email protected]>wrote:
>>>>>>>
>>>>>>>> Are we over-thinking the problem here? Since the per-window compute
>>>>>>>> task is hugely expensive, stateless from window to window, and the
>>>>>>>> original big matrix is just 1GB, the primary gain in using a parallel
>>>>>>>> engine is in distributing and scheduling these (long-running,
>>>>>>>> isolated) tasks. I'm reading that data loading and distribution are
>>>>>>>> going to be a tiny fraction of the overall compute time.
>>>>>>>>
>>>>>>>> If that's the case, it would make sense simply to start with a 1GB
>>>>>>>> Array[Double] on the driver, from that create an RDD comprising 20,000
>>>>>>>> rows of 5,000 doubles each, map them out to the workers and have them
>>>>>>>> interpret what the 5,000 doubles mean in terms of a [100 x 50]
>>>>>>>> sub-matrix. They each have a good fraction of several days to figure
>>>>>>>> it out :)
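
The restructuring step described above can be sketched in plain Scala,
without Spark. This is only an illustration under stated assumptions: the
object name ChunkSketch and the helper at() are hypothetical, and the
row-major layout of each 5,000-double chunk is an assumption that the thread
does not confirm.

```scala
object ChunkSketch {
  val Rows = 100              // rows per sub-matrix (from the thread)
  val Cols = 50               // columns per sub-matrix (from the thread)
  val ChunkSize = Rows * Cols // 5,000 doubles per chunk

  // Split a flat array into consecutive chunks of 5,000 doubles.
  // If flat.length is not a multiple of ChunkSize, the last chunk is shorter.
  def restructureIntoArraysOf5000Doubles(flat: Array[Double]): Array[Array[Double]] =
    flat.grouped(ChunkSize).toArray

  // Read element (row, col) of one chunk, ASSUMING a row-major layout.
  def at(chunk: Array[Double], row: Int, col: Int): Double = {
    require(row >= 0 && row < Rows && col >= 0 && col < Cols, "index out of range")
    chunk(row * Cols + col)
  }
}
```

Each resulting Array[Double] would then be one element of the RDD, and the
long-running transformer would use something like at() to view it as a
[100 x 50] sub-matrix.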
>>>>>>>>
>>>>>>>> This would be a great load test for Spark's resiliency over
>>>>>>>> long-running computations.
>>>>>>>>
>>>>>>>> --
>>>>>>>> Christopher T. Nguyen
>>>>>>>> Co-founder & CEO, Adatao <http://adatao.com>
>>>>>>>> linkedin.com/in/ctnguyen
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Dec 20, 2013 at 11:36 AM, Michael (Bach) Bui <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hmm, I misread that you need a sliding window.
>>>>>>>>> I am thinking out loud here: one way of dealing with this is to
>>>>>>>>> improve NLineInputFormat so that partitions have a small overlapping
>>>>>>>>> portion; in this case the overlapping portion is 50 columns.
>>>>>>>>> So let's say the matrix is divided into overlapping partitions like
>>>>>>>>> this: [100 x col[1, n*50]], [100 x col[(n-1)*50+1, (2n-1)*50]], …
>>>>>>>>> Then we can assign each partition to a mapper to do mapPartitions
>>>>>>>>> on it.
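
The overlapping column ranges above follow a simple pattern: each partition
spans n*50 columns and starts (n-1)*50 columns after the previous one, so
neighbours share 50 columns. A minimal sketch of that arithmetic, where the
object name OverlapSketch and the function columnRange are hypothetical
names introduced for illustration:

```scala
object OverlapSketch {
  val WindowWidth = 50 // columns per window, and the overlap between neighbours

  // 1-based, inclusive column range of partition k (k = 0, 1, 2, ...),
  // where every partition spans n * 50 columns.
  def columnRange(k: Int, n: Int): (Int, Int) = {
    require(k >= 0 && n >= 2, "need k >= 0 and n >= 2")
    val start = k * (n - 1) * WindowWidth + 1
    (start, start + n * WindowWidth - 1)
  }
}
```

For k = 0 and k = 1 this reproduces the two ranges quoted above, col[1, n*50]
and col[(n-1)*50+1, (2n-1)*50].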
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --------------------------------------------
>>>>>>>>> Michael (Bach) Bui, PhD,
>>>>>>>>> Senior Staff Architect, ADATAO Inc.
>>>>>>>>> www.adatao.com
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Dec 20, 2013, at 1:11 PM, Michael (Bach) Bui <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>> Here, Tom assumed that you already have your big matrix loaded on
>>>>>>>>> one machine. Now if you want to distribute it to slave nodes you
>>>>>>>>> will need to broadcast it. I would expect this broadcasting to be
>>>>>>>>> done once at the beginning of your algorithm, and the computation
>>>>>>>>> time will dominate the overall execution time.
>>>>>>>>>
>>>>>>>>> On the other hand, a better way to deal with a huge matrix is to
>>>>>>>>> store the data in HDFS and load it onto each slave
>>>>>>>>> partition-by-partition. This is a fundamental data processing
>>>>>>>>> pattern in the Spark/Hadoop world.
>>>>>>>>> If you opt to do this, you will have to use a suitable InputFormat
>>>>>>>>> to make sure each partition has the number of rows that you want.
>>>>>>>>> For example, if you are lucky and each HDFS partition has exactly
>>>>>>>>> n*50 rows, then you can use rdd.mapPartitions(func), where func will
>>>>>>>>> take care of splitting the n*50-row partition into n sub-matrices.
>>>>>>>>>
>>>>>>>>> However, HDFS TextInputFormat or SequenceFileInputFormat will not
>>>>>>>>> guarantee that each partition has a certain number of rows. What you
>>>>>>>>> want is NLineInputFormat, which I think has not been pulled into
>>>>>>>>> Spark yet.
>>>>>>>>> If everyone thinks this is needed, I can implement it quickly; it
>>>>>>>>> should be pretty easy.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --------------------------------------------
>>>>>>>>> Michael (Bach) Bui, PhD,
>>>>>>>>> Senior Staff Architect, ADATAO Inc.
>>>>>>>>> www.adatao.com
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Dec 20, 2013, at 12:38 PM, Aureliano Buendia <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Dec 20, 2013 at 6:00 PM, Tom Vacek <[email protected]
>>>>>>>>> > wrote:
>>>>>>>>>
>>>>>>>>>> Oh, I see.  I was thinking that there was a computational
>>>>>>>>>> dependency from one window to the next.  If the computations are
>>>>>>>>>> independent, then I think Spark can help you out quite a bit.
>>>>>>>>>>
>>>>>>>>>> I think you would want an RDD where each element is a window of
>>>>>>>>>> your dense matrix.  I'm not aware of a way to distribute the windows
>>>>>>>>>> of the big matrix in a way that doesn't involve broadcasting the
>>>>>>>>>> whole thing.  You might have to tweak some config options, but I
>>>>>>>>>> think it would work straightaway.  I would initialize the data
>>>>>>>>>> structure like this:
>>>>>>>>>> val matB = sc.broadcast(myBigDenseMatrix)
>>>>>>>>>> val distributedChunks = sc.parallelize(0 until numWindows)
>>>>>>>>>>   .mapPartitions(it => it.map(windowID => getWindow(matB.value, windowID)))
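
The thread does not show getWindow, so the following is only a hypothetical
sketch of what such a helper might look like in plain Scala. It assumes the
matrix is stored as an array of rows and that window i covers columns i up to
(but not including) i + 50, so consecutive windows slide by one column. Both
the storage layout and the stride are assumptions, as are the names
WindowSketch and numWindows.

```scala
object WindowSketch {
  val WindowWidth = 50 // columns per window (from the thread)

  // Hypothetical getWindow: copies columns [windowID, windowID + 50) of every row.
  // ASSUMES row-wise storage and a stride of one column between windows.
  def getWindow(mat: Array[Array[Double]], windowID: Int): Array[Array[Double]] = {
    require(mat.nonEmpty && windowID >= 0 && windowID + WindowWidth <= mat(0).length,
      "window out of range")
    mat.map(row => row.slice(windowID, windowID + WindowWidth))
  }

  // Number of full windows available under the same one-column-stride assumption.
  def numWindows(mat: Array[Array[Double]]): Int =
    if (mat.isEmpty) 0 else math.max(0, mat(0).length - WindowWidth + 1)
}
```

With a helper of this shape, only the window IDs are parallelized, and each
task slices its own window out of the broadcast value.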
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Here broadcast is used instead of calling parallelize on
>>>>>>>>> myBigDenseMatrix. Is it okay to broadcast a huge amount of data? Does
>>>>>>>>> sharing big data this way mean a big network I/O overhead compared to
>>>>>>>>> calling parallelize, or is this overhead optimized due to the
>>>>>>>>> partitioning?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Then just apply your matrix ops as map on
>>>>>>>>>>
>>>>>>>>>> You may have your own tool for dense matrix ops, but I would
>>>>>>>>>> suggest Scala Breeze.  You'll have to use an old version of Breeze
>>>>>>>>>> (current builds are for 2.10).  Spark with Scala-2.10 is a little
>>>>>>>>>> way off.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Dec 20, 2013 at 11:40 AM, Aureliano Buendia <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Dec 20, 2013 at 5:21 PM, Tom Vacek <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> If you use an RDD[Array[Double]] with a row decomposition of
>>>>>>>>>>>> the matrix, you can index windows of the rows all you want, but
>>>>>>>>>>>> you're limited to 100 concurrent tasks.  You could use a column
>>>>>>>>>>>> decomposition and access subsets of the columns with a
>>>>>>>>>>>> PartitionPruningRDD.  I have to say, though, if you're doing dense
>>>>>>>>>>>> matrix operations, they will be 100s of times faster on a shared
>>>>>>>>>>>> mem platform.  This particular matrix, at 800 MB, could be a
>>>>>>>>>>>> Breeze on a single node.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> The computation for every submatrix is very expensive; it takes
>>>>>>>>>>> days on a single node. I was hoping this can be reduced to hours or
>>>>>>>>>>> minutes with Spark.
>>>>>>>>>>>
>>>>>>>>>>> Are you saying that Spark is not suitable for this type of job?
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
