Hello all,

I am observing some strange results with the aggregateByKey API (which is implemented on top of combineByKey). I am not sure whether this is by design or a bug.

I created this toy example, but the same problem can be observed on large datasets as well:

    case class ABC(key: Int, c1: Int, c2: Int)
    case class ABCoutput(key: Int, desc: String, c1Sum: Int, c2Sum: Int)

    // Create the RDD and make sure it has only 2 partitions for this example.
    // With 2 partitions there is a high chance that all values for a given key
    // land in the same partition.
    val a = sc.makeRDD[ABC](List(ABC(1, 10, 20), ABC(1, 10, 20),
                                 ABC(2, 20, 40), ABC(2, 20, 40))).coalesce(2)

Now I run aggregateByKey, grouping by key to sum c1 and c2 while returning an ABCoutput with a new 'desc' property:

    val b = a.keyBy(x => x.key).aggregateByKey(ABCoutput(0, "initial", 0, 0))(
      (x1: ABCoutput, x2: ABC) =>
        ABCoutput(x2.key, "intermediate", x1.c1Sum + x2.c1, x1.c2Sum + x2.c2),
      (m1: ABCoutput, m2: ABCoutput) =>
        ABCoutput(m1.key, "final", m1.c1Sum + m2.c1Sum, m1.c2Sum + m2.c2Sum))

The above query may return results like this:

[image: Inline image 1]

That is, for a key whose values all sit in the same partition, the mergeCombiners function (the one that returns ABCoutput with desc="final") is never invoked. I was expecting mergeCombiners to be invoked every time, which is not happening. Correct me if I am wrong, but is this expected behavior?

Further debugging shows that it works fine if I create the input RDD with more partitions, which increases the chance that rows with the same key end up in different partitions:

    val b = a.repartition(20).keyBy(x => x.key).aggregateByKey(ABCoutput(0, "initial", 0, 0))(
      (x1: ABCoutput, x2: ABC) =>
        ABCoutput(x2.key, "intermediate", x1.c1Sum + x2.c1, x1.c2Sum + x2.c2),
      (m1: ABCoutput, m2: ABCoutput) =>
        ABCoutput(m1.key, "final", m1.c1Sum + m2.c1Sum, m1.c2Sum + m2.c2Sum))

[image: Inline image 2]

One more thing to mention: if I make sure my input RDD is already partitioned, the aggregation simply runs with mapPartitions (here:
https://github.com/apache/spark/blob/v2.0.1/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L93).
This makes sense for the aggregation itself, since all values for a given key are already in the same partition.
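For reference, the dispatch in combineByKeyWithClassTag at that link looks roughly like this (trimmed and paraphrased by me from the linked file, so please check the source for the exact code):

    // Paraphrased from PairRDDFunctions.combineByKeyWithClassTag (Spark 2.0.x).
    // When the RDD already has the target partitioner, Spark skips the shuffle
    // and runs combineValuesByKey per partition, which only ever calls
    // createCombiner and mergeValue -- mergeCombiners is never touched.
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      // Only this shuffle path hands the full aggregator (including
      // mergeCombiners) to the ShuffledRDD.
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }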
However, I have logic in my mergeCombiners function that I need to run, and it won't get invoked. Traditional MapReduce allows the combiner and the reducer to be different functions, and the reducer is guaranteed to be invoked. I can see that running the aggregation with no shuffle has performance gains, but the API seems confusing/misleading: a user might assume that mergeCombiners always gets invoked, when in reality it may not. It would be great if the designers of this API could shed some light on this.

    import org.apache.spark.HashPartitioner

    val b = a.keyBy(x => x.key).partitionBy(new HashPartitioner(20))
      .aggregateByKey(ABCoutput(0, "initial", 0, 0))(
        (x1: ABCoutput, x2: ABC) =>
          ABCoutput(x2.key, "intermediate", x1.c1Sum + x2.c1, x1.c2Sum + x2.c2),
        (m1: ABCoutput, m2: ABCoutput) =>
          ABCoutput(m1.key, "final", m1.c1Sum + m2.c1Sum, m1.c2Sum + m2.c2Sum))

[image: Inline image 3]

The examples above show this behavior with aggregateByKey, but the same thing can be observed with combineByKey as well:

    val b = a.keyBy(x => x.key).combineByKey(
      (x: ABC) => ABCoutput(x.key, "initial", x.c1, x.c2),
      (x1: ABCoutput, x2: ABC) =>
        ABCoutput(x1.key, "intermediate", x1.c1Sum + x2.c1, x1.c2Sum + x2.c2),
      (x1: ABCoutput, x2: ABCoutput) =>
        ABCoutput(x1.key, "final", x1.c1Sum + x2.c1Sum, x1.c2Sum + x2.c2Sum))

[image: Inline image 4]

Please let me know if you need any further information, and correct me if my understanding of the API is wrong.

Thanks,
Swapnil
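PS: One workaround I am considering, in case it helps: keep seqOp/combOp purely about merging partial sums, and do the "final" step in a separate mapValues pass, which runs exactly once per key whether or not a shuffle happens. A minimal sketch (the desc bookkeeping below just stands in for whatever finalization logic is actually needed):

    // seqOp and combOp only merge; no finalization logic in either one.
    val summed = a.keyBy(x => x.key).aggregateByKey(ABCoutput(0, "partial", 0, 0))(
      (acc: ABCoutput, v: ABC) =>
        ABCoutput(v.key, "partial", acc.c1Sum + v.c1, acc.c2Sum + v.c2),
      (m1: ABCoutput, m2: ABCoutput) =>
        ABCoutput(m1.key, "partial", m1.c1Sum + m2.c1Sum, m1.c2Sum + m2.c2Sum))

    // Finalization is guaranteed to run once per key, regardless of partitioning.
    val b = summed.mapValues(out => out.copy(desc = "final"))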