Same here: repartition(0) throws IllegalArgumentException (what I would have
expected), but aggregateByKey(zeroValue, 0, seqFunc, combFunc) throws no
exception and logs no error message. The only consequence is an empty RDD.
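
In case it helps, here is a minimal, self-contained sketch of what I'm
seeing (class name and data are made up, not my actual test):

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class ZeroPartitionsRepro {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(new SparkConf()
        .setMaster("local[1]")
        .setAppName("zero-partitions-repro"));

    JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(
        Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("a", 2)));

    // Throws IllegalArgumentException: requirement failed:
    // Number of partitions (0) must be positive.
    // pairs.repartition(0);

    // No exception and no error logged; the result is simply empty.
    System.out.println(
        pairs.aggregateByKey(0, 0, Integer::sum, Integer::sum).collect());

    sc.stop();
  }
}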

On Sat, Aug 21, 2021 at 07:45, Jacek Laskowski (ja...@japila.pl) wrote:

> Hi Pedro,
>
> > Anyway, maybe the behavior is weird. I would have expected repartitioning
> > to zero to be disallowed, or at least warned about, instead of silently
> > discarding all the data.
>
> Interesting...
>
> scala> spark.version
> res3: String = 3.1.2
>
> scala> spark.range(5).repartition(0)
> java.lang.IllegalArgumentException: requirement failed: Number of
> partitions (0) must be positive.
>   at scala.Predef$.require(Predef.scala:281)
>   at
> org.apache.spark.sql.catalyst.plans.logical.Repartition.<init>(basicLogicalOperators.scala:1032)
>   at org.apache.spark.sql.Dataset.repartition(Dataset.scala:3016)
>   ... 47 elided
>
> How are the above different from yours?
>
> Regards,
> Jacek Laskowski
> ----
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books <https://books.japila.pl/>
> Follow me on https://twitter.com/jaceklaskowski
>
>
>
> On Thu, Aug 19, 2021 at 5:43 PM Pedro Tuero <tuerope...@gmail.com> wrote:
>
>> Hi, I'm sorry, the problem was really silly: in the test, the number of
>> partitions was zero (it was computed by dividing the original number of
>> partitions of the source RDD, and in the test that number was just one),
>> and that's why the test was failing.
>> Anyway, maybe the behavior is weird. I would have expected repartitioning
>> to zero to be disallowed, or at least warned about, instead of silently
>> discarding all the data.
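>>
>> For now I've guarded the place where the partition count is derived; a
>> one-line sketch (divisor stands in for our actual computation):
>>
>> // Clamp the derived partition count so it can never reach zero:
>> int numPartitions = Math.max(1, source.getNumPartitions() / divisor);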
>>
>> Thanks for your time!
>> Regards,
>> Pedro
>>
>> On Thu, Aug 19, 2021 at 07:42, Jacek Laskowski (ja...@japila.pl) wrote:
>>
>>> Hi Pedro,
>>>
>>> No idea what might be causing it. Do you perhaps have some code to
>>> reproduce it locally?
>>>
>>> Regards,
>>> Jacek Laskowski
>>> ----
>>> https://about.me/JacekLaskowski
>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>> Follow me on https://twitter.com/jaceklaskowski
>>>
>>>
>>>
>>> On Tue, Aug 17, 2021 at 4:14 PM Pedro Tuero <tuerope...@gmail.com>
>>> wrote:
>>>
>>>>
>>>> Context: spark-core_2.12-3.1.1
>>>> Testing with Maven and Eclipse.
>>>>
>>>> I'm modifying a project, and a test stopped working as expected.
>>>> The only difference is in the parameters passed to JavaPairRDD's
>>>> aggregateByKey function.
>>>>
>>>> The JavaSparkContext is created this way:
>>>>
>>>> new JavaSparkContext(new SparkConf()
>>>>     .setMaster("local[1]")
>>>>     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"));
>>>>
>>>> Then I construct a JavaPairRDD using sparkContext.parallelizePairs, and I
>>>> call a method which runs an aggregateByKey over the input JavaPairRDD and
>>>> tests that the result is the expected one.
>>>>
>>>> When I use the overload at JavaPairRDD line 369 (calling
>>>> .aggregateByKey(zeroValue, combiner, merger)):
>>>>
>>>>   def aggregateByKey[U](zeroValue: U, seqFunc: JFunction2[U, V, U],
>>>>       combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
>>>>     implicit val ctag: ClassTag[U] = fakeClassTag
>>>>     fromRDD(rdd.aggregateByKey(zeroValue)(seqFunc, combFunc))
>>>>   }
>>>>
>>>> The test works as expected.
>>>> But when I use the overload at JavaPairRDD line 355 (calling
>>>> .aggregateByKey(zeroValue, *partitions*, combiner, merger)):
>>>>
>>>>   def aggregateByKey[U](zeroValue: U, *numPartitions: Int*, seqFunc:
>>>>       JFunction2[U, V, U], combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
>>>>     implicit val ctag: ClassTag[U] = fakeClassTag
>>>>     fromRDD(rdd.aggregateByKey(zeroValue, *numPartitions*)(seqFunc, combFunc))
>>>>   }
>>>>
>>>> The result is always empty. It looks like there is a problem with the
>>>> HashPartitioner created in PairRDDFunctions:
>>>>
>>>>   def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
>>>>       combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
>>>>     aggregateByKey(zeroValue, *new HashPartitioner(numPartitions)*)(seqOp, combOp)
>>>>   }
>>>>
>>>> vs:
>>>>
>>>>   def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
>>>>       combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
>>>>     aggregateByKey(zeroValue, *defaultPartitioner*(self))(seqOp, combOp)
>>>>   }
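>>>>
>>>> For what it's worth, JavaPairRDD also has an overload that takes a
>>>> Partitioner explicitly, which makes the difference visible from the Java
>>>> side. A rough sketch, assuming an Integer-valued pair RDD named "input":
>>>>
>>>> import org.apache.spark.HashPartitioner;
>>>>
>>>> // Equivalent to aggregateByKey(zeroValue, numPartitions, seqFunc, combFunc):
>>>> input.aggregateByKey(0, new HashPartitioner(numPartitions), Integer::sum, Integer::sum);
>>>>
>>>> // The overload without numPartitions goes through defaultPartitioner,
>>>> // which takes the count from the upstream RDD (or spark.default.parallelism),
>>>> // so it stays positive:
>>>> input.aggregateByKey(0, Integer::sum, Integer::sum);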
>>>> I can't debug it properly with Eclipse: the error occurs while threads
>>>> are in Spark code, and Eclipse only reports "system editor can only open
>>>> file base resources".
>>>>
>>>> Does anyone know how to resolve this issue?
>>>>
>>>> Thanks in advance,
>>>> Pedro.
>>>>
>>>>
>>>>
>>>>
