You will need to do a collect and update a global map if you want to.

myDStream.map(record => (record._2, (record._3, record._4, record._5)))
         .reduceByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3))
         .foreachRDD(rdd => {
           rdd.collect().foreach { case (fileName, valueTuple) => <update global map here> }
         })
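For completeness, a minimal sketch of the collect-and-update approach above. It assumes the stream has already been parsed into (test, file, c1, c2, c3) tuples with Int counts; the names parsedStream and globalCounts are placeholders, not part of the original code.

import scala.collection.mutable

// Driver-side map: fileName -> running per-position sums. Updating it inside
// foreachRDD is only safe here because collect() brings the data back to the driver.
val globalCounts = mutable.Map[String, List[Int]]()

parsedStream
  .map(record => (record._2, List(record._3, record._4, record._5)))
  .reduceByKey((a, b) => a.zip(b).map { case (x, y) => x + y })
  .foreachRDD { rdd =>
    rdd.collect().foreach { case (fileName, sums) =>
      globalCounts(fileName) = globalCounts.get(fileName) match {
        case Some(prev) => prev.zip(sums).map { case (x, y) => x + y }
        case None       => sums
      }
    }
  }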
--
Thanks
Jatin Kumar | Rocket Scientist
+91-7696741743 m

On Sun, Feb 21, 2016 at 11:30 PM, Vinti Maheshwari <vinti.u...@gmail.com> wrote:

> Never mind, it seems like an executor-level mutable map is not recommended,
> as stated in
> http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/
>
> On Sun, Feb 21, 2016 at 9:54 AM, Vinti Maheshwari <vinti.u...@gmail.com> wrote:
>
>> Thanks for your reply Jatin. I changed my parsing logic to what you
>> suggested:
>>
>> def parseCoverageLine(str: String) = {
>>   val arr = str.split(",")
>>   ...
>>   ...
>>   (arr(0), arr(1) :: count.toList)  // (test, [file, 1, 1, 2])
>> }
>>
>> Then in the grouping, can I use a global hash map per executor /
>> partition to aggregate the results?
>>
>> val globalMap = scala.collection.mutable.Map[String, List[Int]]()
>> val coverageDStream = inputStream.map(parseCoverageLine)
>> coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
>>   // if the key exists in the global map, append the result, else add a new key
>>
>>   // globalMap
>>   // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
>> })
>>
>> Thanks,
>> Vinti
>>
>> On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar <jku...@rocketfuelinc.com> wrote:
>>
>>> Hello Vinti,
>>>
>>> One way to get this done is to split your input line into a (key, value)
>>> tuple and then use reduceByKey to combine the values the way you want.
>>> For example:
>>>
>>> Assuming you have already split the values into a 5-tuple:
>>> myDStream.map(record => (record._2, (record._3, record._4, record._5)))
>>>          .reduceByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3))
>>>
>>> I hope that helps.
>>>
>>> --
>>> Thanks
>>> Jatin Kumar | Rocket Scientist
>>> +91-7696741743 m
>>>
>>> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <vinti.u...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have input lines like below:
>>>>
>>>> *Input*
>>>> t1, file1, 1, 1, 1
>>>> t1, file1, 1, 2, 3
>>>> t1, file2, 2, 2, 2, 2
>>>> t2, file1, 5, 5, 5
>>>> t2, file2, 1, 1, 2, 2
>>>>
>>>> and I want to achieve output like the rows below, which is a vertical
>>>> addition of the corresponding numbers.
>>>>
>>>> *Output*
>>>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>>>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>>>
>>>> I am in a Spark Streaming context and I am having a hard time trying to
>>>> figure out the way to group by file name.
>>>>
>>>> It seems like I will need to use something like the snippet below, but I
>>>> am not sure how to get to the correct syntax. Any inputs will be helpful.
>>>>
>>>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>>>
>>>> I know how to do the vertical sum of an array of given numbers, but I am
>>>> not sure how to feed that function to the group by.
>>>>
>>>> def compute_counters(counts: ArrayBuffer[List[Int]]) = {
>>>>   counts.toList.transpose.map(_.sum)
>>>> }
>>>>
>>>> ~Thanks,
>>>> Vinti
>>>>
>>>
>>>
>>
>
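For reference, a minimal sketch of the per-batch grouping discussed in the thread, assuming comma-separated lines like the example input and keying by file name; inputStream and the helper names below are placeholders, not from the original messages. Note it produces totals per batch only; carrying sums across batches would need the driver-side map above or Spark's stateful APIs.

import org.apache.spark.streaming.dstream.DStream

// Parse "t1, file1, 1, 1, 1" into ("file1", List(1, 1, 1)); rows may carry a
// variable number of counts, as in the file2 example.
def parseToFileCounts(line: String): (String, List[Int]) = {
  val arr = line.split(",").map(_.trim)
  (arr(1), arr.drop(2).map(_.toInt).toList)
}

// Vertical addition of two count lists for the same file, same idea as compute_counters.
def verticalSum(a: List[Int], b: List[Int]): List[Int] =
  List(a, b).transpose.map(_.sum)

def perBatchTotals(inputStream: DStream[String]): DStream[(String, List[Int])] =
  inputStream.map(parseToFileCounts).reduceByKey(verticalSum)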