On Fri, Dec 20, 2013 at 10:52 PM, Christopher Nguyen <[email protected]> wrote:
> A couple of fixes inline.
>
> --
> Christopher T. Nguyen
> Co-founder & CEO, Adatao <http://adatao.com>
> linkedin.com/in/ctnguyen
>
> On Fri, Dec 20, 2013 at 2:34 PM, Christopher Nguyen <[email protected]> wrote:
>
>> Aureliano, would something like this work? The red code is the only
>> place where you have to think parallel here.
>>
>>   while (thereIsStillDataToWorkOn) {
>>     bigChunk: Array[Double] = readInTheNextNx100x50MatrixData() // N is a design variable
>>     bigChunkAsArraysOf5000Doubles: Array[Array[Double]] = restructureIntoArraysOf5000Doubles(bigChunk)
>>     myRDD = sc
>
> s/myRDD/myResult
>
>>       .parallelize(bigChunkAsArraysOf5000Doubles, 1)
>
> That number of partitions shouldn't be 1, but some function of the size of
> your cluster, or you can just let Spark decide.
>
>>       .map(eachArrayOf5000Doubles => someVeryLongRunningTransformer(eachArrayOf5000Doubles))
>>       .collect()
>
> collect() or reduce() etc., whatever is appropriate for your
> transformation/operation.

Is this part async, or does the loop wait for the task to complete before the
next iteration?

>>   }
>>
>> Next, pardon me if this is too basic, but in case it is helpful: this
>> code first runs on a single machine, called a Driver, which must have
>> access to the source data. When we call parallelize(), Spark handles all
>> the partitioning of the data across the available Workers, including
>> serializing each data partition to the Workers and collecting the results
>> back in one place. There is no data duplication other than the Workers'
>> copies of the data from the Driver.
>>
>> This indeed does not take advantage of all of the other Spark goodnesses
>> that others have correctly pointed out on this thread, such as
>> broadcasting, mapPartitions() vs. map(), parallel data loading across HDFS
>> partitions, etc. But it would be exactly the right thing to do if it best
>> fits your problem statement.
>>
>> --
>> Christopher T. Nguyen
>> Co-founder & CEO, Adatao <http://adatao.com>
>> linkedin.com/in/ctnguyen
>>
>> On Fri, Dec 20, 2013 at 2:01 PM, Aureliano Buendia <[email protected]> wrote:
>>
>>> On Fri, Dec 20, 2013 at 9:43 PM, Christopher Nguyen <[email protected]> wrote:
>>>
>>>> Aureliano, how would your production data be coming in and accessed?
>>>> It's possible that you can still think of that level as a serial
>>>> operation (outer loop, large chunks) first, before worrying about
>>>> parallelizing the computation of the tiny chunks.
>>>
>>> It's batch processing of time series data. Perhaps serial processing
>>> where each serial item is a set of parallel processes could be an option.
>>> Does Spark have such an option?
>>>
>>>> And I'm reading that when you refer to "data duplication", you're
>>>> worried about that as a side-effect problem, not as a requirement,
>>>> correct?
>>>
>>> That's right. Data duplication is certainly not a requirement; we are not
>>> trying to avoid it, but if it's a side effect that leads to considerable
>>> IO overhead, it's not going to be good.
>>>
>>>> And if the former, I don't see that data duplication is a necessary
>>>> side effect. Unless I missed something in the thread, don't use
>>>> broadcast.
>>>
>>> I take it that overlapping partitions do not mean data duplication. I
>>> wasn't sure whether partitions hold a copy or a reference.
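Pulling Christopher's loop and his inline fixes together into one sketch, for
reference. The helper names (readInTheNextNx100x50MatrixData and friends) are
still placeholders from his pseudocode, and sc is the usual SparkContext. Note
that collect() and reduce() are blocking actions, so the loop waits for each
batch to finish before reading the next chunk; nothing here runs asynchronously
unless you arrange that yourself.

    while (thereIsStillDataToWorkOn) {
      // N is a design variable: how many 100 x 50 windows to process per batch.
      val bigChunk: Array[Double] = readInTheNextNx100x50MatrixData()
      val windows: Array[Array[Double]] = restructureIntoArraysOf5000Doubles(bigChunk)

      val myResult = sc
        .parallelize(windows)          // let Spark pick the partition count,
                                       // or pass roughly a few per worker core
        .map(w => someVeryLongRunningTransformer(w))
        .collect()                     // blocking: the next iteration starts
                                       // only after this batch has finished
    }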
>>>
>>>> Put another way, I see the scale of this challenge as far more
>>>> operational than logical (when squinted at from the right angle :)
>>>>
>>>> --
>>>> Christopher T. Nguyen
>>>> Co-founder & CEO, Adatao <http://adatao.com>
>>>> linkedin.com/in/ctnguyen
>>>>
>>>> On Fri, Dec 20, 2013 at 1:07 PM, Aureliano Buendia <[email protected]> wrote:
>>>>
>>>>> Also, overthinking is appreciated in this problem, as my production
>>>>> data is actually near 100 x 1,000,000,000, and data duplication could
>>>>> get messy at that size.
>>>>>
>>>>> Sorry about the initial misinformation; I was thinking of my
>>>>> development/test data.
>>>>>
>>>>> On Fri, Dec 20, 2013 at 9:04 PM, Aureliano Buendia <[email protected]> wrote:
>>>>>
>>>>>> On Fri, Dec 20, 2013 at 9:00 PM, Tom Vacek <[email protected]> wrote:
>>>>>>
>>>>>>> Totally agree. Even with 50x data replication, that's only 40 GB,
>>>>>>> which would be a fraction of a standard cluster. But since
>>>>>>> overthinking is a lot of fun, how about this: do a mapPartitions with
>>>>>>> a threaded subtask for each window. Now you only need to replicate
>>>>>>> data across the boundaries of each partition of windows, rather than
>>>>>>> each window.
>>>>>>
>>>>>> How can this be written in Spark Scala?
>>>>>>
>>>>>>> On Fri, Dec 20, 2013 at 2:53 PM, Christopher Nguyen <[email protected]> wrote:
>>>>>>>
>>>>>>>> Are we over-thinking the problem here? Since the per-window compute
>>>>>>>> task is hugely expensive, stateless from window to window, and the
>>>>>>>> original big matrix is just 1 GB, the primary gain in using a
>>>>>>>> parallel engine is in distributing and scheduling these
>>>>>>>> (long-running, isolated) tasks. I'm reading that data loading and
>>>>>>>> distribution are going to be a tiny fraction of the overall compute
>>>>>>>> time.
>>>>>>>>
>>>>>>>> If that's the case, it would make sense simply to start with a 1 GB
>>>>>>>> Array[Double] on the driver, from that create an RDD comprising
>>>>>>>> 20,000 rows of 5,000 doubles each, map them out to the workers, and
>>>>>>>> have them interpret what the 5,000 doubles mean in terms of a
>>>>>>>> [100 x 50] sub-matrix. They each have a good fraction of several days
>>>>>>>> to figure it out :)
>>>>>>>>
>>>>>>>> This would be a great load test for Spark's resiliency over
>>>>>>>> long-running computations.
>>>>>>>>
>>>>>>>> --
>>>>>>>> Christopher T. Nguyen
>>>>>>>> Co-founder & CEO, Adatao <http://adatao.com>
>>>>>>>> linkedin.com/in/ctnguyen
>>>>>>>>
>>>>>>>> On Fri, Dec 20, 2013 at 11:36 AM, Michael (Bach) Bui <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hmm, I misread that you need a sliding window.
>>>>>>>>> I am thinking out loud here: one way of dealing with this is to
>>>>>>>>> improve NLineInputFormat so that partitions have a small overlapping
>>>>>>>>> portion; in this case the overlapping portion is 50 columns.
>>>>>>>>> So, say the matrix is divided into overlapping partitions like this:
>>>>>>>>> [100 x col[1, n*50]], [100 x col[(n-1)*50+1, (2n-1)*50]], ... Then we
>>>>>>>>> can assign each partition to a mapper and run mapPartitions on it.
>>>>>>>>>
>>>>>>>>> --------------------------------------------
>>>>>>>>> Michael (Bach) Bui, PhD,
>>>>>>>>> Senior Staff Architect, ADATAO Inc.
>>>>>>>>> www.adatao.com
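On Aureliano's "How can this be written in Spark Scala?": below is a minimal
sketch (not Tom's actual code) of the "mapPartitions with a threaded subtask
for each window" shape. It only shows the threading part, not the replication
across partition boundaries; processWindow is a stand-in for the expensive
per-window computation, the dummy data is invented, and sc is assumed to be an
existing SparkContext.

    // Placeholder for the long-running per-window computation.
    def processWindow(w: Array[Double]): Double = w.sum

    // Pretend each element is one 100 x 50 window flattened to 5,000 doubles.
    val windowsRDD = sc.parallelize(Seq.fill(100)(Array.fill(5000)(1.0)), 10)

    val results = windowsRDD.mapPartitions { iter =>
      iter.toArray.par            // fan this partition's windows out over local threads
          .map(processWindow)
          .seq                    // back to a sequential collection
          .iterator
    }.collect()

Each Spark task then keeps several cores busy on its own windows, so you need
far fewer partitions than windows.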
>>>>>>>>>
>>>>>>>>> On Dec 20, 2013, at 1:11 PM, Michael (Bach) Bui <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>> Here, Tom assumed that you already have your big matrix loaded on
>>>>>>>>> one machine. Now, if you want to distribute it to the slave nodes,
>>>>>>>>> you will need to broadcast it. I would expect this broadcast to be
>>>>>>>>> done once at the beginning of your algorithm, and the computation
>>>>>>>>> time to dominate the overall execution time.
>>>>>>>>>
>>>>>>>>> On the other hand, a better way to deal with a huge matrix is to
>>>>>>>>> store the data in HDFS and load it into each slave partition by
>>>>>>>>> partition. This is the fundamental data processing pattern in the
>>>>>>>>> Spark/Hadoop world.
>>>>>>>>> If you opt to do this, you will have to use a suitable InputFormat
>>>>>>>>> to make sure each partition has the right number of rows.
>>>>>>>>> For example, if you are lucky and each HDFS partition has exactly
>>>>>>>>> n*50 rows, then you can use rdd.mapPartitions(func), where func takes
>>>>>>>>> care of splitting an n*50-row partition into n sub-matrices.
>>>>>>>>>
>>>>>>>>> However, HDFS TextInputFormat or SequenceFileInputFormat will not
>>>>>>>>> guarantee that each partition has a certain number of rows. What you
>>>>>>>>> want is NLineInputFormat, which I think has not been pulled into
>>>>>>>>> Spark yet.
>>>>>>>>> If everyone thinks this is needed, I can implement it quickly; it
>>>>>>>>> should be pretty easy.
>>>>>>>>>
>>>>>>>>> --------------------------------------------
>>>>>>>>> Michael (Bach) Bui, PhD,
>>>>>>>>> Senior Staff Architect, ADATAO Inc.
>>>>>>>>> www.adatao.com
>>>>>>>>>
>>>>>>>>> On Dec 20, 2013, at 12:38 PM, Aureliano Buendia <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>> On Fri, Dec 20, 2013 at 6:00 PM, Tom Vacek <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Oh, I see. I was thinking that there was a computational dependency
>>>>>>>>>> from one window to the next. If the computations are independent,
>>>>>>>>>> then I think Spark can help you out quite a bit.
>>>>>>>>>>
>>>>>>>>>> I think you would want an RDD where each element is a window of
>>>>>>>>>> your dense matrix. I'm not aware of a way to distribute the windows
>>>>>>>>>> of the big matrix that doesn't involve broadcasting the whole thing.
>>>>>>>>>> You might have to tweak some config options, but I think it would
>>>>>>>>>> work straightaway. I would initialize the data structure like this:
>>>>>>>>>>
>>>>>>>>>>   val matB = sc.broadcast(myBigDenseMatrix)
>>>>>>>>>>   val distributedChunks = sc.parallelize(0 until numWindows)
>>>>>>>>>>     .mapPartitions(it => it.map(windowID => getWindow(matB.value, windowID)))
>>>>>>>>>
>>>>>>>>> Here broadcast is used instead of calling parallelize on
>>>>>>>>> myBigDenseMatrix. Is it okay to broadcast a huge amount of data? Does
>>>>>>>>> sharing big data mean big network IO overhead compared to calling
>>>>>>>>> parallelize, or is that overhead optimized away by the partitioning?
>>>>>>>>>
>>>>>>>>>> Then just apply your matrix ops as a map on the RDD.
>>>>>>>>>>
>>>>>>>>>> You may have your own tool for dense matrix ops, but I would
>>>>>>>>>> suggest Scala Breeze. You'll have to use an old version of Breeze
>>>>>>>>>> (current builds are for 2.10); Spark with Scala 2.10 is still a
>>>>>>>>>> little way off.
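A slightly fleshed-out version of Tom's snippet above, only to make the moving
parts explicit. The storage layout (column-major, sliding by one column), the
getWindow helper, and the dummy matrix are all assumptions for illustration; sc
is assumed to exist. On Aureliano's question: a broadcast ships one full copy
of the value to each worker, whereas parallelize ships each partition only to
the worker that owns it, so broadcast trades extra per-node transfer for having
the whole matrix available to every window task.

    val rows = 100
    val windowCols = 50

    // Stand-in for the real matrix: 100 x 10,000, column-major, in one array.
    val myBigDenseMatrix: Array[Double] =
      Array.fill(rows * 10000)(scala.util.Random.nextDouble())

    val totalCols  = myBigDenseMatrix.length / rows
    val numWindows = totalCols - windowCols + 1        // sliding window, step of one column

    // Columns [w, w + 50) of a column-major matrix form one contiguous slice.
    def getWindow(matrix: Array[Double], windowID: Int): Array[Double] =
      matrix.slice(windowID * rows, (windowID + windowCols) * rows)

    val matB = sc.broadcast(myBigDenseMatrix)          // full matrix sent to every worker once
    val distributedChunks = sc.parallelize(0 until numWindows)
      .mapPartitions(it => it.map(windowID => getWindow(matB.value, windowID)))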
>>>>>>>>>>
>>>>>>>>>> On Fri, Dec 20, 2013 at 11:40 AM, Aureliano Buendia <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Fri, Dec 20, 2013 at 5:21 PM, Tom Vacek <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> If you use an RDD[Array[Double]] with a row decomposition of the
>>>>>>>>>>>> matrix, you can index windows of the rows all you want, but
>>>>>>>>>>>> you're limited to 100 concurrent tasks. You could use a column
>>>>>>>>>>>> decomposition and access subsets of the columns with a
>>>>>>>>>>>> PartitionPruningRDD. I have to say, though, if you're doing dense
>>>>>>>>>>>> matrix operations, they will be hundreds of times faster on a
>>>>>>>>>>>> shared-memory platform. This particular matrix, at 800 MB, could
>>>>>>>>>>>> be a breeze on a single node.
>>>>>>>>>>>
>>>>>>>>>>> The computation for every sub-matrix is very expensive; it takes
>>>>>>>>>>> days on a single node. I was hoping this could be reduced to hours
>>>>>>>>>>> or minutes with Spark.
>>>>>>>>>>>
>>>>>>>>>>> Are you saying that Spark is not suitable for this type of job?
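For completeness, a rough sketch of the column-decomposition idea Tom mentions
earlier in the thread, under heavy assumptions (block layout, dummy data, and
sc all invented for illustration): keep one 100 x 50 column block per
partition, and use PartitionPruningRDD so a job over one window only computes
the one or two partitions that window actually touches.

    import org.apache.spark.rdd.PartitionPruningRDD

    val rows      = 100
    val blockSize = 50

    // Stand-in data: 200 column blocks, each 100 x 50, flattened column-major.
    val allColumnBlocks: Seq[Array[Double]] =
      Seq.tabulate(200)(b => Array.fill(rows * blockSize)(b.toDouble))

    // One block per partition, in block order (element count == slice count).
    val colBlocks = sc.parallelize(allColumnBlocks, allColumnBlocks.size)

    // A 50-column window starting at column c overlaps at most two adjacent blocks.
    def blocksForWindow(startCol: Int): Set[Int] =
      Set(startCol / blockSize, (startCol + blockSize - 1) / blockSize)

    val wanted = blocksForWindow(startCol = 1230)    // an arbitrary example window
    val pruned = PartitionPruningRDD.create(colBlocks, partitionId => wanted.contains(partitionId))
    val pieces = pruned.collect()                    // only the pruned-in partitions are computed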
