Context: spark-core_2.12-3.1.1
Testing with Maven and Eclipse.

I'm modifying a project and a test has stopped working as expected.
The only difference is in the parameters passed to the aggregateByKey
function of JavaPairRDD.

JavaSparkContext is created this way:
new JavaSparkContext(new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"));
Then I construct a JavaPairRDD using sparkContext.parallelizePairs and call
a method which runs aggregateByKey over the input JavaPairRDD, and I check
that the result is as expected.

When I use JavaPairRDD line 369 (doing .aggregateByKey(zeroValue, combiner,
merger);)
  def aggregateByKey[U](zeroValue: U, seqFunc: JFunction2[U, V, U],
      combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
    implicit val ctag: ClassTag[U] = fakeClassTag
    fromRDD(rdd.aggregateByKey(zeroValue)(seqFunc, combFunc))
  }
The test works as expected.
But when I use JavaPairRDD line 355 (doing .aggregateByKey(zeroValue,
partitions, combiner, merger);)
  def aggregateByKey[U](zeroValue: U, numPartitions: Int,
      seqFunc: JFunction2[U, V, U],
      combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
    implicit val ctag: ClassTag[U] = fakeClassTag
    fromRDD(rdd.aggregateByKey(zeroValue, numPartitions)(seqFunc, combFunc))
  }
The result is always empty. It looks like there is a problem with the
HashPartitioner created in PairRDDFunctions:
  def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(
      seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp)
  }

  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
  }
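
For reference, here is a minimal plain-Java sketch of how I understand a hash partitioner to assign a key to a partition: the key's hashCode() taken modulo numPartitions, adjusted to be non-negative, with null keys going to partition 0. As far as I understand, defaultPartitioner also falls back to a HashPartitioner when the RDD has no partitioner of its own, so the two overloads would differ only in the partition count. The class and method names below (HashPartitionSketch, partitionFor, nonNegativeMod) are hypothetical and are not Spark's API; this is an illustration of the idea, not Spark's actual code.

```java
// Illustrative sketch only: hypothetical names, no Spark dependency.
final class HashPartitionSketch {

    // Java's % operator can return a negative value for a negative left
    // operand, so shift negative remainders back into [0, mod).
    static int nonNegativeMod(int x, int mod) {
        int raw = x % mod;
        return raw < 0 ? raw + mod : raw;
    }

    // Assumed rule: null keys go to partition 0, every other key goes to
    // hashCode() modulo numPartitions (made non-negative).
    static int partitionFor(Object key, int numPartitions) {
        if (key == null) {
            return 0;
        }
        return nonNegativeMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        Object[] keys = {"a", "b", 5, -7, null};
        for (Object key : keys) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

Under this sketch, changing numPartitions only changes the modulus, so every key still lands in exactly one partition in the range 0 to numPartitions - 1.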
I can't debug it properly with Eclipse: an error occurs when threads are
in Spark code ("system editor can only open file base resources").

Does anyone know how to resolve this issue?

Thanks in advance,
