Hi, I'm sorry, the problem was really silly: in the test the number of
partitions was zero (it was computed by dividing the number of partitions
of the source RDD, and in the test that number was just one), and
that's why the test was failing.
Anyway, the behavior still seems odd to me: I would expect repartitioning to
zero partitions to be disallowed, or at least to produce a warning, instead
of silently discarding all the data.
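
For anyone who hits the same thing, here is a minimal sketch of the arithmetic
involved and one possible guard. It is not the code from my project: the class
name PartitionCount, the method scaledPartitions and the divisor of 2 are
hypothetical, and the divisor is assumed to be positive.

```java
public class PartitionCount {

    // Hypothetical helper: derive a target partition count from the number of
    // partitions of the source RDD. Assumes divisor > 0.
    static int scaledPartitions(int sourcePartitions, int divisor) {
        // Integer division truncates toward zero, so 1 / 2 yields 0.
        // Clamping to 1 avoids asking for zero partitions.
        return Math.max(1, sourcePartitions / divisor);
    }

    public static void main(String[] args) {
        int sourcePartitions = 1; // what the test had with local[1]
        int divisor = 2;          // hypothetical scaling factor

        int naive = sourcePartitions / divisor;
        int guarded = scaledPartitions(sourcePartitions, divisor);

        System.out.println("naive = " + naive);     // prints "naive = 0"
        System.out.println("guarded = " + guarded); // prints "guarded = 1"
    }
}
```

The guarded value is what would then be passed as numPartitions to
aggregateByKey; clamping to one partition is just one option, and failing fast
with an exception would work as well.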

Thanks for your time!
Regards,
Pedro

On Thu, Aug 19, 2021 at 07:42, Jacek Laskowski (ja...@japila.pl)
wrote:

> Hi Pedro,
>
> No idea what might be causing it. Do you perhaps have some code to
> reproduce it locally?
>
> Regards,
> Jacek Laskowski
> ----
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books <https://books.japila.pl/>
> Follow me on https://twitter.com/jaceklaskowski
>
>
> On Tue, Aug 17, 2021 at 4:14 PM Pedro Tuero <tuerope...@gmail.com> wrote:
>
>>
>> Context: spark-core_2.12-3.1.1
>> Testing with Maven and Eclipse.
>>
>> I'm modifying a project and a test stops working as expected.
>> The difference is in the parameters passed to the function aggregateByKey
>> of JavaPairRDD.
>>
>> JavaSparkContext is created this way:
>> new JavaSparkContext(new SparkConf()
>> .setMaster("local[1]")
>> .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"));
>> Then I construct a JavaPairRDD using sparkContext.parallelizePairs and
>> call a method which runs aggregateByKey over the input JavaPairRDD, and
>> test that the result is as expected.
>>
>> When I use JavaPairRDD line 369 (doing .aggregateByKey(zeroValue,
>> combiner, merger);)
>>  def aggregateByKey[U](zeroValue: U, seqFunc: JFunction2[U, V, U],
>> combFunc: JFunction2[U, U, U]):
>>       JavaPairRDD[K, U] = {
>>     implicit val ctag: ClassTag[U] = fakeClassTag
>>     fromRDD(rdd.aggregateByKey(zeroValue)(seqFunc, combFunc))
>>   }
>> The test works as expected.
>> But when I use JavaPairRDD line 355 (doing .aggregateByKey(zeroValue,
>> partitions, combiner, merger);)
>> def aggregateByKey[U](zeroValue: U, numPartitions: Int, seqFunc:
>> JFunction2[U, V, U],
>>       combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
>>     implicit val ctag: ClassTag[U] = fakeClassTag
>>     fromRDD(rdd.aggregateByKey(zeroValue, numPartitions)(seqFunc,
>> combFunc))
>>   }
>> The result is always empty. It looks like there is a problem with the
>> HashPartitioner created in PairRDDFunctions:
>>  def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions:
>> Int)(seqOp: (U, V) => U,
>>       combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
>>     aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp,
>> combOp)
>>   }
>> vs:
>>  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
>>       combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
>>     aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
>>   }
>> I can't debug it properly with Eclipse: an error occurs when threads are
>> in Spark code (system editor can only open file-based resources).
>>
>> Does anyone know how to resolve this issue?
>>
>> Thanks in advance,
>> Pedro.
>>
>>
>>
>>
