I believe the best way would be to use the reduceByKey operation.
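
For example, roughly like this (just a sketch: it assumes each line parses into a
(fileName, List[Int]) pair and that all count lists for a given file have the
same length):

  // parse "t1, file1, 1, 1, 1" into ("file1", List(1, 1, 1))
  val parsed = myDStream.map { line =>
    val arr = line.split(",").map(_.trim)
    (arr(1), arr.drop(2).map(_.toInt).toList)
  }

  // element-wise ("vertical") sum of the count lists, per file name
  val summed = parsed.reduceByKey((a, b) => a.zip(b).map { case (x, y) => x + y })
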
On Mon, Feb 22, 2016 at 5:10 AM, Jatin Kumar <
[email protected]> wrote:
> You will need to do a collect and update a global map if you want to.
>
> myDStream.map(record => (record._2, (record._3, record._4, record._5)))
>   .reduceByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3))
>   .foreachRDD(rdd => {
>     rdd.collect().foreach { case (fileName, valueTuple) => <update global map here> }
>   })
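>
> For illustration only, the driver-side update could look roughly like this
> (globalMap and summed are assumed names; the per-file totals are kept as a
> 3-tuple to match the snippet above):
>
>   import scala.collection.mutable
>
>   // lives on the driver and is only touched inside foreachRDD, after collect()
>   val globalMap = mutable.Map[String, (Int, Int, Int)]()
>
>   summed.foreachRDD { rdd =>
>     rdd.collect().foreach { case (fileName, counts) =>
>       val prev = globalMap.getOrElse(fileName, (0, 0, 0))
>       globalMap(fileName) = (prev._1 + counts._1, prev._2 + counts._2, prev._3 + counts._3)
>     }
>   }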
>
> --
> Thanks
> Jatin Kumar | Rocket Scientist
> +91-7696741743 m
>
> On Sun, Feb 21, 2016 at 11:30 PM, Vinti Maheshwari <[email protected]>
> wrote:
>
>> Nevermind, seems like an executor level mutable map is not recommended as
>> stated in
>> http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/
>>
>> On Sun, Feb 21, 2016 at 9:54 AM, Vinti Maheshwari <[email protected]>
>> wrote:
>>
>>> Thanks for your reply Jatin. I changed my parsing logic to what you
>>> suggested:
>>>
>>> def parseCoverageLine(str: String) = {
>>>   val arr = str.split(",")
>>>   ...
>>>   ...
>>>   (arr(0), arr(1) :: count.toList) // (test, [file, 1, 1, 2])
>>> }
>>>
>>> Then in the grouping, can I use a global hash map per executor /
>>> partition to aggregate the results?
>>>
>>> val globalMap: Map[String, List[Int]] = Map()
>>> val coverageDStream = inputStream.map(parseCoverageLine)
>>> coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
>>>   // if the key exists in the global map, append the result; else add a new key
>>>
>>>   // globalMap
>>>   // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
>>> })
>>>
>>> Thanks,
>>> Vinti
>>>
>>> On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar <[email protected]>
>>> wrote:
>>>
>>>> Hello Vinti,
>>>>
>>>> One way to get this done is to split each input line into a key and
>>>> value tuple and then use reduceByKey (or groupByKey) to combine the
>>>> values the way you want. For example:
>>>>
>>>> Assuming you have already split each line into a 5-tuple:
>>>>
>>>>   myDStream.map(record => (record._2, (record._3, record._4, record._5)))
>>>>     .reduceByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3))
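>>>>
>>>> If the number of counters per file differs (as with file2 in your example),
>>>> keeping the counts as a List instead of a fixed tuple may be easier. A rough
>>>> sketch, where parsedDStream is an assumed name for a DStream of
>>>> (fileName, List[Int]) pairs and counts within one file all have the same length:
>>>>
>>>>   parsedDStream
>>>>     .groupByKey()
>>>>     .mapValues(lists => lists.toList.transpose.map(_.sum))  // vertical sum per file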
>>>>
>>>> I hope that helps.
>>>>
>>>> --
>>>> Thanks
>>>> Jatin Kumar | Rocket Scientist
>>>> +91-7696741743 m
>>>>
>>>> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <
>>>> [email protected]> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have input lines like below
>>>>>
>>>>> *Input*
>>>>> t1, file1, 1, 1, 1
>>>>> t1, file1, 1, 2, 3
>>>>> t1, file2, 2, 2, 2, 2
>>>>> t2, file1, 5, 5, 5
>>>>> t2, file2, 1, 1, 2, 2
>>>>>
>>>>> and I want to produce output like the rows below, which is a vertical
>>>>> addition of the corresponding numbers.
>>>>>
>>>>> *Output*
>>>>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>>>>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>>>>
>>>>> I am in a Spark Streaming context and I am having a hard time figuring
>>>>> out how to group by file name.
>>>>>
>>>>> It seems like I will need to use something like the snippet below, but I
>>>>> am not sure how to get the syntax right. Any input will be helpful.
>>>>>
>>>>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>>>>
>>>>> I know how to do the vertical sum of an array of numbers, but I am
>>>>> not sure how to feed that function into the group by.
>>>>>
>>>>> def compute_counters(counts: ArrayBuffer[List[Int]]) = {
>>>>>   counts.toList.transpose.map(_.sum)
>>>>> }
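>>>>>
>>>>> Conceptually, something like this is what I am after (a rough sketch; it
>>>>> assumes each RDD element is already a (fileName, List[Int]) pair and that
>>>>> ArrayBuffer is in scope):
>>>>>
>>>>>   myDStream.foreachRDD { rdd =>
>>>>>     val summed = rdd.groupBy(_._1)   // group by file name
>>>>>       .mapValues(rows => compute_counters(ArrayBuffer(rows.map(_._2).toSeq: _*)))
>>>>>     summed.collect().foreach(println)
>>>>>   }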
>>>>>
>>>>> ~Thanks,
>>>>> Vinti
>>>>>
>>>>
>>>>
>>>
>>
>
--
Best Regards,
Ayan Guha