I have a fairly simple Scala Spark aggregation job that sums up the number
of occurrences of two types of events. I have run into situations where it
generates values that are clearly incorrect once I review the raw data.

First, I have a Record class that I use for the aggregation:

class Record(val PrimaryId: Int,
             val SubId: Int,
             var Event1Count: Int,
             var Event2Count: Int) extends Serializable

Then, once I have the RDD, I do a reduceByKey:

    val allAgr = all
      .map(x => (s"${x.PrimaryId}-${x.SubId}", x))
      .reduceByKey { (l, r) =>
        l.Event1Count = l.Event1Count + r.Event1Count
        l.Event2Count = l.Event2Count + r.Event2Count
        l
      }
      .map(_._2)

The problem is that for some scenarios I get back about 16 billion for
Event1Count, while the value of Event2Count looks fine. If I refactor my
reduceByKey function to produce a new object instead, it works:

    val allAgr = all
      .map(x => (s"${x.PrimaryId}-${x.SubId}", x))
      .reduceByKey { (l, r) =>
        val n = new Record(l.PrimaryId, l.SubId, 0, 0)
        n.Event1Count = l.Event1Count + r.Event1Count
        n.Event2Count = l.Event2Count + r.Event2Count
        n
      }
      .map(_._2)


This second option is clearly the safer way to go, since there is no chance
of changing values via reference. However, it doesn't make sense to me that
this should fix anything: in map-reduce, once an object has been reduced, it
should never be reduced again (otherwise double-counting would happen).
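
Taking the new-object approach to its logical end, an immutable case class
would rule out mutation entirely. A sketch (not what I'm running today;
`combine` is just a name I made up):

    // Immutable replacement for the Record class above. Case classes are
    // serializable by default, and copy() returns a fresh object each time.
    case class Record(PrimaryId: Int,
                      SubId: Int,
                      Event1Count: Int,
                      Event2Count: Int) {
      def combine(other: Record): Record =
        copy(Event1Count = Event1Count + other.Event1Count,
             Event2Count = Event2Count + other.Event2Count)
    }

    val allAgr = all
      .map(x => (s"${x.PrimaryId}-${x.SubId}", x))
      .reduceByKey((l, r) => l.combine(r))
      .map(_._2)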

I dug into the source a little:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Aggregator.scala


I didn't really see any obvious red flags, though admittedly the code is
beyond my comprehension.
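
One thing I did notice: about 16 billion is suspiciously close to a power of
two (2^34 is roughly 17 billion), which looks more like repeated doubling
than over-counting by a few. Doubling is exactly what the in-place version
produces if the reduce function is ever handed the same object for both
arguments. That is pure speculation on my part, but here is a contrived
local illustration of the hazard:

    // If l and r ever alias the same object, the in-place update silently
    // doubles the count instead of summing two independent records.
    val rec = new Record(1, 1, 10, 10)
    val (l, r) = (rec, rec) // aliased arguments
    l.Event1Count = l.Event1Count + r.Event1Count
    println(l.Event1Count) // prints 20, from only 10 real occurrences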

Any ideas?



