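Oh duh, sorry. The initialization should of course be

  (v) => (if (v > 0) 1 else 0, 1)

The first value seen for a key goes through this function rather than through the merge function, so it has to apply the same v > 0 test. This gives the answer you are looking for. I don't see what Part2 is supposed to do differently; it maps each pair to itself.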
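To make the roles of the three functions concrete, here is a minimal sketch that models combineByKey with plain Scala collections, so it runs without a SparkContext. The object name, the `partitions` argument, and the `count` helper are all made up for illustration; this is not how Spark implements it, only the same contract as I understand it: the first value of a key in a partition goes to the first function, later values to the second, and per-partition results are merged with the third.

```scala
object CombineByKeySketch {
  // Plain-collections stand-in for RDD.combineByKey. `partitions` is a
  // hypothetical, explicit split of the data used only for illustration.
  def combineByKey[K, V, C](
      partitions: Seq[Seq[(K, V)]],
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Within a partition, the first value of a key goes through
    // createCombiner; later values of that key go through mergeValue.
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        val c = acc.get(k) match {
          case Some(existing) => mergeValue(existing, v)
          case None           => createCombiner(v)
        }
        acc.updated(k, c)
      }
    }
    // Results for the same key from different partitions are merged
    // with mergeCombiners.
    perPartition.foldLeft(Map.empty[K, C]) { (merged, m) =>
      m.foldLeft(merged) { case (acc, (k, c)) =>
        val combined = acc.get(k) match {
          case Some(existing) => mergeCombiners(existing, c)
          case None           => c
        }
        acc.updated(k, combined)
      }
    }
  }

  // The sample data from the thread.
  val data = Seq(("LAX", 6), ("LAX", 0), ("LAX", 7),
                 ("SFO", 0), ("SFO", 0), ("SFO", 9))

  // (count of values > 0, count of all values) per key.
  def count(partitions: Seq[Seq[(String, Int)]]): Map[String, (Int, Int)] =
    combineByKey[String, Int, (Int, Int)](
      partitions,
      v => (if (v > 0) 1 else 0, 1),
      (acc, v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
      (a, b) => (a._1 + b._1, a._2 + b._2))

  def main(args: Array[String]): Unit = {
    // One partition, then an arbitrary two-way split of the same data.
    println(count(Seq(data)))
    println(count(Seq(data.take(2), data.drop(2))))
  }
}
```

Both calls give (2,3) for LAX and (1,3) for SFO. With (v) => (1, 1) as the first function, a key whose first value in a partition is 0, as with SFO here, would be overcounted.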

On Thu, Oct 9, 2014 at 6:14 PM, HARIPRIYA AYYALASOMAYAJULA <[email protected]> wrote:
> Hello Sean,
>
> Thank you, but changing from v to 1 doesn't help me either.
>
> I am trying to count the number of non-zero values using the first
> accumulator.
>
> val newlist = List(("LAX",6), ("LAX",0), ("LAX",7),
>                    ("SFO",0), ("SFO",0), ("SFO",9))
>
> val plist = sc.parallelize(newlist)
>
> val part1 = plist.combineByKey(
>   (v) => (1, 1),
>   (acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
>   (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
> )
>
> val Part2 = part1.map{ case (key, value) => (key, (value._1,value._2)) }
>
> This should give me the result
> (LAX,(2,3))
> (SFO,(1,3))
>
> On Thu, Oct 9, 2014 at 11:48 AM, Sean Owen <[email protected]> wrote:
>>
>> You have a typo in your code at "var acc:", and the map from opPart1
>> to opPart2 looks like a no-op, but those aren't the problem I think.
>> It sounds like you intend the first element of each pair to be a count
>> of nonzero values, but you initialize the first element of the pair to
>> v, not 1, in v => (v,1). Try v => (1,1)
>>
>> On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA
>> <[email protected]> wrote:
>> >
>> > I am a beginner to Spark and finding it difficult to implement a very
>> > simple reduce operation. I read that it is ideal to use combineByKey
>> > for complex reduce operations.
>> >
>> > My input:
>> >
>> > val input = sc.parallelize(List(
>> >   ("LAX",6), ("LAX",8), ("LAX",7), ("SFO",0), ("SFO",1), ("SFO",9),
>> >   ("PHX",65), ("PHX",88), ("KX",7), ("KX",6), ("KX",1), ("KX",9),
>> >   ("HOU",56), ("HOU",5), ("HOU",59), ("HOU",0),
>> >   ("MA",563), ("MA",545), ("MA",5), ("MA",0), ("MA",0)))
>> >
>> > val opPart1 = input.combineByKey(
>> >   (v) => (v, 1),
>> >   (var acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
>> >   (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
>> > )
>> >
>> > val opPart2 = opPart1.map{ case (key, value) => (key, (value._1,value._2)) }
>> >
>> > opPart2.collectAsMap().map(println(_))
>> >
>> > If the value is greater than 0, the first accumulator should be
>> > incremented by 1, else it remains the same. The second accumulator is
>> > a simple counter for each value. I am getting an incorrect output
>> > (garbage values) for the first accumulator. Please help.
>> >
>> > The equivalent reduce operation in Hadoop MapReduce is:
>> >
>> > public static class PercentageCalcReducer extends
>> >     Reducer<Text,IntWritable,Text,FloatWritable>
>> > {
>> >   private FloatWritable pdelay = new FloatWritable();
>> >
>> >   public void reduce(Text key, Iterable<IntWritable> values, Context context)
>> >       throws IOException, InterruptedException
>> >   {
>> >     int acc2=0;
>> >     float frac_delay, percentage_delay;
>> >     int acc1=0;
>> >     for(IntWritable val : values)
>> >     {
>> >       if(val.get() > 0)
>> >       {
>> >         acc1++;
>> >       }
>> >       acc2++;
>> >     }
>> >
>> >     frac_delay = (float)acc1/acc2;
>> >     percentage_delay = frac_delay * 100 ;
>> >     pdelay.set(percentage_delay);
>> >     context.write(key,pdelay);
>> >   }
>> > }
>> >
>> > Please help. Thank you for your time.
>> >
>> > --
>> > Regards,
>> > Haripriya Ayyalasomayajula
>> > contact : 650-796-7112
>
> --
> Regards,
> Haripriya Ayyalasomayajula
> contact : 650-796-7112
