On Fri, Dec 20, 2013 at 9:43 PM, Christopher Nguyen <[email protected]> wrote:
> Aureliano, how would your production data be coming in and accessed? It's
> possible that you can still think of that level as a serial operation
> (outer loop, large chunks) first before worrying about parallelizing the
> computation of the tiny chunks.

It's batch processing of time series data. Perhaps a serial process in which
each serial item is a set of parallel processes could be an option. Does Spark
have such an option?

> And I'm reading that when you refer to "data duplication", you're worried
> about that as a side-effect problem, not as a requirement, correct?

That's right. Data duplication is certainly not a requirement, and we are not
trying to avoid it per se, but if it's a side effect that leads to considerable
IO overhead, that's not going to be good.

> And if the former, I don't see that data duplication is a necessary side
> effect. Unless I missed something in the thread, don't use broadcast.

I take it that overlapped partitions do not mean data duplication. I wasn't
sure whether partitions hold a copy or a reference.

> Put another way, I see the scale of this challenge as far more operational
> than logical (when squinted at from the right angle :)
>
> --
> Christopher T. Nguyen
> Co-founder & CEO, Adatao <http://adatao.com>
> linkedin.com/in/ctnguyen
>
>
> On Fri, Dec 20, 2013 at 1:07 PM, Aureliano Buendia <[email protected]> wrote:
>
>> Also, overthinking is appreciated in this problem, as my production data
>> is actually near 100 x 1,000,000,000, and data duplication could get messy
>> at that scale.
>>
>> Sorry about the initial misinformation, I was thinking about my
>> development/test data.
>>
>> On Fri, Dec 20, 2013 at 9:04 PM, Aureliano Buendia <[email protected]> wrote:
>>
>>> On Fri, Dec 20, 2013 at 9:00 PM, Tom Vacek <[email protected]> wrote:
>>>
>>>> Totally agree. Even with a 50x data replication, that's only 40 GB,
>>>> which would be a fraction of a standard cluster. But since overthinking
>>>> is a lot of fun, how about this: do a mapPartitions with a threaded
>>>> subtask for each window. Now you only need to replicate data across the
>>>> boundaries of each partition of windows, rather than each window.
>>>
>>> How can this be written in Spark Scala?
>>>
>>>> On Fri, Dec 20, 2013 at 2:53 PM, Christopher Nguyen <[email protected]> wrote:
>>>>
>>>>> Are we over-thinking the problem here? Since the per-window compute
>>>>> task is hugely expensive, stateless from window to window, and the
>>>>> original big matrix is just 1 GB, the primary gain in using a parallel
>>>>> engine is in distributing and scheduling these (long-running, isolated)
>>>>> tasks. I'm reading that data loading and distribution are going to be a
>>>>> tiny fraction of the overall compute time.
>>>>>
>>>>> If that's the case, it would make sense simply to start with a 1 GB
>>>>> Array[Double] on the driver, from that create an RDD comprising 20,000
>>>>> rows of 5,000 doubles each, map them out to the workers, and have them
>>>>> interpret what the 5,000 doubles mean in terms of a [100 x 50]
>>>>> sub-matrix. They each have a good fraction of several days to figure it
>>>>> out :)
>>>>>
>>>>> This would be a great load test for Spark's resiliency over
>>>>> long-running computations.
>>>>>
>>>>> --
>>>>> Christopher T. Nguyen
>>>>> Co-founder & CEO, Adatao <http://adatao.com>
>>>>> linkedin.com/in/ctnguyen
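For concreteness, here is a minimal sketch of the approach Christopher describes
above, in Spark Scala. It assumes an existing SparkContext `sc`, that the full
matrix sits on the driver as a column-major Array[Double] named `bigMatrix`
(100 rows x 1,000,000 columns), and that `computeWindow` stands in for the
expensive per-window computation; these names and the memory layout are
illustrative assumptions, not something specified in the thread.

    // A minimal sketch, not a drop-in implementation: `sc`, `bigMatrix`, and
    // `computeWindow` are assumed to exist; the matrix is taken to be column-major.
    val nRows = 100
    val windowCols = 50
    val windowSize = nRows * windowCols                 // 5,000 doubles per window

    // Cut the driver-side array into per-window chunks (here: non-overlapping).
    val numWindows = bigMatrix.length / windowSize      // ~20,000 windows
    val windows: IndexedSeq[Array[Double]] =
      (0 until numWindows).map(w => bigMatrix.slice(w * windowSize, (w + 1) * windowSize))

    // One RDD element per window; each long-running task reinterprets its
    // 5,000 doubles as a 100 x 50 sub-matrix and runs the expensive computation.
    val windowRDD = sc.parallelize(windows, numWindows)
    val results = windowRDD.map(chunk => computeWindow(chunk, nRows, windowCols))
                           .collect()                   // assuming per-window results are small

The point of the sketch is that the driver only ships each 5,000-double chunk
once, and Spark is used purely to schedule the long-running, isolated tasks.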
>>>>>
>>>>> On Fri, Dec 20, 2013 at 11:36 AM, Michael (Bach) Bui <[email protected]> wrote:
>>>>>
>>>>>> Hmm, I misread that you need a sliding window.
>>>>>> I am thinking out loud here: one way of dealing with this is to
>>>>>> improve NLineInputFormat so that partitions have a small overlapping
>>>>>> portion; in this case the overlapping portion is 50 columns.
>>>>>> So let's say the matrix is divided into overlapping partitions like
>>>>>> this: [100 x col[1, n*50]], [100 x col[(n-1)*50+1, (2n-1)*50]] ... then
>>>>>> we can assign each partition to a mapper and do mapPartitions on it.
>>>>>>
>>>>>> --------------------------------------------
>>>>>> Michael (Bach) Bui, PhD,
>>>>>> Senior Staff Architect, ADATAO Inc.
>>>>>> www.adatao.com
>>>>>>
>>>>>> On Dec 20, 2013, at 1:11 PM, Michael (Bach) Bui <[email protected]> wrote:
>>>>>>
>>>>>> Here, Tom assumed that you have your big matrix already loaded on one
>>>>>> machine. Now if you want to distribute it to slave nodes, you will need
>>>>>> to broadcast it. I would expect this broadcast to be done once at the
>>>>>> beginning of your algorithm, and the computation time to dominate the
>>>>>> overall execution time.
>>>>>>
>>>>>> On the other hand, a better way to deal with a huge matrix is to store
>>>>>> the data in HDFS and load the data into each slave partition by
>>>>>> partition. This is the fundamental data processing pattern in the
>>>>>> Spark/Hadoop world. If you opt to do this, you will have to use a
>>>>>> suitable InputFormat to make sure each partition has the right number
>>>>>> of rows. For example, if you are lucky and each HDFS partition has
>>>>>> exactly n*50 rows, then you can use rdd.mapPartitions(func), where func
>>>>>> takes care of splitting the n*50-row partition into n sub-matrices.
>>>>>>
>>>>>> However, the HDFS TextInputFormat or SequenceFileInputFormat will not
>>>>>> guarantee that each partition has a certain number of rows. What you
>>>>>> want is NLineInputFormat, which I think has not been pulled into Spark
>>>>>> yet. If everyone thinks this is needed, I can implement it quickly; it
>>>>>> should be pretty easy.
>>>>>>
>>>>>> --------------------------------------------
>>>>>> Michael (Bach) Bui, PhD,
>>>>>> Senior Staff Architect, ADATAO Inc.
>>>>>> www.adatao.com
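A rough sketch of the partition-by-partition pattern Michael describes above,
under assumptions that are not from the thread: the matrix is stored transposed
in HDFS, one 100-double column per line, space-separated; each partition happens
to contain an exact multiple of 50 lines (Michael's "lucky" case); the HDFS path
and `computeWindow` are hypothetical placeholders.

    // Load the matrix column-by-column from HDFS and process each partition locally.
    val lines = sc.textFile("hdfs:///path/to/matrix")   // hypothetical path

    val results = lines.mapPartitions { it =>
      // Parse this partition's lines into columns of 100 doubles each.
      val cols = it.map(_.split(' ').map(_.toDouble))
      // Group every 50 consecutive columns into one [100 x 50] window
      // (flattened column-major) and run the expensive computation on it.
      cols.grouped(50).map(w => computeWindow(w.flatten.toArray, 100, 50))
    }
    // results.collect() or results.saveAsTextFile(...), depending on output size.

As Michael notes, this only lines up if the partition boundaries cooperate;
sliding windows that straddle partition boundaries would still need something
like the improved, overlapping NLineInputFormat he proposes.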
>>>>>>
>>>>>> On Dec 20, 2013, at 12:38 PM, Aureliano Buendia <[email protected]> wrote:
>>>>>>
>>>>>> On Fri, Dec 20, 2013 at 6:00 PM, Tom Vacek <[email protected]> wrote:
>>>>>>
>>>>>>> Oh, I see. I was thinking that there was a computational dependency
>>>>>>> from one window to the next. If the computations are independent, then
>>>>>>> I think Spark can help you out quite a bit.
>>>>>>>
>>>>>>> I think you would want an RDD where each element is a window of your
>>>>>>> dense matrix. I'm not aware of a way to distribute the windows of the
>>>>>>> big matrix that doesn't involve broadcasting the whole thing. You might
>>>>>>> have to tweak some config options, but I think it would work
>>>>>>> straightaway. I would initialize the data structure like this:
>>>>>>>
>>>>>>>   val matB = sc.broadcast(myBigDenseMatrix)
>>>>>>>   val distributedChunks = sc.parallelize(0 until numWindows)
>>>>>>>     .mapPartitions(it => it.map(windowID => getWindow(matB.value, windowID)))
>>>>>>
>>>>>> Here broadcast is used instead of calling parallelize on
>>>>>> myBigDenseMatrix. Is it okay to broadcast a huge amount of data? Does
>>>>>> sharing big data mean a big network IO overhead compared to calling
>>>>>> parallelize, or is this overhead optimized away due to the partitioning?
>>>>>>
>>>>>>> Then just apply your matrix ops as a map on the resulting RDD.
>>>>>>>
>>>>>>> You may have your own tool for dense matrix ops, but I would suggest
>>>>>>> Scala Breeze. You'll have to use an old version of Breeze (current
>>>>>>> builds are for 2.10); Spark with Scala 2.10 is a little way off.
>>>>>>>
>>>>>>> On Fri, Dec 20, 2013 at 11:40 AM, Aureliano Buendia <[email protected]> wrote:
>>>>>>>
>>>>>>>> On Fri, Dec 20, 2013 at 5:21 PM, Tom Vacek <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> If you use an RDD[Array[Double]] with a row decomposition of the
>>>>>>>>> matrix, you can index windows of the rows all you want, but you're
>>>>>>>>> limited to 100 concurrent tasks. You could use a column decomposition
>>>>>>>>> and access subsets of the columns with a PartitionPruningRDD. I have
>>>>>>>>> to say, though, if you're doing dense matrix operations, they will be
>>>>>>>>> 100s of times faster on a shared-mem platform. This particular
>>>>>>>>> matrix, at 800 MB, could be a Breeze on a single node.
>>>>>>>>
>>>>>>>> The computation for every submatrix is very expensive; it takes days
>>>>>>>> on a single node. I was hoping this could be reduced to hours or
>>>>>>>> minutes with Spark.
>>>>>>>>
>>>>>>>> Are you saying that Spark is not suitable for this type of job?
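For completeness, one possible shape for the getWindow helper that Tom's snippet
above leaves undefined. It assumes myBigDenseMatrix is a column-major
Array[Double] with nRows rows, that each window spans windowCols consecutive
columns, and that consecutive windows start `stride` columns apart (stride
smaller than windowCols gives the overlapping, sliding-window case); none of
this is specified in the thread.

    // A hypothetical getWindow for Tom's snippet above; the column-major layout
    // and the window geometry are assumptions, not from the thread.
    def getWindow(mat: Array[Double], windowID: Int,
                  nRows: Int = 100, windowCols: Int = 50, stride: Int = 50): Array[Double] = {
      val start = windowID * stride * nRows            // first element of this window
      mat.slice(start, start + nRows * windowCols)     // copy out nRows x windowCols doubles
    }

    // The expensive per-window work is then just another map over the chunks, e.g.:
    // val results = distributedChunks.map(window => computeWindow(window, 100, 50))

One design note: with broadcast, the big array is fetched once per worker and
cached, and each task copies only its own window out of it, so heavily
overlapping windows do not multiply the data shipped over the network.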
