Aureliano, how will your production data come in and be accessed? You may still be able to treat that level as a serial operation (an outer loop over large chunks) before worrying about parallelizing the computation of the tiny chunks.
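To make that concrete, here is a minimal sketch of that outer-loop shape, with Spark parallelizing only the expensive per-window work inside each chunk. The names numChunks, loadChunk, sliceIntoWindows, computeWindow and saveResults are hypothetical placeholders, not anything from this thread:

  // Serial outer loop over the big production chunks on the driver.
  for (chunkId <- 0 until numChunks) {
    val chunk: Array[Array[Double]] = loadChunk(chunkId)        // one large chunk, fits on the driver
    val windows: Seq[Array[Double]] = sliceIntoWindows(chunk)   // one element per tiny window
    val results = sc.parallelize(windows, windows.size)         // each window shipped to a worker once
                    .map(w => computeWindow(w))                 // the days-long per-window computation
                    .collect()
    saveResults(chunkId, results)
  }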
And I'm reading that when you refer to "data duplication", you're worried about that as a side-effect problem, not as a requirement, correct? If the former, I don't see that data duplication is a necessary side effect: unless I missed something in the thread, don't use broadcast. Put another way, I see the scale of this challenge as far more operational than logical (when squinted at from the right angle :)

--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen


On Fri, Dec 20, 2013 at 1:07 PM, Aureliano Buendia <[email protected]> wrote:

> Also, overthinking is appreciated in this problem, as my production data is actually near 100 x 1,000,000,000, and data duplication could get messy at that scale.
>
> Sorry about the initial misinformation, I was thinking of my development/test data.
>
> On Fri, Dec 20, 2013 at 9:04 PM, Aureliano Buendia <[email protected]> wrote:
>
>> On Fri, Dec 20, 2013 at 9:00 PM, Tom Vacek <[email protected]> wrote:
>>
>>> Totally agree. Even with a 50x data replication, that's only 40 GB, which would be a fraction of a standard cluster. But since overthinking is a lot of fun, how about this: do a mapPartitions with a threaded subtask for each window. Now you only need to replicate data across the boundaries of each partition of windows, rather than each window.
>>
>> How can this be written in Spark Scala?
>>
>>> On Fri, Dec 20, 2013 at 2:53 PM, Christopher Nguyen <[email protected]> wrote:
>>>
>>>> Are we over-thinking the problem here? Since the per-window compute task is hugely expensive, stateless from window to window, and the original big matrix is just 1 GB, the primary gain in using a parallel engine is in distributing and scheduling these (long-running, isolated) tasks. I'm reading that data loading and distribution are going to be a tiny fraction of the overall compute time.
>>>>
>>>> If that's the case, it would make sense simply to start with a 1 GB Array[Double] on the driver, from that create an RDD comprising 20,000 rows of 5,000 doubles each, map them out to the workers, and have them interpret what the 5,000 doubles mean in terms of a [100 x 50] sub-matrix. They each have a good fraction of several days to figure it out :)
>>>>
>>>> This would be a great load test for Spark's resiliency over long-running computations.
>>>>
>>>> --
>>>> Christopher T. Nguyen
>>>> Co-founder & CEO, Adatao <http://adatao.com>
>>>> linkedin.com/in/ctnguyen
>>>>
>>>> On Fri, Dec 20, 2013 at 11:36 AM, Michael (Bach) Bui <[email protected]> wrote:
>>>>
>>>>> Hmm, I misread that you need a sliding window.
>>>>> I am thinking out loud here: one way of dealing with this is to improve NLineInputFormat so that partitions have a small overlapping portion (in this case the overlapping portion is 50 columns).
>>>>> So let's say the matrix is divided into overlapping partitions like this: [100 x col[1, n*50]], [100 x col[(n-1)*50+1, (2n-1)*50]], … then we can assign each partition to a mapper and do mapPartitions on it.
>>>>>
>>>>> --------------------------------------------
>>>>> Michael (Bach) Bui, PhD,
>>>>> Senior Staff Architect, ADATAO Inc.
>>>>> www.adatao.com
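For Aureliano's question above ("How can this be written in Spark Scala?"), a rough sketch of Tom's mapPartitions-with-a-threaded-subtask idea might look like the following. The helper names (loadBlockWithOverlap, windowsFor, computeWindow) and numBlocks are hypothetical placeholders, and a Scala parallel collection stands in for explicit threads:

  // Each block carries one partition's worth of windows plus the 50
  // overlapping boundary columns, so windows never straddle two blocks.
  val blocks = sc.parallelize(0 until numBlocks, numBlocks)
                 .map(b => loadBlockWithOverlap(b))   // hypothetical: block b's columns + overlap
  val results = blocks.mapPartitions { it =>
    it.flatMap { block =>
      windowsFor(block)                               // hypothetical: the windows inside this block
        .par                                          // one threaded subtask per window
        .map(w => computeWindow(w))                   // the expensive per-window computation
        .toList
    }
  }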
>>>>> On Dec 20, 2013, at 1:11 PM, Michael (Bach) Bui <[email protected]> wrote:
>>>>>
>>>>> Here, Tom assumed that you have your big matrix already loaded on one machine. Now if you want to distribute it to the slave nodes, you will need to broadcast it. I would expect this broadcasting to be done once at the beginning of your algorithm, and the computation time to dominate the overall execution time.
>>>>>
>>>>> On the other hand, a better way to deal with a huge matrix is to store the data in HDFS and load it into the slaves partition-by-partition. This is a fundamental data-processing pattern in the Spark/Hadoop world. If you opt to do this, you will have to use a suitable InputFormat to make sure each partition has the right number of rows. For example, if you are lucky and each HDFS partition has exactly n*50 rows, then you can use rdd.mapPartitions(func), where func takes care of splitting an n*50-row partition into n sub-matrices.
>>>>>
>>>>> However, the HDFS TextInputFormat or SequenceFileInputFormat will not guarantee that each partition has a certain number of rows. What you want is NLineInputFormat, which I think has not been pulled into Spark yet. If everyone thinks this is needed, I can implement it quickly; it should be pretty easy.
>>>>>
>>>>> --------------------------------------------
>>>>> Michael (Bach) Bui, PhD,
>>>>> Senior Staff Architect, ADATAO Inc.
>>>>> www.adatao.com
>>>>>
>>>>> On Dec 20, 2013, at 12:38 PM, Aureliano Buendia <[email protected]> wrote:
>>>>>
>>>>> On Fri, Dec 20, 2013 at 6:00 PM, Tom Vacek <[email protected]> wrote:
>>>>>
>>>>>> Oh, I see. I was thinking that there was a computational dependency from one window to the next. If the computations are independent, then I think Spark can help you out quite a bit.
>>>>>>
>>>>>> I think you would want an RDD where each element is a window of your dense matrix. I'm not aware of a way to distribute the windows of the big matrix that doesn't involve broadcasting the whole thing. You might have to tweak some config options, but I think it would work straightaway. I would initialize the data structure like this:
>>>>>>
>>>>>>   val matB = sc.broadcast(myBigDenseMatrix)
>>>>>>   val distributedChunks = sc.parallelize(0 until numWindows).mapPartitions(it => it.map(windowID => getWindow(matB.value, windowID)))
>>>>>
>>>>> Here broadcast is used instead of calling parallelize on myBigDenseMatrix. Is it okay to broadcast a huge amount of data? Does sharing big data mean a big network I/O overhead compared to calling parallelize, or is this overhead optimized due to the partitioning?
>>>>>
>>>>>> Then just apply your matrix ops as a map on the RDD.
>>>>>>
>>>>>> You may have your own tool for dense matrix ops, but I would suggest Scala Breeze. You'll have to use an old version of Breeze (current builds are for 2.10). Spark with Scala 2.10 is a little way off.
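On the broadcast question above: one way to avoid broadcasting the whole matrix at all is to slice the windows out on the driver and parallelize those slices directly, so each task ships only its own 5,000 doubles, and then treat each slice as a 100 x 50 Breeze matrix on the workers. This is only a sketch; getWindow, computeOnWindow, numWindows and myBigDenseMatrix are placeholders:

  import breeze.linalg.DenseMatrix

  // Slice on the driver; no broadcast of the full 1 GB matrix.
  val windowData: Seq[Array[Double]] =
    (0 until numWindows).map(w => getWindow(myBigDenseMatrix, w))   // 5,000 doubles per window
  val results = sc.parallelize(windowData, numWindows)
    .map(a => new DenseMatrix(100, 50, a))   // reinterpret as a column-major 100 x 50 sub-matrix
    .map(m => computeOnWindow(m))            // the expensive per-window computation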
>>>>>> On Fri, Dec 20, 2013 at 11:40 AM, Aureliano Buendia <[email protected]> wrote:
>>>>>>
>>>>>>> On Fri, Dec 20, 2013 at 5:21 PM, Tom Vacek <[email protected]> wrote:
>>>>>>>
>>>>>>>> If you use an RDD[Array[Double]] with a row decomposition of the matrix, you can index windows of the rows all you want, but you're limited to 100 concurrent tasks. You could use a column decomposition and access subsets of the columns with a PartitionPruningRDD. I have to say, though, if you're doing dense matrix operations, they will be 100s of times faster on a shared-memory platform. This particular matrix, at 800 MB, could be a Breeze on a single node.
>>>>>>>
>>>>>>> The computation for every submatrix is very expensive; it takes days on a single node. I was hoping this could be reduced to hours or minutes with Spark.
>>>>>>>
>>>>>>> Are you saying that Spark is not suitable for this type of job?
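Finally, a rough sketch of the HDFS, partition-by-partition route Michael describes above. It assumes the matrix is stored with one 100-value column per text line, so that 50 consecutive lines form one 100 x 50 window, and it is only correct if partition boundaries fall on multiples of 50 lines, which is exactly the guarantee an NLineInputFormat (or an overlapping variant of it) would have to provide. The paths and computeWindow are placeholders:

  // One matrix column (100 doubles) per line; 50 consecutive lines = one window.
  val cols = sc.textFile("hdfs:///data/matrix.txt")               // hypothetical input path
               .map(line => line.split(' ').map(_.toDouble))
  val results = cols.mapPartitions { it =>
    it.grouped(50)                                                // only valid if partitions split on 50-line boundaries
      .map(window => computeWindow(window.toArray))               // hypothetical per-window computation
  }
  results.saveAsTextFile("hdfs:///results")                       // hypothetical output path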
