You have a syntax error in your code at "var acc:" (a function parameter
can't be declared var), and the map from opPart1 to opPart2 is a no-op,
but I don't think those are the root cause. It sounds like you intend the
first element of each pair to be a count of the values greater than zero,
but your createCombiner, v => (v, 1), initializes that element to v
itself. Since your input contains zeros, seed it conditionally:
v => (if (v > 0) 1 else 0, 1)
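
For reference, a minimal sketch of the corrected pipeline (the name
percentages below is just illustrative; the last step mirrors the
percentage computation in your Hadoop reducer):

val opPart1 = input.combineByKey(
  // createCombiner: runs on the first value seen for a key in a partition
  (v: Int) => (if (v > 0) 1 else 0, 1),
  // mergeValue: bump the nonzero counter only when v > 0, always bump the total
  (acc: (Int, Int), v: Int) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
  // mergeCombiners: add the per-partition counters
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

// Same final step as your Hadoop reducer: percent of values > 0 per key
val percentages = opPart1.mapValues { case (nonzero, total) => 100f * nonzero / total }

With your sample input, opPart1.collectAsMap() should give
LAX -> (3,3), SFO -> (2,3), PHX -> (2,2), KX -> (4,4), HOU -> (3,4),
MA -> (3,5).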


On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA
<aharipriy...@gmail.com> wrote:
>
> I am a beginner to Spark and finding it difficult to implement a very simple
> reduce operation. I read that it is ideal to use combineByKey for complex
> reduce operations.
>
> My input:
>
> val input = sc.parallelize(List(
>   ("LAX",6), ("LAX",8), ("LAX",7),
>   ("SFO",0), ("SFO",1), ("SFO",9),
>   ("PHX",65), ("PHX",88),
>   ("KX",7), ("KX",6), ("KX",1), ("KX",9),
>   ("HOU",56), ("HOU",5), ("HOU",59), ("HOU",0),
>   ("MA",563), ("MA",545), ("MA",5), ("MA",0), ("MA",0)))
>
>
>  val opPart1 = input.combineByKey(
>    (v) => (v, 1),
>    (var acc: (Int, Int), v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
>    (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
>    )
>
>  val opPart2 = opPart1.map{ case (key, value) => (key, (value._1, value._2)) }
>
>  opPart2.collectAsMap().map(println(_))
>
> If a value is greater than 0, the first accumulator should be incremented
> by 1; otherwise it stays the same. The second accumulator simply counts
> every value. I am getting incorrect output (garbage values) for the
> first accumulator. Please help.
>
>  The equivalent reduce operation in Hadoop MapReduce is:
>
> public static class PercentageCalcReducer
>         extends Reducer<Text, IntWritable, Text, FloatWritable> {
>
>     private FloatWritable pdelay = new FloatWritable();
>
>     public void reduce(Text key, Iterable<IntWritable> values, Context context)
>             throws IOException, InterruptedException {
>         int acc1 = 0;  // counts values greater than zero
>         int acc2 = 0;  // counts all values
>         float frac_delay, percentage_delay;
>
>         for (IntWritable val : values) {
>             if (val.get() > 0) {
>                 acc1++;
>             }
>             acc2++;
>         }
>
>         frac_delay = (float) acc1 / acc2;
>         percentage_delay = frac_delay * 100;
>         pdelay.set(percentage_delay);
>         context.write(key, pdelay);
>     }
> }
>
>
> Please help. Thank you for your time.
>
> --
>
> Regards,
>
> Haripriya Ayyalasomayajula
> contact : 650-796-7112

