A couple of fixes inline.

--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen
On Fri, Dec 20, 2013 at 2:34 PM, Christopher Nguyen <[email protected]> wrote:

> Aureliano, would something like this work? The red code is the only place where you have to think parallel here.
>
> while (thereIsStillDataToWorkOn) {
>   bigChunk: Array[Double] = readInTheNextNx100x50MatrixData() // N is a design variable
>   bigChunkAsArraysOf5000Doubles: Array[Array[Double]] = restructureIntoArraysOf5000Doubles(bigChunk)
>   myRDD = sc
> s/myRDD/myResult
>     .parallelize(bigChunkAsArraysOf5000Doubles, 1)
> That number of partitions shouldn't be 1, but some function of the size of your cluster, or you can just let Spark decide.
>     .map(eachArrayOf5000Doubles => someVeryLongRunningTransformer(eachArrayOf5000Doubles))
>     .collect()
> collect() or reduce() etc., whatever is appropriate for your transformation/operation.
> }
>
> Next, pardon me if this is too basic, but in case it is helpful: this code first runs on a single machine, called a Driver, which must have access to the source data. When we call parallelize(), Spark handles all the partitioning of the data into the available Workers, including serializing each data partition to the Workers, and collecting the results back in one place. There is no data duplication other than the Worker's copy of the data from the Driver.
>
> This indeed does not take advantage of all of the other available Spark goodnesses that others have correctly pointed out on this thread, such as broadcasting, mapPartitions() vs map(), parallel data loading across HDFS partitions, etc. But it would be exactly the right thing to do if it best fits your problem statement.
>
> --
> Christopher T. Nguyen
> Co-founder & CEO, Adatao <http://adatao.com>
> linkedin.com/in/ctnguyen
>
> On Fri, Dec 20, 2013 at 2:01 PM, Aureliano Buendia <[email protected]> wrote:
>
>> On Fri, Dec 20, 2013 at 9:43 PM, Christopher Nguyen <[email protected]> wrote:
>>
>>> Aureliano, how would your production data be coming in and accessed? It's possible that you can still think of that level as a serial operation (outer loop, large chunks) first before worrying about parallelizing the computation of the tiny chunks.
>>
>> It's batch processing of time series data. Perhaps serial processing, where each serial item is a set of parallel processes, could be an option. Does Spark have such an option?
>>
>>> And I'm reading that when you refer to "data duplication", you're worried about that as a side-effect problem, not as a requirement, correct?
>>
>> That's right. Data duplication is certainly not a requirement. We are not trying to avoid it at all costs, but if it's a side effect that leads to considerable I/O overhead, it's not going to be good.
>>
>>> And if the former, I don't see that data duplication is a necessary side effect. Unless I missed something in the thread, don't use broadcast.
>>
>> I take it that overlapping partitions do not mean data duplication. I wasn't sure if partitions hold a copy, or a reference.
>>
>>> Put another way, I see the scale of this challenge as far more operational than logical (when squinted at from the right angle :)
>>>
>>> --
>>> Christopher T. Nguyen
>>> Co-founder & CEO, Adatao <http://adatao.com>
>>> linkedin.com/in/ctnguyen
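To make the chunked while-loop near the top of this message concrete, here is a minimal, untested sketch of the same driver-side pattern in Spark Scala. The helpers thereIsStillDataToWorkOn(), readNextChunk(), someVeryLongRunningTransformer() and writeOut() are hypothetical placeholders for your own I/O and per-window computation, and the partition count is only an illustrative choice.

    import org.apache.spark.SparkContext

    object ChunkedDriver {
      def main(args: Array[String]) {
        val sc = new SparkContext("spark://master:7077", "windowed-matrix-job")
        val numPartitions = 200  // illustrative: roughly a few tasks per core across the cluster

        while (thereIsStillDataToWorkOn()) {
          // Read the next N x 100 x 50 block of doubles on the driver.
          val bigChunk: Array[Double] = readNextChunk()
          // Regroup into one Array[Double] of 5000 values (a 100 x 50 window) per element.
          val windows: Array[Array[Double]] = bigChunk.grouped(5000).toArray

          val results = sc.parallelize(windows, numPartitions)
            .map(w => someVeryLongRunningTransformer(w))
            .collect()  // or reduce(...), whatever fits the operation

          writeOut(results)
        }
        sc.stop()
      }

      // Hypothetical stubs standing in for the real data source and computation.
      def thereIsStillDataToWorkOn(): Boolean = sys.error("stub")
      def readNextChunk(): Array[Double] = sys.error("stub")
      def someVeryLongRunningTransformer(w: Array[Double]): Array[Double] = sys.error("stub")
      def writeOut(rs: Array[Array[Double]]): Unit = sys.error("stub")
    }

Only the parallelize/map/collect part runs on the cluster; everything else is ordinary serial driver code, which matches the "serial outer loop, parallel inner computation" framing above.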
>>>
>>> On Fri, Dec 20, 2013 at 1:07 PM, Aureliano Buendia <[email protected]> wrote:
>>>
>>>> Also, overthinking is appreciated in this problem, as my production data is actually near 100 x 1,000,000,000, and data duplication could get messy at that scale.
>>>>
>>>> Sorry about the initial misinformation, I was thinking about my development/test data.
>>>>
>>>> On Fri, Dec 20, 2013 at 9:04 PM, Aureliano Buendia <[email protected]> wrote:
>>>>
>>>>> On Fri, Dec 20, 2013 at 9:00 PM, Tom Vacek <[email protected]> wrote:
>>>>>
>>>>>> Totally agree. Even with a 50x data replication, that's only 40 GB, which would be a fraction of a standard cluster. But since overthinking is a lot of fun, how about this: do a mapPartitions with a threaded subtask for each window. Now you only need to replicate data across the boundaries of each partition of windows, rather than each window.
>>>>>
>>>>> How can this be written in Spark Scala?
>>>>>
>>>>>> On Fri, Dec 20, 2013 at 2:53 PM, Christopher Nguyen <[email protected]> wrote:
>>>>>>
>>>>>>> Are we over-thinking the problem here? Since the per-window compute task is hugely expensive, stateless from window to window, and the original big matrix is just 1 GB, the primary gain in using a parallel engine is in distributing and scheduling these (long-running, isolated) tasks. I'm reading that data loading and distribution are going to be a tiny fraction of the overall compute time.
>>>>>>>
>>>>>>> If that's the case, it would make sense simply to start with a 1 GB Array[Double] on the driver, from that create an RDD comprising 20,000 rows of 5,000 doubles each, map them out to the workers and have them interpret what the 5,000 doubles mean in terms of a [100 x 50] sub-matrix. They each have a good fraction of several days to figure it out :)
>>>>>>>
>>>>>>> This would be a great load test for Spark's resiliency over long-running computations.
>>>>>>>
>>>>>>> --
>>>>>>> Christopher T. Nguyen
>>>>>>> Co-founder & CEO, Adatao <http://adatao.com>
>>>>>>> linkedin.com/in/ctnguyen
>>>>>>>
>>>>>>> On Fri, Dec 20, 2013 at 11:36 AM, Michael (Bach) Bui <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hmm, I misread that you need a sliding window.
>>>>>>>> I am thinking out loud here: one way of dealing with this is to improve NLineInputFormat so that partitions have a small overlapping portion; in this case the overlapping portion is 50 columns.
>>>>>>>> So let's say the matrix is divided into overlapping partitions like this: [100 x col[1, n*50]], [100 x col[(n-1)*50+1, (2n-1)*50]] … then we can assign each partition to a mapper and do mapPartitions on it.
>>>>>>>>
>>>>>>>> --------------------------------------------
>>>>>>>> Michael (Bach) Bui, PhD,
>>>>>>>> Senior Staff Architect, ADATAO Inc.
>>>>>>>> www.adatao.com
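Since the question "How can this be written in Spark Scala?" above is left open in the thread, here is one rough, untested sketch of Tom's threaded-subtask idea: group the window ids into partitions and run a small thread pool inside each mapPartitions call. loadWindow() and processWindow() are hypothetical placeholders, and the pool size of 4 is arbitrary.

    import java.util.concurrent.{Callable, Executors}
    import scala.collection.JavaConverters._
    import org.apache.spark.SparkContext

    object ThreadedWindows {
      def run(sc: SparkContext, numWindows: Int, numPartitions: Int): Array[Array[Double]] = {
        sc.parallelize(0 until numWindows, numPartitions)
          .mapPartitions { windowIds =>
            // One small thread pool per partition; each window becomes a threaded subtask.
            val pool = Executors.newFixedThreadPool(4)
            val tasks = windowIds.map { id =>
              new Callable[Array[Double]] {
                def call() = processWindow(loadWindow(id))  // the long-running work
              }
            }.toList
            val results = pool.invokeAll(tasks.asJava).asScala.map(_.get())
            pool.shutdown()
            results.iterator
          }
          .collect()
      }

      // Hypothetical stubs: fetch the 100 x 50 window for an id, then crunch it.
      def loadWindow(id: Int): Array[Double] = sys.error("stub")
      def processWindow(w: Array[Double]): Array[Double] = sys.error("stub")
    }

The point of grouping windows into partitions, as Tom says above, is that data only needs to be replicated across partition boundaries rather than across every window.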
>>>>>>>>
>>>>>>>> On Dec 20, 2013, at 1:11 PM, Michael (Bach) Bui <[email protected]> wrote:
>>>>>>>>
>>>>>>>> Here, Tom assumed that you have your big matrix already loaded on one machine. Now if you want to distribute it to the slave nodes, you will need to broadcast it. I would expect this broadcasting to be done once at the beginning of your algorithm, and the computation time will dominate the overall execution time.
>>>>>>>>
>>>>>>>> On the other hand, a better way to deal with a huge matrix is to store the data in HDFS and load it into the slaves partition by partition. This is the fundamental data processing pattern in the Spark/Hadoop world.
>>>>>>>> If you opt to do this, you will have to use a suitable InputFormat to make sure each partition has the number of rows you want. For example, if you are lucky and each HDFS partition has exactly n*50 rows, then you can use rdd.mapPartitions(func), where func takes care of splitting the n*50-row partition into n sub-matrices.
>>>>>>>>
>>>>>>>> However, TextInputFormat or SequenceFileInputFormat will not guarantee that each partition has a certain number of rows. What you want is NLineInputFormat, which I think has not been pulled into Spark yet.
>>>>>>>> If everyone thinks this is needed, I can implement it quickly; it should be pretty easy.
>>>>>>>>
>>>>>>>> --------------------------------------------
>>>>>>>> Michael (Bach) Bui, PhD,
>>>>>>>> Senior Staff Architect, ADATAO Inc.
>>>>>>>> www.adatao.com
>>>>>>>>
>>>>>>>> On Dec 20, 2013, at 12:38 PM, Aureliano Buendia <[email protected]> wrote:
>>>>>>>>
>>>>>>>> On Fri, Dec 20, 2013 at 6:00 PM, Tom Vacek <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Oh, I see. I was thinking that there was a computational dependency of one window on the next. If the computations are independent, then I think Spark can help you out quite a bit.
>>>>>>>>>
>>>>>>>>> I think you would want an RDD where each element is a window of your dense matrix. I'm not aware of a way to distribute the windows of the big matrix in a way that doesn't involve broadcasting the whole thing. You might have to tweak some config options, but I think it would work straightaway. I would initialize the data structure like this:
>>>>>>>>>
>>>>>>>>>   val matB = sc.broadcast(myBigDenseMatrix)
>>>>>>>>>   val distributedChunks = sc.parallelize(0 until numWindows)
>>>>>>>>>     .mapPartitions(it => it.map(windowID => getWindow(matB.value, windowID)))
>>>>>>>>>
>>>>>>>> Here broadcast is used instead of calling parallelize on myBigDenseMatrix. Is it okay to broadcast a huge amount of data? Does sharing big data mean a big network I/O overhead compared to calling parallelize, or is this overhead optimized away thanks to the partitioning?
>>>>>>>>
>>>>>>>>> Then just apply your matrix ops as a map on the RDD.
>>>>>>>>>
>>>>>>>>> You may have your own tool for dense matrix ops, but I would suggest Scala Breeze. You'll have to use an old version of Breeze (current builds are for 2.10); Spark with Scala 2.10 is a little way off.
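For completeness, here is a self-contained, untested variant of Tom's broadcast snippet above. It assumes the big matrix is stored row-major as one flat Array[Double] and that the windows are non-overlapping 100 x 50 column blocks (a sliding window would just change the start offset); getWindow() and veryExpensiveComputation() are hypothetical helpers.

    import org.apache.spark.SparkContext

    object BroadcastWindows {
      val numRows = 100
      val windowWidth = 50

      // Slice columns [winId*windowWidth, (winId+1)*windowWidth) out of a row-major
      // numRows x numCols matrix stored as one flat array (layout is an assumption).
      def getWindow(mat: Array[Double], numCols: Int, winId: Int): Array[Double] = {
        val out = new Array[Double](numRows * windowWidth)
        var r = 0
        while (r < numRows) {
          System.arraycopy(mat, r * numCols + winId * windowWidth, out, r * windowWidth, windowWidth)
          r += 1
        }
        out
      }

      def run(sc: SparkContext, myBigDenseMatrix: Array[Double], numCols: Int) = {
        val numWindows = numCols / windowWidth
        val matB = sc.broadcast(myBigDenseMatrix)  // the matrix crosses the network once per worker
        sc.parallelize(0 until numWindows)
          .map(winId => veryExpensiveComputation(getWindow(matB.value, numCols, winId)))
          .collect()
      }

      // Hypothetical stand-in for the days-long per-window computation.
      def veryExpensiveComputation(w: Array[Double]): Array[Double] = sys.error("stub")
    }

On the broadcast-overhead question asked above: a broadcast value is fetched and cached once per worker rather than once per task, so for a ~1 GB matrix the one-time network cost should be small next to days of computation; for the 100 x 1,000,000,000 production case, though, the HDFS/mapPartitions route Michael describes is the better fit, since that matrix cannot fit on the driver at all.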
>>>>>>>>>
>>>>>>>>> On Fri, Dec 20, 2013 at 11:40 AM, Aureliano Buendia <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> On Fri, Dec 20, 2013 at 5:21 PM, Tom Vacek <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> If you use an RDD[Array[Double]] with a row decomposition of the matrix, you can index windows of the rows all you want, but you're limited to 100 concurrent tasks. You could use a column decomposition and access subsets of the columns with a PartitionPruningRDD. I have to say, though, if you're doing dense matrix operations, they will be hundreds of times faster on a shared-memory platform. This particular matrix, at 800 MB, could be a Breeze on a single node.
>>>>>>>>>>
>>>>>>>>>> The computation for every submatrix is very expensive; it takes days on a single node. I was hoping this could be reduced to hours or minutes with Spark.
>>>>>>>>>>
>>>>>>>>>> Are you saying that Spark is not suitable for this type of job?
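Finally, a rough, untested sketch of the column-decomposition idea Tom mentions above. It assumes the matrix has already been loaded as an RDD of (columnIndex, column) pairs arranged so that partition p holds the contiguous columns [p*colsPerPartition, (p+1)*colsPerPartition); that layout and the helper names are assumptions, and PartitionPruningRDD is a low-level developer API.

    import org.apache.spark.rdd.{PartitionPruningRDD, RDD}

    object ColumnWindows {
      val colsPerPartition = 1000  // illustrative; must match how the RDD was built

      // Return only the columns [firstCol, firstCol + width) of the matrix, touching
      // just the partitions that overlap that range instead of scanning the whole RDD.
      def window(columns: RDD[(Int, Array[Double])], firstCol: Int, width: Int): RDD[(Int, Array[Double])] = {
        val lastCol = firstCol + width - 1
        val pruned = PartitionPruningRDD.create(columns, partitionId => {
          val lo = partitionId * colsPerPartition
          val hi = lo + colsPerPartition - 1
          lastCol >= lo && firstCol <= hi  // keep only partitions overlapping the window
        })
        pruned.filter { case (col, _) => col >= firstCol && col <= lastCol }
      }
    }

This keeps each window read local to a handful of partitions, at the cost of having to build (and ideally cache) the column-decomposed RDD with a known partitioning in the first place.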
