On Fri, Dec 20, 2013 at 10:34 PM, Christopher Nguyen <[email protected]> wrote:
> Aureliano, would something like this work? The red code is the only place
> where you have to think parallel here.
>
>
> while (thereIsStillDataToWorkOn) {
>   val bigChunk: Array[Double] = readInTheNextNx100x50MatrixData() // N is a design variable
>   val bigChunkAsArraysOf5000Doubles: Array[Array[Double]] =
>     restructureIntoArraysOf5000Doubles(bigChunk)
>   val results = sc
>     .parallelize(bigChunkAsArraysOf5000Doubles, 1)
>     .map(eachArrayOf5000Doubles => someVeryLongRunningTransformer(eachArrayOf5000Doubles))
>     .collect()
> }
>
>
> Next, pardon me if this is too basic, but in case it is helpful: this code
> first runs on a single machine, called a Driver, which must have access to
> the source data.
>
Thanks for the clear explanation.
> When we call parallelize(), Spark handles all the partitioning of the
> data into the available Workers, including serializing each data partition
> to the Workers, and collecting the results back in one place.
>
This would create nearly 1 billion RDDs. Is that OK?
> There is no data duplication other than the Worker's copy of the data from
> the Driver.
>
Each out-of-boundary 50-column window shares 2*(49 + 48 + 47 + ... + 1)
columns with the sliding windows to its left and right. All of these columns
are sent over the network many times. Isn't that duplication of data transfer?
>
> This indeed does not take advantage of all of the other available Spark
> goodnesses that others have correctly pointed out on this thread, such as
> broadcasting, mapPartitions() vs map(), parallel data loading across HDFS
> partitions, etc. But it would be exactly the right thing to do if it best
> fits your problem statement.
> --
> Christopher T. Nguyen
> Co-founder & CEO, Adatao <http://adatao.com>
> linkedin.com/in/ctnguyen
>
>
>
> On Fri, Dec 20, 2013 at 2:01 PM, Aureliano Buendia
> <[email protected]>wrote:
>
>>
>>
>>
>> On Fri, Dec 20, 2013 at 9:43 PM, Christopher Nguyen <[email protected]> wrote:
>>
>>> Aureliano, how would your production data be coming in and accessed?
>>> It's possible that you can still think of that level as a serial operation
>>> (outer loop, large chunks) first before worrying about parallelizing the
>>> computation of the tiny chunks.
>>>
>>
>> It's batch processing of time series data. Perhaps serial processing,
>> where each serial item is a set of parallel processes, could be an option.
>> Does Spark have such an option?
>>
>>
>>>
>>> And I'm reading that when you refer to "data duplication", you're
>>> worried about that as a side-effect problem, not as a requirement, correct?
>>>
>>
>> That's right. Data duplication is certainly not a requirement, and we are
>> not explicitly trying to avoid it, but if it's a side effect that leads to
>> considerable I/O overhead, it's not going to be good.
>>
>>
>>> And if the former, I don't see that data duplication is a necessary side
>>> effect. Unless I missed something in the thread, don't use broadcast.
>>>
>>
>> I take it that overlapping partitions do not imply data duplication. I
>> wasn't sure whether partitions hold a copy or a reference.
>>
>>
>>>
>>> Put another way, I see the scale of this challenge as far more
>>> operational than logical (when squinted at from the right angle :)
>>>
>>> --
>>> Christopher T. Nguyen
>>> Co-founder & CEO, Adatao <http://adatao.com>
>>> linkedin.com/in/ctnguyen
>>>
>>>
>>>
>>> On Fri, Dec 20, 2013 at 1:07 PM, Aureliano Buendia <[email protected]> wrote:
>>>
>>>> Also, overthinking is appreciated in this problem, as my production
>>>> data is actually near 100 x 1,000,000,000, and data duplication could
>>>> get messy at that scale.
>>>>
>>>> Sorry about the initial misinformation, I was thinking about my
>>>> development/test data.
>>>>
>>>>
>>>> On Fri, Dec 20, 2013 at 9:04 PM, Aureliano Buendia <
>>>> [email protected]> wrote:
>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Dec 20, 2013 at 9:00 PM, Tom Vacek <[email protected]> wrote:
>>>>>
>>>>>> Totally agree. Even with a 50x data replication, that's only 40 GB,
>>>>>> which would be a fraction of a standard cluster. But since overthinking
>>>>>> is a lot of fun, how about this: do a mapPartitions with a threaded
>>>>>> subtask for each window. Now you only need to replicate data across the
>>>>>> boundaries of each partition of windows, rather than each window.
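>>>>>>
>>>>>> Very roughly, something like this (untested, and hand-waving the helper
>>>>>> that builds each column block with its overlap; all the helper names
>>>>>> here are made up):
>>>>>>
>>>>>> // Each element is one contiguous block of columns, padded with the
>>>>>> // extra columns that the windows straddling its right edge need.
>>>>>> val blocks: Seq[Array[Double]] = buildOverlappingColumnBlocks(myBigDenseMatrix)
>>>>>>
>>>>>> val results = sc.parallelize(blocks, blocks.length)
>>>>>>   .mapPartitions { it =>
>>>>>>     it.flatMap { block =>
>>>>>>       windowsIn(block)                      // the 100 x 50 windows inside this block
>>>>>>         .par                                // one threaded subtask per window
>>>>>>         .map(w => veryExpensiveTransform(w))
>>>>>>         .seq
>>>>>>         .iterator
>>>>>>     }
>>>>>>   }
>>>>>>   .collect()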
>>>>>>
>>>>>
>>>>> How can this be written in Spark Scala?
>>>>>
>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Dec 20, 2013 at 2:53 PM, Christopher Nguyen
>>>>>> <[email protected]>wrote:
>>>>>>
>>>>>>> Are we over-thinking the problem here? Since the per-window compute
>>>>>>> task is hugely expensive, stateless from window to window, and the
>>>>>>> original
>>>>>>> big matrix is just 1GB, the primary gain in using a parallel engine is
>>>>>>> in
>>>>>>> distributing and scheduling these (long-running, isolated) tasks. I'm
>>>>>>> reading that data loading and distribution are going to be a tiny
>>>>>>> fraction
>>>>>>> of the overall compute time.
>>>>>>>
>>>>>>> If that's the case, it would make sense simply to start with a 1GB
>>>>>>> Array[Double] on the driver, from that create an RDD comprising 20,000
>>>>>>> rows
>>>>>>> of 5,000 doubles each, map them out to the workers and have them
>>>>>>> interpret
>>>>>>> what the 5,000 doubles mean in terms of a [100 x 50] sub-matrix. They
>>>>>>> each
>>>>>>> have a good fraction of several days to figure it out :)
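>>>>>>>
>>>>>>> In code, that could be as simple as something like this (sketch only;
>>>>>>> the helper names are made up):
>>>>>>>
>>>>>>> val bigArray: Array[Double] = loadTheOneGBArray()
>>>>>>>
>>>>>>> // 20,000 groups of 5,000 doubles, each standing for one [100 x 50] sub-matrix
>>>>>>> val rows: Seq[Array[Double]] = bigArray.grouped(5000).toIndexedSeq
>>>>>>>
>>>>>>> val results = sc.parallelize(rows, numWorkerCores)
>>>>>>>   .map(chunk => longRunningTask(interpretAs100x50(chunk)))
>>>>>>>   .collect()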
>>>>>>>
>>>>>>> This would be a great load test for Spark's resiliency over
>>>>>>> long-running computations.
>>>>>>>
>>>>>>> --
>>>>>>> Christopher T. Nguyen
>>>>>>> Co-founder & CEO, Adatao <http://adatao.com>
>>>>>>> linkedin.com/in/ctnguyen
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Dec 20, 2013 at 11:36 AM, Michael (Bach) Bui <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hmm, I misread that you need a sliding window.
>>>>>>>> I am thinking out loud here: one way of dealing with this is to
>>>>>>>> improve NLineInputFormat so that partitions have a small overlapping
>>>>>>>> portion; in this case the overlapping portion is 50 columns.
>>>>>>>> So let's say the matrix is divided into overlapping partitions like
>>>>>>>> this: [100 x col[1, n*50]], [100 x col[(n-1)*50+1, (2n-1)*50]], ...
>>>>>>>> Then we can assign each partition to a mapper and run mapPartitions
>>>>>>>> on it.
>>>>>>>>
>>>>>>>>
>>>>>>>> --------------------------------------------
>>>>>>>> Michael (Bach) Bui, PhD,
>>>>>>>> Senior Staff Architect, ADATAO Inc.
>>>>>>>> www.adatao.com
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Dec 20, 2013, at 1:11 PM, Michael (Bach) Bui <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Here, Tom assumed that you have your big matrix already loaded on
>>>>>>>> one machine. Now, if you want to distribute it to slave nodes, you
>>>>>>>> will need to broadcast it. I would expect this broadcast to be done
>>>>>>>> once at the beginning of your algorithm, and the computation time to
>>>>>>>> dominate the overall execution time.
>>>>>>>>
>>>>>>>> On the other hand, a better way to deal with a huge matrix is to
>>>>>>>> store the data in HDFS and load it into the slaves
>>>>>>>> partition-by-partition. This is the fundamental data processing
>>>>>>>> pattern in the Spark/Hadoop world.
>>>>>>>> If you opt to do this, you will have to use a suitable InputFormat to
>>>>>>>> make sure each partition has the right number of rows.
>>>>>>>> For example, if you are lucky and each HDFS partition has exactly
>>>>>>>> n*50 rows, then you can use rdd.mapPartitions(func), where func takes
>>>>>>>> care of splitting the n*50-row partition into n sub-matrices.
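>>>>>>>>
>>>>>>>> For illustration only (assuming each text line holds one 100-double
>>>>>>>> slice of the matrix as whitespace-separated values; the path and the
>>>>>>>> transform name are made up):
>>>>>>>>
>>>>>>>> val lines = sc.textFile("hdfs:///path/to/matrix")   // hypothetical location
>>>>>>>>
>>>>>>>> val results = lines.mapPartitions { it =>
>>>>>>>>   it.map(_.trim.split("\\s+").map(_.toDouble))  // parse one line of doubles
>>>>>>>>     .grouped(50)                                // 50 consecutive lines = one sub-matrix
>>>>>>>>     .map(subMatrix => expensiveTransform(subMatrix.toArray))
>>>>>>>> }.collect()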
>>>>>>>>
>>>>>>>> However, the HDFS TextInputFormat or SequenceFileInputFormat will
>>>>>>>> not guarantee that each partition has a certain number of rows. What
>>>>>>>> you want is NLineInputFormat, which I think has not been pulled into
>>>>>>>> Spark yet.
>>>>>>>> If everyone thinks this is needed, I can implement it quickly; it
>>>>>>>> should be pretty easy.
>>>>>>>>
>>>>>>>>
>>>>>>>> --------------------------------------------
>>>>>>>> Michael (Bach) Bui, PhD,
>>>>>>>> Senior Staff Architect, ADATAO Inc.
>>>>>>>> www.adatao.com
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Dec 20, 2013, at 12:38 PM, Aureliano Buendia <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Dec 20, 2013 at 6:00 PM, Tom Vacek
>>>>>>>> <[email protected]>wrote:
>>>>>>>>
>>>>>>>>> Oh, I see. I was thinking that there was a computational
>>>>>>>>> dependency on one window to the next. If the computations are
>>>>>>>>> independent,
>>>>>>>>> then I think Spark can help you out quite a bit.
>>>>>>>>>
>>>>>>>>> I think you would want an RDD where each element is a window of
>>>>>>>>> your dense matrix. I'm not aware of a way to distribute the windows
>>>>>>>>> of the
>>>>>>>>> big matrix in a way that doesn't involve broadcasting the whole
>>>>>>>>> thing. You
>>>>>>>>> might have to tweak some config options, but I think it would work
>>>>>>>>> straightaway. I would initialize the data structure like this:
>>>>>>>>> val matB = sc.broadcast(myBigDenseMatrix)
>>>>>>>>> val distributedChunks = sc.parallelize(0 until numWindows)
>>>>>>>>>   .mapPartitions(it => it.map(windowID => getWindow(matB.value, windowID)))
>>>>>>>>>
>>>>>>>>
>>>>>>>> Here broadcast is used instead of calling parallelize on
>>>>>>>> myBigDenseMatrix. Is it okay to broadcast a huge amount of data? Does
>>>>>>>> sharing big data mean a big network I/O overhead compared to calling
>>>>>>>> parallelize, or is this overhead optimized thanks to partitioning?
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Then just apply your matrix ops as a map on the resulting RDD.
>>>>>>>>>
>>>>>>>>> You may have your own tool for dense matrix ops, but I would
>>>>>>>>> suggest Scala Breeze. You'll have to use an old version of Breeze
>>>>>>>>> (current builds are for Scala 2.10), and Spark with Scala 2.10 is
>>>>>>>>> still a little way off.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Dec 20, 2013 at 11:40 AM, Aureliano Buendia <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Dec 20, 2013 at 5:21 PM, Tom Vacek <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> If you use an RDD[Array[Double]] with a row decomposition of the
>>>>>>>>>>> matrix, you can index windows of the rows all you want, but you're
>>>>>>>>>>> limited to 100 concurrent tasks. You could use a column
>>>>>>>>>>> decomposition and access subsets of the columns with a
>>>>>>>>>>> PartitionPruningRDD. I have to say, though, if you're doing dense
>>>>>>>>>>> matrix operations, they will be hundreds of times faster on a
>>>>>>>>>>> shared-memory platform. This particular matrix, at 800 MB, could
>>>>>>>>>>> be a Breeze on a single node.
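>>>>>>>>>>>
>>>>>>>>>>> For the column-decomposition idea, a rough sketch (assuming one
>>>>>>>>>>> column per element, partitioned into contiguous column ranges; the
>>>>>>>>>>> inputs and the predicate below are made up, and if I remember
>>>>>>>>>>> right PartitionPruningRDD has a create() helper):
>>>>>>>>>>>
>>>>>>>>>>> import org.apache.spark.rdd.PartitionPruningRDD
>>>>>>>>>>>
>>>>>>>>>>> // One column of 100 doubles per element, in contiguous partitions.
>>>>>>>>>>> val cols = sc.parallelize(allColumns, numPartitions)
>>>>>>>>>>>
>>>>>>>>>>> // Keep only the partitions whose columns overlap the given window.
>>>>>>>>>>> def overlapsWindow(windowId: Int)(partitionId: Int): Boolean =
>>>>>>>>>>>   partitionTouchesWindow(partitionId, windowId)
>>>>>>>>>>>
>>>>>>>>>>> val windowCols = PartitionPruningRDD.create(cols, overlapsWindow(someWindowId))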
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> The computation for every submatrix is very expensive; it takes
>>>>>>>>>> days on a single node. I was hoping this could be reduced to hours
>>>>>>>>>> or minutes with Spark.
>>>>>>>>>>
>>>>>>>>>> Are you saying that Spark is not suitable for this type of job?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>