Oh duh, sorry. The initialization should of course be

  (v) => (if (v > 0) 1 else 0, 1)

This gives the answer you are looking for. I don't see what Part2 is
supposed to do differently.
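
To see why the initialization matters: combineByKey calls the first
function on the first value it meets for a key in each partition, so that
function has to apply the same v > 0 test as the merge function. Below is a
rough plain-Scala sketch of that behaviour. It does not use Spark at all;
combineLocally and the three-way split of the data are hypothetical
stand-ins I made up for the RDD and its partitions, and the percentage step
at the end is only my guess at what Part2 might compute, based on the
Hadoop reducer quoted below.

```scala
// Plain-Scala sketch of combineByKey semantics; no Spark required.
object CombineByKeySketch {
  // (count of values > 0, count of all values)
  type Acc = (Int, Int)

  // Called on the FIRST value seen for a key within a partition,
  // so it must apply the same v > 0 test as mergeValue.
  def createCombiner(v: Int): Acc = (if (v > 0) 1 else 0, 1)

  // Called on every later value for that key within the same partition.
  def mergeValue(acc: Acc, v: Int): Acc =
    (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1)

  // Called to merge per-partition results for the same key.
  def mergeCombiners(a: Acc, b: Acc): Acc = (a._1 + b._1, a._2 + b._2)

  // Hypothetical local stand-in for RDD.combineByKey:
  // each inner Seq plays the role of one partition.
  def combineLocally(partitions: Seq[Seq[(String, Int)]]): Map[String, Acc] = {
    val perPartition = partitions.map { part =>
      part.foldLeft(Map.empty[String, Acc]) { case (m, (k, v)) =>
        m.updated(k, m.get(k).map(mergeValue(_, v)).getOrElse(createCombiner(v)))
      }
    }
    perPartition.foldLeft(Map.empty[String, Acc]) { (merged, m) =>
      m.foldLeft(merged) { case (acc, (k, c)) =>
        acc.updated(k, acc.get(k).map(mergeCombiners(_, c)).getOrElse(c))
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val newlist = List(("LAX", 6), ("LAX", 0), ("LAX", 7),
                       ("SFO", 0), ("SFO", 0), ("SFO", 9))
    // Split into three "partitions" of two records each (an arbitrary choice).
    val result = combineLocally(newlist.grouped(2).toSeq)
    result.toSeq.sortBy(_._1).foreach(println)
    // prints (LAX,(2,3)) then (SFO,(1,3))

    // Assumption: Part2 may be meant to turn the pair into a percentage,
    // as the Hadoop reducer does.
    val percentages = result.map { case (k, (nonZero, total)) =>
      k -> 100.0f * nonZero / total
    }
    percentages.toSeq.sortBy(_._1).foreach(println)
  }
}
```

With (v) => (1, 1) in place of createCombiner, both SFO partitions that
start with a 0 would count that 0 as nonzero, which is where the wrong
first element comes from.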

On Thu, Oct 9, 2014 at 6:14 PM, HARIPRIYA AYYALASOMAYAJULA
<[email protected]> wrote:
> Hello Sean,
>
> Thank you, but changing from v to 1 doesn't help me either.
>
> I am trying to count the number of non-zero values using the first
> accumulator.
> val newlist = List(("LAX",6), ("LAX",0), ("LAX",7),
>   ("SFO",0), ("SFO",0), ("SFO",9))
>
> val plist = sc.parallelize(newlist)
>
>  val part1 = plist.combineByKey(
>    (v) => (1, 1),
>    (acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
>    (acc1: (Int, Int), acc2: (Int, Int)) =>
>      (acc1._1 + acc2._1, acc1._2 + acc2._2)
>    )
>
>    val Part2 = part1.map{ case (key, value) => (key, (value._1,value._2)) }
>
> This should give me the result
> (LAX,(2,3))
> (SFO,(1,3))
>
>
>
> On Thu, Oct 9, 2014 at 11:48 AM, Sean Owen <[email protected]> wrote:
>>
>> You have a typo in your code at "var acc:", and the map from opPart1
>> to opPart2 looks like a no-op, but those aren't the problem I think.
>> It sounds like you intend the first element of each pair to be a count
>> of nonzero values, but you initialize the first element of the pair to
>> v, not 1, in v => (v,1). Try v => (1,1)
>>
>>
>> On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA
>> <[email protected]> wrote:
>> >
>> > I am a beginner to Spark and am finding it difficult to implement a
>> > very simple reduce operation. I read that it is ideal to use
>> > combineByKey for complex reduce operations.
>> >
>> > My input:
>> >
>> > val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7),
>> >   ("SFO",0), ("SFO",1), ("SFO",9), ("PHX",65), ("PHX",88),
>> >   ("KX",7), ("KX",6), ("KX",1), ("KX",9),
>> >   ("HOU",56), ("HOU",5), ("HOU",59), ("HOU",0),
>> >   ("MA",563), ("MA",545), ("MA",5), ("MA",0), ("MA",0)))
>> >
>> >
>> >  val opPart1 = input.combineByKey(
>> >    (v) => (v, 1),
>> >    (var acc: (Int, Int), v) =>
>> >      ( if(v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
>> >    (acc1: (Int, Int), acc2: (Int, Int)) =>
>> >      (acc1._1 + acc2._1, acc1._2 + acc2._2)
>> >    )
>> >
>> >    val opPart2 = opPart1.map{ case (key, value) =>
>> >      (key, (value._1,value._2)) }
>> >
>> >  opPart2.collectAsMap().map(println(_))
>> >
>> > If the value is greater than 0, the first accumulator should be
>> > incremented by 1; otherwise it stays the same. The second accumulator
>> > is a simple counter of all values. I am getting incorrect output
>> > (garbage values) for the first accumulator. Please help.
>> >
>> > The equivalent reduce operation in Hadoop MapReduce is:
>> >
>> > public static class PercentageCalcReducer
>> >     extends Reducer<Text,IntWritable,Text,FloatWritable>
>> > {
>> >   private FloatWritable pdelay = new FloatWritable();
>> >
>> >   public void reduce(Text key, Iterable<IntWritable> values,
>> >       Context context) throws IOException, InterruptedException
>> >   {
>> >     int acc2 = 0;
>> >     float frac_delay, percentage_delay;
>> >     int acc1 = 0;
>> >     for (IntWritable val : values)
>> >     {
>> >       if (val.get() > 0)
>> >       {
>> >         acc1++;
>> >       }
>> >       acc2++;
>> >     }
>> >
>> >     frac_delay = (float) acc1 / acc2;
>> >     percentage_delay = frac_delay * 100;
>> >     pdelay.set(percentage_delay);
>> >     context.write(key, pdelay);
>> >   }
>> > }
>> >
>> >
>> > Please help. Thank you for your time.
>> >
>> > --
>> >
>> > Regards,
>> >
>> > Haripriya Ayyalasomayajula
>> > contact : 650-796-7112
>
>
>
>
> --
> Regards,
> Haripriya Ayyalasomayajula
> contact : 650-796-7112
