If you want a single global map, you will need to collect each batch's results on the driver and update the map there.

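myDStream.map(record => (record._2, (record._3, record._4, record._5)))
         // reduceByKey (not groupByKey) takes the function that merges two values
         .reduceByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3))
         .foreachRDD(rdd => {
           // collect() brings this batch's reduced results back to the driver
           rdd.collect().foreach { case (fileName, valueTuple) =>
             // <update global map here>
           }
         })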
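
Note that the rows in the example input do not all have the same length (file2 has four counts), so a fixed-size tuple may not fit. Below is a minimal sketch of the element-wise sum on plain Scala collections, with no Spark involved. The names VerticalSum, addCounts, parseLine and sumByFile are made up for illustration, and padding the shorter list with 0 is an assumption about how rows of different lengths should be combined.

```scala
object VerticalSum {
  // Element-wise sum of two count lists. zipAll pads the shorter list with 0
  // (assumption: missing positions count as zero).
  def addCounts(a: List[Int], b: List[Int]): List[Int] =
    a.zipAll(b, 0, 0).map { case (x, y) => x + y }

  // Hypothetical parser: "t1, file1, 1, 1, 1" -> ("file1", List(1, 1, 1)).
  // The test name in the first field is dropped.
  def parseLine(line: String): (String, List[Int]) = {
    val fields = line.split(",").map(_.trim).toList
    (fields(1), fields.drop(2).map(_.toInt))
  }

  // Local stand-in for a per-key reduce: group rows by file name, then fold
  // each group's count lists together with addCounts.
  def sumByFile(lines: Seq[String]): Map[String, List[Int]] =
    lines.map(parseLine)
      .groupBy(_._1)
      .map { case (file, rows) => file -> rows.map(_._2).reduce(addCounts) }

  def main(args: Array[String]): Unit = {
    val input = Seq(
      "t1, file1, 1, 1, 1",
      "t1, file1, 1, 2, 3",
      "t1, file2, 2, 2, 2, 2",
      "t2, file1, 5, 5, 5",
      "t2, file2, 1, 1, 2, 2"
    )
    // Prints:
    //   file1: List(7, 8, 9)
    //   file2: List(3, 3, 4, 4)
    sumByFile(input).toSeq.sortBy(_._1).foreach { case (file, counts) =>
      println(s"$file: $counts")
    }
  }
}
```

Since addCounts is associative and commutative, the same function should also be usable as the argument to reduceByKey on a DStream of (fileName, counts) pairs, though I have not run it in a streaming job.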

--
Thanks
Jatin Kumar | Rocket Scientist
+91-7696741743 m

On Sun, Feb 21, 2016 at 11:30 PM, Vinti Maheshwari <vinti.u...@gmail.com>
wrote:

> Never mind, it seems an executor-level mutable map is not recommended, as
> stated in
> http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/
>
> On Sun, Feb 21, 2016 at 9:54 AM, Vinti Maheshwari <vinti.u...@gmail.com>
> wrote:
>
>> Thanks for your reply Jatin. I changed my parsing logic to what you
>> suggested:
>>
>>     def parseCoverageLine(str: String) = {
>>       val arr = str.split(",")
>>       ...
>>       ...
>>       (arr(0), arr(1) :: count.toList)  // (test, [file, 1, 1, 2])
>>     }
>>
>> Then, in the grouping, can I use a global hash map per executor /
>> partition to aggregate the results?
>>
>> val globalMap: Map[String, List[Int]] = Map()
>> val coverageDStream = inputStream.map(parseCoverageLine)
>>     coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
>>     // if exists in global map, append result else add new key
>>
>>     // globalMap
>>     // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
>> })
>>
>> Thanks,
>> Vinti
>>
>> On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar <jku...@rocketfuelinc.com>
>> wrote:
>>
>>> Hello Vinti,
>>>
>>> One way to do this is to split each input line into a (key, value) tuple
>>> and then use reduceByKey with a function that combines two values the way
>>> you want. For example:
>>>
>>> Assuming you have already split the values into a 5-tuple:
>>> myDStream.map(record => (record._2, (record._3, record._4, record._5)))
>>>          .reduceByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3))
>>>
>>> I hope that helps.
>>>
>>> --
>>> Thanks
>>> Jatin Kumar | Rocket Scientist
>>> +91-7696741743 m
>>>
>>> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <vinti.u...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have input lines like below
>>>>
>>>> *Input*
>>>> t1, file1, 1, 1, 1
>>>> t1, file1, 1, 2, 3
>>>> t1, file2, 2, 2, 2, 2
>>>> t2, file1, 5, 5, 5
>>>> t2, file2, 1, 1, 2, 2
>>>>
>>>> and I want to produce output like the rows below, which are the vertical
>>>> (element-wise) sums of the corresponding numbers.
>>>>
>>>> *Output*
>>>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>>>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>>>
>>>> I am in a Spark Streaming context and I am having a hard time figuring
>>>> out how to group by file name.
>>>>
>>>> It seems I will need to use something like the line below, but I am not
>>>> sure how to get the syntax right. Any input would be helpful.
>>>>
>>>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>>>
>>>> I know how to do the vertical sum of a given array of numbers, but I am
>>>> not sure how to feed that function to the group-by.
>>>>
>>>>   def compute_counters(counts : ArrayBuffer[List[Int]]) = {
>>>>       counts.toList.transpose.map(_.sum)
>>>>   }
>>>>
>>>> ~Thanks,
>>>> Vinti
>>>>
>>>
>>>
>>
>
