OK, broadcasting is probably out. Let's assume that you have n nodes with k cores per node. Each node will process k windows, most of whose columns will be in common (hopefully), so each node will need the range of columns [node_id*k, (node_id+1)*k + 50 - 1], and your data over-replication will be about 50*n columns.
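A tiny sketch of that bookkeeping (the names are hypothetical, and it assumes 50-column windows that advance one column at a time):

    // Columns node `nodeId` must hold if it owns the k windows starting at column
    // nodeId * k, each 50 columns wide (the range end is exclusive). Adjacent nodes
    // overlap by windowWidth - 1 = 49 columns, hence roughly 50 * n extra columns.
    def columnRangeForNode(nodeId: Int, k: Int, windowWidth: Int = 50): Range =
      (nodeId * k) until ((nodeId + 1) * k + windowWidth - 1)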
This is where my knowledge of Spark is running out. It is certainly syntactically possible to spin up threads inside a map, but whether those will run in parallel (assuming there are available cores on the node)... I'm not really sure. It is simply a matter of configuration to leave free cores on each node. Anyone? After that, you'd need to get the data into an RDD in the right form. I think Michael has the right idea, but the first question needs to be answered. You could also get really creative, put the matrix on a web server, and have each task download the columns it needs.

On Fri, Dec 20, 2013 at 3:07 PM, Aureliano Buendia <[email protected]> wrote:

> Also, overthinking is appreciated in this problem, as my production data is actually nearer 100 x 1,000,000,000, and data duplication could get messy with this.
>
> Sorry about the initial misinformation, I was thinking about my development/test data.
>
> On Fri, Dec 20, 2013 at 9:04 PM, Aureliano Buendia <[email protected]> wrote:
>
>> On Fri, Dec 20, 2013 at 9:00 PM, Tom Vacek <[email protected]> wrote:
>>
>>> Totally agree. Even with a 50x data replication, that's only 40 GB, which would be a fraction of a standard cluster. But since overthinking is a lot of fun, how about this: do a mapPartitions with a threaded subtask for each window. Now you only need to replicate data across the boundaries of each partition of windows, rather than each window.
>>
>> How can this be written in Spark Scala?
>>
>>> On Fri, Dec 20, 2013 at 2:53 PM, Christopher Nguyen <[email protected]> wrote:
>>>
>>>> Are we over-thinking the problem here? Since the per-window compute task is hugely expensive, stateless from window to window, and the original big matrix is just 1 GB, the primary gain in using a parallel engine is in distributing and scheduling these (long-running, isolated) tasks. I'm reading that data loading and distribution are going to be a tiny fraction of the overall compute time.
>>>>
>>>> If that's the case, it would make sense simply to start with a 1 GB Array[Double] on the driver, from that create an RDD comprising 20,000 rows of 5,000 doubles each, map them out to the workers, and have them interpret what the 5,000 doubles mean in terms of a [100 x 50] sub-matrix. They each have a good fraction of several days to figure it out :)
>>>>
>>>> This would be a great load test for Spark's resiliency over long-running computations.
>>>>
>>>> --
>>>> Christopher T. Nguyen
>>>> Co-founder & CEO, Adatao <http://adatao.com>
>>>> linkedin.com/in/ctnguyen
>>>>
>>>> On Fri, Dec 20, 2013 at 11:36 AM, Michael (Bach) Bui <[email protected]> wrote:
>>>>
>>>>> Hmm, I misread that you need a sliding window.
>>>>> I am thinking out loud here: one way of dealing with this is to improve NLineInputFormat so that partitions have a small overlapping portion; in this case the overlapping portion is 50 columns.
>>>>> So let's say the matrix is divided into overlapping partitions like this: [100 x col[1, n*50]], [100 x col[(n-1)*50+1, (2n-1)*50]], … then we can assign each partition to a mapper to do mapPartition on it.
>>>>>
>>>>> --------------------------------------------
>>>>> Michael (Bach) Bui, PhD,
>>>>> Senior Staff Architect, ADATAO Inc.
>>>>> www.adatao.com
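For the "mapPartitions with a threaded subtask for each window" idea asked about above, one possible shape in Spark Scala is sketched below. It is rough and untested: ColumnBlock, processWindow, and the 50-column window are illustrative placeholders, and Scala parallel collections stand in for explicit threads.

    import org.apache.spark.rdd.RDD

    // One overlapping block of columns; cols(j) is a column of 100 doubles, and the
    // block carries enough extra columns to cover its last window.
    case class ColumnBlock(firstWindowId: Int, numWindows: Int, cols: Array[Array[Double]])

    // Placeholder for the expensive per-window computation.
    def processWindow(window: Array[Array[Double]]): Double =
      window.map(_.sum).sum

    // Each task handles one block and runs its windows on local threads via
    // Scala parallel collections, returning (windowId, result) pairs.
    def processAllWindows(blocks: RDD[ColumnBlock]): RDD[(Int, Double)] =
      blocks.flatMap { block =>
        (0 until block.numWindows).par.map { w =>
          val window = block.cols.slice(w, w + 50) // 50-column sliding window
          (block.firstWindowId + w, processWindow(window))
        }.seq
      }

Whether those in-task threads actually run in parallel depends on leaving spare cores on each worker (for example, configuring fewer task slots per node than physical cores), which is the configuration question raised at the top of this message.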
>>>>> On Dec 20, 2013, at 1:11 PM, Michael (Bach) Bui <[email protected]> wrote:
>>>>>
>>>>> Here, Tom assumed that you have your big matrix already loaded on one machine. Now, if you want to distribute it to the slave nodes, you will need to broadcast it. I would expect this broadcast to be done once at the beginning of your algorithm, and the computation time to dominate the overall execution time.
>>>>>
>>>>> On the other hand, a better way to deal with a huge matrix is to store the data in HDFS and load it into each slave partition-by-partition. This is the fundamental data-processing pattern in the Spark/Hadoop world. If you opt to do this, you will have to use a suitable InputFormat to make sure each partition has the right number of rows. For example, if you are lucky and each HDFS partition has exactly n*50 rows, then you can use rdd.mapPartitions(func), where func will take care of splitting the n*50-row partition into n sub-matrices.
>>>>>
>>>>> However, the HDFS TextInputFormat or SequenceFileInputFormat will not guarantee that each partition has a certain number of rows. What you want is NLineInputFormat, which I think has not been pulled into Spark yet. If everyone thinks this is needed, I can implement it quickly; it should be pretty easy.
>>>>>
>>>>> --------------------------------------------
>>>>> Michael (Bach) Bui, PhD,
>>>>> Senior Staff Architect, ADATAO Inc.
>>>>> www.adatao.com
>>>>>
>>>>> On Dec 20, 2013, at 12:38 PM, Aureliano Buendia <[email protected]> wrote:
>>>>>
>>>>> On Fri, Dec 20, 2013 at 6:00 PM, Tom Vacek <[email protected]> wrote:
>>>>>
>>>>>> Oh, I see. I was thinking that there was a computational dependency from one window to the next. If the computations are independent, then I think Spark can help you out quite a bit.
>>>>>>
>>>>>> I think you would want an RDD where each element is a window of your dense matrix. I'm not aware of a way to distribute the windows of the big matrix that doesn't involve broadcasting the whole thing. You might have to tweak some config options, but I think it would work straightaway. I would initialize the data structure like this:
>>>>>> val matB = sc.broadcast(myBigDenseMatrix)
>>>>>> val distributedChunks = sc.parallelize(0 until numWindows).mapPartitions(it => it.map(windowID => getWindow(matB.value, windowID)))
>>>>>
>>>>> Here broadcast is used instead of calling parallelize on myBigDenseMatrix. Is it okay to broadcast a huge amount of data? Does sharing big data mean a big network IO overhead compared to calling parallelize, or is this overhead optimized due to the partitioning?
>>>>>
>>>>>> Then just apply your matrix ops as a map on that RDD.
>>>>>>
>>>>>> You may have your own tool for dense matrix ops, but I would suggest Scala Breeze. You'll have to use an old version of Breeze (current builds are for 2.10); Spark with Scala 2.10 is a little way off.
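getWindow in the snippet above is left undefined; here is a minimal sketch of it with Breeze, assuming the broadcast matrix is a 100 x numCols DenseMatrix[Double] and that windows advance one column at a time (the helper and its default width are illustrative, not from the thread):

    import breeze.linalg._

    // Slice out the 50 columns of window `windowId` and copy them, so the task
    // does not keep a view into the whole broadcast matrix.
    def getWindow(m: DenseMatrix[Double], windowId: Int, width: Int = 50): DenseMatrix[Double] =
      m(::, windowId until (windowId + width)).copy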
>>>>>> On Fri, Dec 20, 2013 at 11:40 AM, Aureliano Buendia <[email protected]> wrote:
>>>>>>
>>>>>>> On Fri, Dec 20, 2013 at 5:21 PM, Tom Vacek <[email protected]> wrote:
>>>>>>>
>>>>>>>> If you use an RDD[Array[Double]] with a row decomposition of the matrix, you can index windows of the rows all you want, but you're limited to 100 concurrent tasks. You could use a column decomposition and access subsets of the columns with a PartitionPruningRDD. I have to say, though, if you're doing dense matrix operations, they will be 100s of times faster on a shared-memory platform. This particular matrix, at 800 MB, could be a Breeze on a single node.
>>>>>>>
>>>>>>> The computation for every submatrix is very expensive; it takes days on a single node. I was hoping this could be reduced to hours or minutes with Spark.
>>>>>>>
>>>>>>> Are you saying that Spark is not suitable for this type of job?
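For the column-decomposition route Tom mentions, here is a rough sketch of how PartitionPruningRDD might be used so that a window only touches the partitions holding its columns. It assumes the columns are stored as (columnIndex, column) pairs laid out contiguously, with partition p holding indices [p*colsPerPart, (p+1)*colsPerPart), and that PartitionPruningRDD.create(rdd, filterFunc) is available; it is a low-level API, so treat this as illustrative.

    import org.apache.spark.rdd.{PartitionPruningRDD, RDD}

    // Return only the columns of one window, scheduling tasks solely on the
    // partitions that can contain it.
    def windowColumns(cols: RDD[(Int, Array[Double])],
                      windowStart: Int,
                      width: Int,
                      colsPerPart: Int): RDD[(Int, Array[Double])] = {
      val firstPart = windowStart / colsPerPart
      val lastPart  = (windowStart + width - 1) / colsPerPart
      val pruned = PartitionPruningRDD.create(cols, p => p >= firstPart && p <= lastPart)
      pruned.filter { case (i, _) => i >= windowStart && i < windowStart + width }
    }

With 50-column windows and a reasonably large colsPerPart, each window touches only one or two partitions, which is what makes the pruning worthwhile.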
