I have a fairly simple Scala Spark aggregation job that sums up the number of occurrences of two types of events. I have run into situations where it generates values that, after reviewing the raw data, are clearly incorrect.
First, I have a Record class that I use to do my aggregation:

  class Record(val PrimaryId: Int,
               val SubId: Int,
               var Event1Count: Int,
               var Event2Count: Int) extends Serializable

Then, once I have an RDD, I do a reduceByKey:

  val allAgr = all.map(x => (s"${x.PrimaryId}-${x.SubId}", x))
    .reduceByKey { (l, r) =>
      l.Event1Count = l.Event1Count + r.Event1Count
      l.Event2Count = l.Event2Count + r.Event2Count
      l
    }
    .map(x => x._2)

The problem is that for some scenarios I get back about 16 billion for Event1Count, while the value of Event2Count looks fine. If I refactor the reduce function to produce a new object instead, it works:

  val allAgr = all.map(x => (s"${x.PrimaryId}-${x.SubId}", x))
    .reduceByKey { (l, r) =>
      val n = new Record(l.PrimaryId, l.SubId, 0, 0)
      n.Event1Count = l.Event1Count + r.Event1Count
      n.Event2Count = l.Event2Count + r.Event2Count
      n
    }
    .map(x => x._2)

This second option is clearly the safer way to go, since there is no chance of changing values through a shared reference. However, it doesn't make sense to me why this should fix the problem: in map-reduce, once an object has been reduced, it should never be reduced again (otherwise double-counting would happen).

I dug into the source a little:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Aggregator.scala

I didn't see any obvious red flags, though admittedly much of it is beyond my comprehension.

Any ideas?
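For reference, here is a minimal sketch of a fully immutable variant I'm considering as a workaround. This is just a sketch using plain tuples rather than my production code; it assumes the same `all` RDD and Record class as above:

  // Sketch only: aggregate with immutable tuple values so the reduce
  // function never mutates either of its inputs. The key is the same
  // composite string as above; the value carries the ids plus both counts.
  val allAgrImmutable = all
    .map(x => (s"${x.PrimaryId}-${x.SubId}",
               (x.PrimaryId, x.SubId, x.Event1Count, x.Event2Count)))
    .reduceByKey { case ((pid, sid, e1a, e2a), (_, _, e1b, e2b)) =>
      // Build a brand-new tuple; nothing from l or r is modified.
      (pid, sid, e1a + e1b, e2a + e2b)
    }
    .map { case (_, (pid, sid, e1, e2)) => new Record(pid, sid, e1, e2) }

If the problem really is caused by reusing the left-hand object, I'd expect this version to behave like my second snippet, but I'd still like to understand why the mutating version misbehaves in the first place.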