I believe the best way would be to use the reduceByKey operation.
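A minimal sketch of what that could look like (untested; it assumes the raw lines look like the sample further down in the thread, keys by file name as in the desired output, and pads the shorter count list with zeros; the identifiers are illustrative only):

    // Sketch only: element-wise ("vertical") sum of per-file counts with reduceByKey.
    // inputStream is assumed to be the DStream[String] of raw lines from the question.
    val perFileCounts = inputStream.map { line =>
      val fields = line.split(",").map(_.trim)
      (fields(1), fields.drop(2).map(_.toInt).toList)   // (fileName, [counts...])
    }

    val summed = perFileCounts.reduceByKey { (a, b) =>
      a.zipAll(b, 0, 0).map { case (x, y) => x + y }    // zipAll pads the shorter list with zeros
    }

    summed.print()

reduceByKey combines the per-file lists on the map side before the shuffle, so nothing has to be collected into a driver-side map just to do the addition.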
On Mon, Feb 22, 2016 at 5:10 AM, Jatin Kumar <jku...@rocketfuelinc.com.invalid> wrote:

> You will need to do a collect and update a global map if you want to.
>
>     myDStream.map(record => (record._2, (record._3, record._4, record._5)))
>       .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3))
>       .foreachRDD(rdd => {
>         rdd.collect().foreach((fileName, valueTuple) => <update global map here>)
>       })
>
> --
> Thanks
> Jatin Kumar | Rocket Scientist
> +91-7696741743 m
>
> On Sun, Feb 21, 2016 at 11:30 PM, Vinti Maheshwari <vinti.u...@gmail.com> wrote:
>
>> Never mind, it seems an executor-level mutable map is not recommended, as stated in
>> http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/
>>
>> On Sun, Feb 21, 2016 at 9:54 AM, Vinti Maheshwari <vinti.u...@gmail.com> wrote:
>>
>>> Thanks for your reply, Jatin. I changed my parsing logic to what you suggested:
>>>
>>>     def parseCoverageLine(str: String) = {
>>>       val arr = str.split(",")
>>>       ...
>>>       ...
>>>       (arr(0), arr(1) :: count.toList)  // (test, [file, 1, 1, 2])
>>>     }
>>>
>>> Then in the grouping, can I use a global hash map per executor/partition to aggregate the results?
>>>
>>>     val globalMap: Map[String, List[Int]] = Map()
>>>     val coverageDStream = inputStream.map(parseCoverageLine)
>>>     coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
>>>       // if it exists in the global map, append the result, else add a new key
>>>
>>>       // globalMap
>>>       // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
>>>     })
>>>
>>> Thanks,
>>> Vinti
>>>
>>> On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar <jku...@rocketfuelinc.com> wrote:
>>>
>>>> Hello Vinti,
>>>>
>>>> One way to get this done is to split your input line into a key and value tuple, and then simply use groupByKey and handle the values the way you want. For example:
>>>>
>>>> Assuming you have already split the values into a 5-tuple:
>>>>
>>>>     myDStream.map(record => (record._2, (record._3, record._4, record._5)))
>>>>       .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3))
>>>>
>>>> I hope that helps.
>>>>
>>>> --
>>>> Thanks
>>>> Jatin Kumar | Rocket Scientist
>>>> +91-7696741743 m
>>>>
>>>> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <vinti.u...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have input lines like below:
>>>>>
>>>>> *Input*
>>>>> t1, file1, 1, 1, 1
>>>>> t1, file1, 1, 2, 3
>>>>> t1, file2, 2, 2, 2, 2
>>>>> t2, file1, 5, 5, 5
>>>>> t2, file2, 1, 1, 2, 2
>>>>>
>>>>> and I want to produce output like the rows below, which is a vertical addition of the corresponding numbers:
>>>>>
>>>>> *Output*
>>>>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>>>>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>>>>
>>>>> I am in a Spark Streaming context and I am having a hard time figuring out how to group by file name.
>>>>>
>>>>> It seems like I will need to use something like the line below, but I am not sure how to get the syntax right. Any inputs will be helpful.
>>>>>
>>>>>     myDStream.foreachRDD(rdd => rdd.groupBy())
>>>>>
>>>>> I know how to do the vertical sum of an array of given numbers, but I am not sure how to feed that function to the groupBy:
>>>>>
>>>>>     def compute_counters(counts: ArrayBuffer[List[Int]]) = {
>>>>>       counts.toList.transpose.map(_.sum)
>>>>>     }
>>>>>
>>>>> ~Thanks,
>>>>> Vinti
>>>>>
>>>>
>>>
>>
>

--
Best Regards,
Ayan Guha
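If the per-file totals need to survive across micro-batches (the "global map" discussed in the thread) without a mutable driver-side map, one stateful alternative is Spark Streaming's updateStateByKey. A minimal sketch, assuming checkpointing has been enabled on the StreamingContext and that perFileCounts is the (fileName, List[Int]) DStream from the earlier sketch:

    // Sketch only: running per-file totals across batches with updateStateByKey.
    // Requires ssc.checkpoint(...) to have been set; identifiers are illustrative.
    def addLists(a: List[Int], b: List[Int]): List[Int] =
      a.zipAll(b, 0, 0).map { case (x, y) => x + y }

    val runningTotals = perFileCounts.updateStateByKey[List[Int]] {
      (newCounts: Seq[List[Int]], state: Option[List[Int]]) =>
        Some(newCounts.foldLeft(state.getOrElse(Nil))(addLists))
    }

    runningTotals.print()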