Aureliano, would something like this work? The map() call is the only place
where you have to think in parallel here.


while (thereIsStillDataToWorkOn) {
  // N is a design variable: how many 100 x 50 windows to pull in per pass
  val bigChunk: Array[Double] = readInTheNextNx100x50MatrixData()
  val bigChunkAsArraysOf5000Doubles: Array[Array[Double]] =
    restructureIntoArraysOf5000Doubles(bigChunk)
  val results = sc
    .parallelize(bigChunkAsArraysOf5000Doubles, numSlices) // numSlices: another design variable
    .map(eachArrayOf5000Doubles => someVeryLongRunningTransformer(eachArrayOf5000Doubles)) // the parallel part
    .collect()
}


Next, pardon me if this is too basic, but in case it is helpful: this code
first runs on a single machine, called the Driver, which must have access to
the source data. When we call parallelize(), Spark handles partitioning the
data across the available Workers, including serializing each partition out
to them; collect() then brings the results back to the Driver. There is no
data duplication other than each Worker's copy of its own partition of the
data from the Driver.

This indeed does not take advantage of all the other Spark goodies that
others have correctly pointed out on this thread, such as broadcasting,
mapPartitions() vs. map(), parallel data loading across HDFS partitions,
etc. But it would be exactly the right thing to do if it best fits your
problem statement.
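
As one example, the map() in the loop above could be swapped for
mapPartitions() with no other changes; here is a rough, untested sketch
using the same placeholder names as the loop above:

val results = sc
  .parallelize(bigChunkAsArraysOf5000Doubles, numSlices)
  .mapPartitions { arrays =>
    // the function is called once per partition with an iterator over its
    // elements, so any expensive per-task setup can be done once here
    arrays.map(someVeryLongRunningTransformer)
  }
  .collect()
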
--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen



On Fri, Dec 20, 2013 at 2:01 PM, Aureliano Buendia <[email protected]>wrote:

>
>
>
> On Fri, Dec 20, 2013 at 9:43 PM, Christopher Nguyen <[email protected]>wrote:
>
>> Aureliano, how would your production data be coming in and accessed? It's
>> possible that you can still think of that level as a serial operation
>> (outer loop, large chunks) first before worrying about parallelizing the
>> computation of the tiny chunks.
>>
>
> It's batch processing of time-series data. Perhaps serial processing
> where each serial item is a set of parallel tasks could be an option.
> Does Spark have such an option?
>
>
>>
>> And I'm reading that when you refer to "data duplication", you're worried
>> about that as a side-effect problem, not as a requirement, correct?
>>
>
> That's right. Data duplication is certainly not a requirement; we're not
> trying to avoid it per se, but if it's a side effect that leads to
> considerable I/O overhead, it's not going to be good.
>
>
>> And if the former, I don't see that data duplication is a necessary side
>> effect. Unless I missed something in the thread, don't use broadcast.
>>
>
> I take it that overlapping partitions do not mean data duplication. I
> wasn't sure whether partitions hold a copy or a reference.
>
>
>>
>> Put another way, I see the scale of this challenge as far more
>> operational than logical (when squinted at from the right angle :)
>>
>>  --
>> Christopher T. Nguyen
>> Co-founder & CEO, Adatao <http://adatao.com>
>> linkedin.com/in/ctnguyen
>>
>>
>>
>> On Fri, Dec 20, 2013 at 1:07 PM, Aureliano Buendia 
>> <[email protected]>wrote:
>>
>>> Also, overthinking is appreciated in this problem, as my production data
>>> is actually near 100 x 1,000,000,000, and data duplication could get messy
>>> at that scale.
>>>
>>> Sorry about the initial misinformation, I was thinking about my
>>> development/test data.
>>>
>>>
>>> On Fri, Dec 20, 2013 at 9:04 PM, Aureliano Buendia <[email protected]
>>> > wrote:
>>>
>>>>
>>>>
>>>>
>>>> On Fri, Dec 20, 2013 at 9:00 PM, Tom Vacek <[email protected]>wrote:
>>>>
>>>>> Totally agree.  Even with a 50x data replication, that's only 40 GB,
>>>>> which would be a fraction of a standard cluster.  But since overthinking
>>>>> is a lot of fun, how about this: do a mapPartitions() with a threaded
>>>>> subtask for each window.  Now you only need to replicate data across the
>>>>> boundaries of each partition of windows, rather than across each window.
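>>>>>
>>>>> Roughly, as an untested sketch (windowedRDD, an RDD whose partitions
>>>>> each hold a group of windows, and processWindow are placeholders):
>>>>>
>>>>> val results = windowedRDD
>>>>>   .mapPartitions { windows =>
>>>>>     // run the expensive per-window task with local threads inside each
>>>>>     // partition, via Scala parallel collections
>>>>>     windows.toArray.par.map(processWindow).seq.iterator
>>>>>   }
>>>>>   .collect()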
>>>>>
>>>>
>>>> How can this be written in Spark/Scala?
>>>>
>>>>
>>>>>
>>>>>
>>>>> On Fri, Dec 20, 2013 at 2:53 PM, Christopher Nguyen 
>>>>> <[email protected]>wrote:
>>>>>
>>>>>> Are we over-thinking the problem here? Since the per-window compute
>>>>>> task is hugely expensive, stateless from window to window, and the 
>>>>>> original
>>>>>> big matrix is just 1GB, the primary gain in using a parallel engine is in
>>>>>> distributing and scheduling these (long-running, isolated) tasks. I'm
>>>>>> reading that data loading and distribution are going to be a tiny 
>>>>>> fraction
>>>>>> of the overall compute time.
>>>>>>
>>>>>> If that's the case, it would make sense simply to start with a 1GB
>>>>>> Array[Double] on the driver, from that create an RDD comprising 20,000
>>>>>> rows of 5,000 doubles each, map them out to the workers, and have them
>>>>>> interpret what the 5,000 doubles mean in terms of a [100 x 50]
>>>>>> sub-matrix. They each have a good fraction of several days to figure it
>>>>>> out :)
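>>>>>>
>>>>>> In code, that could be roughly (untested; bigArray, numSlices and
>>>>>> interpretAs100x50SubMatrix are placeholders, and grouped() assumes the
>>>>>> array length is an exact multiple of 5,000):
>>>>>>
>>>>>> val rows: Array[Array[Double]] = bigArray.grouped(5000).toArray
>>>>>> val results = sc
>>>>>>   .parallelize(rows, numSlices)
>>>>>>   .map(interpretAs100x50SubMatrix) // each task sees one 5,000-double row
>>>>>>   .collect()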
>>>>>>
>>>>>> This would be a great load test for Spark's resiliency over
>>>>>> long-running computations.
>>>>>>
>>>>>> --
>>>>>> Christopher T. Nguyen
>>>>>> Co-founder & CEO, Adatao <http://adatao.com>
>>>>>> linkedin.com/in/ctnguyen
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Dec 20, 2013 at 11:36 AM, Michael (Bach) Bui <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hmm, I misread that you need a sliding window.
>>>>>>> I am thinking out loud here: one way of dealing with this is to
>>>>>>> improve NLineInputFormat so that partitions have a small overlapping
>>>>>>> portion; in this case the overlapping portion is 50 columns.
>>>>>>> So let's say the matrix is divided into overlapping partitions like
>>>>>>> this: [100 x col[1, n*50]], [100 x col[(n-1)*50+1, (2n-1)*50]], ... Then
>>>>>>> we can assign each partition to a mapper and do mapPartitions() on it.
>>>>>>>
>>>>>>>
>>>>>>> --------------------------------------------
>>>>>>> Michael (Bach) Bui, PhD,
>>>>>>> Senior Staff Architect, ADATAO Inc.
>>>>>>> www.adatao.com
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Dec 20, 2013, at 1:11 PM, Michael (Bach) Bui <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Here, Tom assumed that you have your big matrix already loaded on one
>>>>>>> machine. Now if you want to distribute it to the slave nodes, you will
>>>>>>> need to broadcast it. I would expect this broadcast to be done once at
>>>>>>> the beginning of your algorithm, and the computation time will dominate
>>>>>>> the overall execution time.
>>>>>>>
>>>>>>> On the other hand, a better way to deal with a huge matrix is to store
>>>>>>> the data in HDFS and load it into the slaves partition-by-partition.
>>>>>>> This is a fundamental data-processing pattern in the Spark/Hadoop world.
>>>>>>> If you opt to do this, you will have to use a suitable InputFormat to
>>>>>>> make sure each partition has the right number of rows.
>>>>>>> For example, if you are lucky and each HDFS partition has exactly n*50
>>>>>>> rows, then you can use rdd.mapPartitions(func), where func takes care of
>>>>>>> splitting the n*50-row partition into n sub-matrices.
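>>>>>>>
>>>>>>> As a rough, untested sketch (rowRDD, e.g. an RDD[Array[Double]] loaded
>>>>>>> from HDFS, and processSubMatrix are placeholders):
>>>>>>>
>>>>>>> val results = rowRDD
>>>>>>>   .mapPartitions { rows =>
>>>>>>>     // each partition holds n*50 rows; split it into n blocks of 50 rows
>>>>>>>     // and run the expensive per-sub-matrix computation on each block
>>>>>>>     rows.grouped(50).map(block => processSubMatrix(block.toArray))
>>>>>>>   }
>>>>>>>   .collect()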
>>>>>>>
>>>>>>> However, HDFS TextInputFormat or SequenceFileInputFormat will not
>>>>>>> guarantee that each partition has a certain number of rows. What you
>>>>>>> want is NLineInputFormat, which I think has not been pulled into Spark
>>>>>>> yet.
>>>>>>> If everyone thinks this is needed, I can implement it quickly; it
>>>>>>> should be pretty easy.
>>>>>>>
>>>>>>>
>>>>>>> --------------------------------------------
>>>>>>> Michael (Bach) Bui, PhD,
>>>>>>> Senior Staff Architect, ADATAO Inc.
>>>>>>> www.adatao.com
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Dec 20, 2013, at 12:38 PM, Aureliano Buendia <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Dec 20, 2013 at 6:00 PM, Tom Vacek 
>>>>>>> <[email protected]>wrote:
>>>>>>>
>>>>>>>> Oh, I see.  I was thinking that there was a computational dependency
>>>>>>>> from one window to the next.  If the computations are independent,
>>>>>>>> then I think Spark can help you out quite a bit.
>>>>>>>>
>>>>>>>> I think you would want an RDD where each element is a window of
>>>>>>>> your dense matrix.  I'm not aware of a way to distribute the windows 
>>>>>>>> of the
>>>>>>>> big matrix in a way that doesn't involve broadcasting the whole thing. 
>>>>>>>>  You
>>>>>>>> might have to tweak some config options, but I think it would work
>>>>>>>> straightaway.  I would initialize the data structure like this:
>>>>>>>> val matB = sc.broadcast(myBigDenseMatrix)
>>>>>>>> val distributedChunks = sc
>>>>>>>>   .parallelize(0 until numWindows)
>>>>>>>>   .mapPartitions(it => it.map(windowID => getWindow(matB.value, windowID)))
>>>>>>>>
>>>>>>>
>>>>>>> Here broadcast is used instead of calling parallelize on
>>>>>>> myBigDenseMatrix. Is it okay to broadcast a huge amount of data? Does
>>>>>>> sharing big data mean a big network I/O overhead compared to calling
>>>>>>> parallelize, or is this overhead optimized thanks to the partitioning?
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> Then just apply your matrix ops as a map on the resulting RDD.
>>>>>>>>
>>>>>>>> You may have your own tool for dense matrix ops, but I would suggest
>>>>>>>> Scala Breeze.  You'll have to use an old version of Breeze (current
>>>>>>>> builds are for Scala 2.10), since Spark with Scala 2.10 is still a
>>>>>>>> little way off.
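>>>>>>>>
>>>>>>>> For example, each 5,000-double window could be wrapped as a [100 x 50]
>>>>>>>> Breeze matrix, roughly like this (untested sketch; it assumes getWindow
>>>>>>>> returns each window as an Array[Double] of length 5,000, and
>>>>>>>> someExpensiveMatrixOp is a placeholder for your real computation):
>>>>>>>>
>>>>>>>> import breeze.linalg.DenseMatrix
>>>>>>>>
>>>>>>>> // column-major wrap of 5,000 doubles into a 100 x 50 dense matrix
>>>>>>>> def asSubMatrix(window: Array[Double]): DenseMatrix[Double] =
>>>>>>>>   new DenseMatrix(100, 50, window)
>>>>>>>>
>>>>>>>> val perWindowResults =
>>>>>>>>   distributedChunks.map(w => someExpensiveMatrixOp(asSubMatrix(w)))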
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Dec 20, 2013 at 11:40 AM, Aureliano Buendia <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Dec 20, 2013 at 5:21 PM, Tom Vacek <[email protected]
>>>>>>>>> > wrote:
>>>>>>>>>
>>>>>>>>>> If you use an RDD[Array[Double]] with a row decomposition of the
>>>>>>>>>> matrix, you can index windows of the rows all you want, but you're
>>>>>>>>>> limited to 100 concurrent tasks.  You could use a column
>>>>>>>>>> decomposition and access subsets of the columns with a
>>>>>>>>>> PartitionPruningRDD.  I have to say, though, if you're doing dense
>>>>>>>>>> matrix operations, they will be 100s of times faster on a
>>>>>>>>>> shared-memory platform.  This particular matrix, at 800 MB, could be
>>>>>>>>>> a Breeze on a single node.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> The computation for every sub-matrix is very expensive; it takes
>>>>>>>>> days on a single node. I was hoping this could be reduced to hours or
>>>>>>>>> minutes with Spark.
>>>>>>>>>
>>>>>>>>> Are you saying that Spark is not suitable for this type of job?
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
