I believe the best way would be to use reduceByKey operation.

On Mon, Feb 22, 2016 at 5:10 AM, Jatin Kumar <
jku...@rocketfuelinc.com.invalid> wrote:

> You will need to do a collect and update a global map if you want to.
>
> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>          .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 +
> r2._3))
>          .foreachRDD(rdd => {
>            rdd.collect().foreach((fileName, valueTuple) => <update global
> map here>)
>          })
>
> --
> Thanks
> Jatin Kumar | Rocket Scientist
> +91-7696741743 m
>
> On Sun, Feb 21, 2016 at 11:30 PM, Vinti Maheshwari <vinti.u...@gmail.com>
> wrote:
>
>> Nevermind, seems like an executor level mutable map is not recommended as
>> stated in
>> http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/
>>
>> On Sun, Feb 21, 2016 at 9:54 AM, Vinti Maheshwari <vinti.u...@gmail.com>
>> wrote:
>>
>>> Thanks for your reply Jatin. I changed my parsing logic to what you
>>> suggested:
>>>
>>>     def parseCoverageLine(str: String) = {
>>>       val arr = str.split(",")
>>>       ...
>>>       ...
>>>       (arr(0), arr(1) :: count.toList)  // (test, [file, 1, 1, 2])
>>>     }
>>>
>>> Then in the grouping, can i use a global hash map per executor /
>>> partition to aggregate the results?
>>>
>>> val globalMap:[String: List[Int]] = Map()
>>> val coverageDStream = inputStream.map(parseCoverageLine)
>>>     coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
>>>     // if exists in global map, append result else add new key
>>>
>>>     // globalMap
>>>     // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
>>> })
>>>
>>> Thanks,
>>> Vinti
>>>
>>> On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar <jku...@rocketfuelinc.com>
>>> wrote:
>>>
>>>> Hello Vinti,
>>>>
>>>> One way to get this done is you split your input line into key and
>>>> value tuple and then you can simply use groupByKey and handle the values
>>>> the way you want. For example:
>>>>
>>>> Assuming you have already split the values into a 5 tuple:
>>>> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>>>>          .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 +
>>>> r2._3))
>>>>
>>>> I hope that helps.
>>>>
>>>> --
>>>> Thanks
>>>> Jatin Kumar | Rocket Scientist
>>>> +91-7696741743 m
>>>>
>>>> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <
>>>> vinti.u...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have input lines like below
>>>>>
>>>>> *Input*
>>>>> t1, file1, 1, 1, 1
>>>>> t1, file1, 1, 2, 3
>>>>> t1, file2, 2, 2, 2, 2
>>>>> t2, file1, 5, 5, 5
>>>>> t2, file2, 1, 1, 2, 2
>>>>>
>>>>> and i want to achieve the output like below rows which is a vertical
>>>>> addition of the corresponding numbers.
>>>>>
>>>>> *Output*
>>>>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>>>>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>>>>
>>>>> I am in a spark streaming context and i am having a hard time trying
>>>>> to figure out the way to group by file name.
>>>>>
>>>>> It seems like i will need to use something like below, i am not sure
>>>>> how to get to the correct syntax. Any inputs will be helpful.
>>>>>
>>>>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>>>>
>>>>> I know how to do the vertical sum of array of given numbers, but i am
>>>>> not sure how to feed that function to the group by.
>>>>>
>>>>>   def compute_counters(counts : ArrayBuffer[List[Int]]) = {
>>>>>       counts.toList.transpose.map(_.sum)
>>>>>   }
>>>>>
>>>>> ~Thanks,
>>>>> Vinti
>>>>>
>>>>
>>>>
>>>
>>
>


-- 
Best Regards,
Ayan Guha

Reply via email to