Also, overthinking is appreciated in this problem, as my production data is actually near 100 x 1,000,000,000 and data duplication could get messy at that scale.

Sorry about the initial misinformation; I was thinking about my development/test data.
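To make the open question below ("How can this be written in Spark Scala?") concrete, here is a rough sketch of how I read Tom's mapPartitions-with-threads suggestion. Everything in it is a placeholder rather than settled code: windowCols, processWindow, and above all how the overlapping column chunks are built in the first place, which is exactly what the rest of the thread debates. Rough sketches of the two other approaches discussed below follow after the quoted messages.

  import org.apache.spark.rdd.RDD

  object SlidingWindows {
    val windowCols = 50  // each window is 100 rows x 50 columns

    // stand-in for the expensive, days-long per-window computation
    def processWindow(window: Array[Array[Double]]): Double = ???

    // Each chunk is a 100-row slice of consecutive columns that already carries
    // the 50 boundary columns it shares with its neighbour, so data is only
    // replicated across chunk boundaries rather than per window.
    def run(chunks: RDD[Array[Array[Double]]]): RDD[Double] =
      chunks.flatMap { chunk =>
        val starts = 0 to (chunk(0).length - windowCols)  // one window per start column
        // run this chunk's windows as concurrent subtasks (a parallel collection
        // instead of hand-rolled threads); which chunk "owns" the windows that
        // start inside the shared boundary region is glossed over here
        starts.par.map { s =>
          processWindow(chunk.map(row => row.slice(s, s + windowCols)))
        }.seq
      }
  }

The flatMap over chunks plays the role of mapPartitions here: each Spark task handles one chunk and fans its windows out across the local cores.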
On Fri, Dec 20, 2013 at 9:04 PM, Aureliano Buendia <[email protected]> wrote:

> On Fri, Dec 20, 2013 at 9:00 PM, Tom Vacek <[email protected]> wrote:
>
>> Totally agree. Even with a 50x data replication, that's only 40 GB, which would be a fraction of a standard cluster. But since overthinking is a lot of fun, how about this: do a mapPartitions with a threaded subtask for each window. Now you only need to replicate data across the boundaries of each partition of windows, rather than each window.
>
> How can this be written in Spark Scala?
>
>> On Fri, Dec 20, 2013 at 2:53 PM, Christopher Nguyen <[email protected]> wrote:
>>
>>> Are we over-thinking the problem here? Since the per-window compute task is hugely expensive, stateless from window to window, and the original big matrix is just 1 GB, the primary gain in using a parallel engine is in distributing and scheduling these (long-running, isolated) tasks. I'm reading that data loading and distribution are going to be a tiny fraction of the overall compute time.
>>>
>>> If that's the case, it would make sense simply to start with a 1 GB Array[Double] on the driver, from that create an RDD comprising 20,000 rows of 5,000 doubles each, map them out to the workers, and have them interpret what the 5,000 doubles mean in terms of a [100 x 50] sub-matrix. They each have a good fraction of several days to figure it out :)
>>>
>>> This would be a great load test for Spark's resiliency over long-running computations.
>>>
>>> --
>>> Christopher T. Nguyen
>>> Co-founder & CEO, Adatao <http://adatao.com>
>>> linkedin.com/in/ctnguyen
>>>
>>> On Fri, Dec 20, 2013 at 11:36 AM, Michael (Bach) Bui <[email protected]> wrote:
>>>
>>>> Hmm, I misread that you need a sliding window. I am thinking out loud here: one way of dealing with this is to improve NLineInputFormat so that partitions will have a small overlapping portion; in this case the overlapping portion is 50 columns. So let's say the matrix is divided into overlapping partitions like this: [100 x col[1, n*50]], [100 x col[(n-1)*50+1, (2n-1)*50]] … then we can assign each partition to a mapper and run mapPartitions on it.
>>>>
>>>> --------------------------------------------
>>>> Michael (Bach) Bui, PhD,
>>>> Senior Staff Architect, ADATAO Inc.
>>>> www.adatao.com
>>>>
>>>> On Dec 20, 2013, at 1:11 PM, Michael (Bach) Bui <[email protected]> wrote:
>>>>
>>>> Here, Tom assumed that you have your big matrix already loaded on one machine. Now if you want to distribute it to slave nodes you will need to broadcast it. I would expect this broadcasting to be done once at the beginning of your algorithm, and the computation time to dominate the overall execution time.
>>>>
>>>> On the other hand, a better way to deal with a huge matrix is to store the data in HDFS and load the data into each slave partition-by-partition. This is a fundamental data processing pattern in the Spark/Hadoop world. If you opt to do this, you will have to use a suitable InputFormat to make sure each partition has the right number of rows. For example, if you are lucky and each HDFS partition has exactly n*50 rows, then you can use rdd.mapPartitions(func),
>>>> where func will take care of splitting the n*50-row partition into n sub-matrices.
>>>>
>>>> However, HDFS TextInputFormat or SequenceFileInputFormat will not guarantee that each partition has a certain number of rows. What you want is NLineInputFormat, which I think has not been pulled into Spark yet. If everyone thinks this is needed, I can implement it quickly; it should be pretty easy.
>>>>
>>>> --------------------------------------------
>>>> Michael (Bach) Bui, PhD,
>>>> Senior Staff Architect, ADATAO Inc.
>>>> www.adatao.com
>>>>
>>>> On Dec 20, 2013, at 12:38 PM, Aureliano Buendia <[email protected]> wrote:
>>>>
>>>> On Fri, Dec 20, 2013 at 6:00 PM, Tom Vacek <[email protected]> wrote:
>>>>
>>>>> Oh, I see. I was thinking that there was a computational dependency from one window to the next. If the computations are independent, then I think Spark can help you out quite a bit.
>>>>>
>>>>> I think you would want an RDD where each element is a window of your dense matrix. I'm not aware of a way to distribute the windows of the big matrix in a way that doesn't involve broadcasting the whole thing. You might have to tweak some config options, but I think it would work straightaway. I would initialize the data structure like this:
>>>>>
>>>>> val matB = sc.broadcast(myBigDenseMatrix)
>>>>> val distributedChunks = sc.parallelize(0 until numWindows).mapPartitions(it => it.map(windowID => getWindow(matB.value, windowID)))
>>>>
>>>> Here broadcast is used instead of calling parallelize on myBigDenseMatrix. Is it okay to broadcast a huge amount of data? Does sharing big data mean a big network IO overhead compared to calling parallelize, or is this overhead optimized due to partitioning?
>>>>
>>>>> Then just apply your matrix ops as a map on the RDD.
>>>>>
>>>>> You may have your own tool for dense matrix ops, but I would suggest Scala Breeze. You'll have to use an old version of Breeze (current builds are for 2.10); Spark with Scala 2.10 is a little way off.
>>>>>
>>>>> On Fri, Dec 20, 2013 at 11:40 AM, Aureliano Buendia <[email protected]> wrote:
>>>>>
>>>>>> On Fri, Dec 20, 2013 at 5:21 PM, Tom Vacek <[email protected]> wrote:
>>>>>>
>>>>>>> If you use an RDD[Array[Double]] with a row decomposition of the matrix, you can index windows of the rows all you want, but you're limited to 100 concurrent tasks. You could use a column decomposition and access subsets of the columns with a PartitionPruningRDD. I have to say, though, if you're doing dense matrix operations, they will be 100s of times faster on a shared-mem platform. This particular matrix, at 800 MB, could be a Breeze on a single node.
>>>>>>
>>>>>> The computation for every submatrix is very expensive; it takes days on a single node. I was hoping this could be reduced to hours or minutes with Spark.
>>>>>>
>>>>>> Are you saying that Spark is not suitable for this type of job?
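For reference, here is Tom's broadcast snippet from the quoted thread filled out into a self-contained sketch; getWindow and expensiveCompute are placeholders, and a window step of one column is assumed. Note that a broadcast is not partitioned: the whole matrix is shipped once to every worker node, so the network cost grows roughly as matrix size times number of nodes, which is fine at ~1 GB but not at the production size mentioned at the top.

  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD

  object BroadcastWindows {
    val windowCols = 50

    // placeholder: copy out the 100 x 50 window starting at column windowID
    def getWindow(m: Array[Array[Double]], windowID: Int): Array[Array[Double]] =
      m.map(row => row.slice(windowID, windowID + windowCols))

    // placeholder for the expensive per-window job
    def expensiveCompute(window: Array[Array[Double]]): Double = ???

    def run(sc: SparkContext,
            myBigDenseMatrix: Array[Array[Double]],
            numWindows: Int): RDD[Double] = {
      val matB = sc.broadcast(myBigDenseMatrix)  // one full copy per worker node
      sc.parallelize(0 until numWindows)
        .mapPartitions(it => it.map(windowID => getWindow(matB.value, windowID)))
        .map(expensiveCompute)
    }
  }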

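And a sketch of Christopher's driver-side suggestion: keep the ~1 GB matrix as a flat Array[Double] on the driver, cut it into 20,000 rows of 5,000 doubles, and let each long-running task interpret its row as a [100 x 50] sub-matrix. expensiveCompute is again a placeholder, and the 20,000 x 5,000 split assumes the windows are read as non-overlapping.

  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD

  object DriverSideWindows {
    val doublesPerWindow = 100 * 50  // one window = 5,000 doubles

    // placeholder: reinterpret 5,000 flat doubles as a 100 x 50 sub-matrix and crunch it
    def expensiveCompute(flatWindow: Array[Double]): Double = ???

    def run(sc: SparkContext, bigFlatMatrix: Array[Double]): RDD[Double] = {
      val windows = bigFlatMatrix.grouped(doublesPerWindow).toIndexedSeq  // ~20,000 windows
      // one partition per window: scheduling overhead is irrelevant next to
      // tasks that each run for hours
      sc.parallelize(windows, windows.size).map(expensiveCompute)
    }
  }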