Yes, I tried with reduceByKey and was able to get it working. Thanks Ayan and Jatin.
For reference, here is the final solution:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable.ArrayBuffer

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("HBaseStream")
  val sc = new SparkContext(conf)
  // create a StreamingContext, the main entry point for all streaming functionality
  val ssc = new StreamingContext(sc, Seconds(2))
  val inputStream = ssc.socketTextStream("hostname", 9999)

  val parsedDstream = inputStream
    .map(line => {
      val splitLines = line.split(",")
      // key on the file name; the remaining columns are the counters
      (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toInt))
    })
    .reduceByKey((first, second) => {
      // element-wise (vertical) sum of the two counter arrays
      val listOfArrays = ArrayBuffer(first, second)
      listOfArrays.toList.transpose.map(_.sum).toArray
    })

  parsedDstream.foreachRDD(rdd => rdd.foreach(Blaher.blah))

  ssc.start()
  ssc.awaitTermination()
}
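
Blaher is not included above. A minimal hypothetical stand-in, assuming the reduced pairs are (String, Array[Int]), that simply prints each record could look like this (the real implementation would persist the counters, e.g. to HBase):

object Blaher {
  // Hypothetical placeholder: print each (fileName, counts) pair
  def blah(record: (String, Array[Int])): Unit = {
    val (fileName, counts) = record
    println(s"$fileName -> ${counts.mkString("[", ", ", "]")}")
  }
}
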
Regards,
Vinti
On Sun, Feb 21, 2016 at 2:22 PM, ayan guha <[email protected]> wrote:
> I believe the best way would be to use the reduceByKey operation.
>
> On Mon, Feb 22, 2016 at 5:10 AM, Jatin Kumar <
> [email protected]> wrote:
>
>> You will need to do a collect() and update a global map on the driver if you want to do that.
>>
>> myDStream.map(record => (record._2, (record._3, record._4, record._5)))
>>   .reduceByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3))
>>   .foreachRDD(rdd => {
>>     rdd.collect().foreach { case (fileName, valueTuple) => /* update global map here */ }
>>   })
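>>
>> For example, a rough sketch (assuming pairDStream is a DStream[(String, Array[Int])]
>> keyed by file name, with equal-length count arrays per key):
>>
>> import scala.collection.mutable
>>
>> val globalCounts = mutable.Map[String, Array[Int]]()
>>
>> pairDStream.foreachRDD { rdd =>
>>   rdd.collect().foreach { case (fileName, counts) =>
>>     // foreachRDD and collect() run on the driver, so mutating the map here is safe
>>     globalCounts(fileName) = globalCounts.get(fileName) match {
>>       case Some(prev) => prev.zip(counts).map { case (a, b) => a + b }
>>       case None       => counts
>>     }
>>   }
>> }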
>>
>> --
>> Thanks
>> Jatin Kumar | Rocket Scientist
>> +91-7696741743 m
>>
>> On Sun, Feb 21, 2016 at 11:30 PM, Vinti Maheshwari <[email protected]>
>> wrote:
>>
>>> Never mind, it seems an executor-level mutable map is not recommended,
>>> as explained in
>>> http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/
>>>
>>> On Sun, Feb 21, 2016 at 9:54 AM, Vinti Maheshwari <[email protected]>
>>> wrote:
>>>
>>>> Thanks for your reply Jatin. I changed my parsing logic to what you
>>>> suggested:
>>>>
>>>> def parseCoverageLine(str: String) = {
>>>>   val arr = str.split(",")
>>>>   ...
>>>>   ...
>>>>   (arr(0), arr(1) :: count.toList) // (test, [file, 1, 1, 2])
>>>> }
>>>>
>>>> Then, in the grouping, can I use a global hash map per executor/partition
>>>> to aggregate the results?
>>>>
>>>> val globalMap: Map[String, List[Int]] = Map()
>>>> val coverageDStream = inputStream.map(parseCoverageLine)
>>>> coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
>>>>   // if the key exists in the global map, append the result; else add a new key
>>>>
>>>>   // globalMap
>>>>   // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
>>>> })
>>>>
>>>> Thanks,
>>>> Vinti
>>>>
>>>> On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar <[email protected]>
>>>> wrote:
>>>>
>>>>> Hello Vinti,
>>>>>
>>>>> One way to get this done is to split your input line into a (key, value)
>>>>> tuple and then use reduceByKey to combine the values the way you want.
>>>>> For example:
>>>>>
>>>>> Assuming you have already split the values into a 5-tuple:
>>>>> myDStream.map(record => (record._2, (record._3, record._4, record._5)))
>>>>>   .reduceByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3))
>>>>>
>>>>> I hope that helps.
>>>>>
>>>>> --
>>>>> Thanks
>>>>> Jatin Kumar | Rocket Scientist
>>>>> +91-7696741743 m
>>>>>
>>>>> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have input lines like the ones below.
>>>>>>
>>>>>> *Input*
>>>>>> t1, file1, 1, 1, 1
>>>>>> t1, file1, 1, 2, 3
>>>>>> t1, file2, 2, 2, 2, 2
>>>>>> t2, file1, 5, 5, 5
>>>>>> t2, file2, 1, 1, 2, 2
>>>>>>
>>>>>> and I want to achieve output like the rows below, which is a vertical
>>>>>> (element-wise) addition of the corresponding numbers.
>>>>>>
>>>>>> *Output*
>>>>>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>>>>>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>>>>>
>>>>>> I am in a Spark Streaming context, and I am having a hard time figuring
>>>>>> out how to group by file name.
>>>>>>
>>>>>> It seems like I will need to use something like the line below, but I am
>>>>>> not sure how to get the syntax right. Any inputs will be helpful.
>>>>>>
>>>>>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>>>>>
>>>>>> I know how to do the vertical sum of a given array of numbers, but I am
>>>>>> not sure how to feed that function into the group-by.
>>>>>>
>>>>>> def compute_counters(counts: ArrayBuffer[List[Int]]) = {
>>>>>>   counts.toList.transpose.map(_.sum)
>>>>>> }
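>>>>>>
>>>>>> For example, would something like the following work (assuming
>>>>>> coverageDStream is a DStream[(String, List[Int])] keyed by file name)?
>>>>>>
>>>>>> coverageDStream
>>>>>>   .groupByKey()  // (fileName, Iterable[List[Int]])
>>>>>>   .mapValues(counts => compute_counters(ArrayBuffer(counts.toSeq: _*)))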
>>>>>>
>>>>>> ~Thanks,
>>>>>> Vinti
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>