Same here: repartition(0) throws an IllegalArgumentException (which is what I would have expected), but aggregateByKey(zeroValue, 0, seqFunc, combFunc) throws no exception and logs no error message. The only consequence is an empty RDD.
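To see why a zero-partition aggregation can come back empty instead of failing, here is a minimal pure-Python sketch (no Spark required) of a hash-partitioned aggregateByKey. The function and variable names are hypothetical, not Spark APIs; it is a toy model of the shuffle, not Spark's implementation. With num_partitions == 0 there are no reduce-side buckets and no reduce tasks, so every record is dropped without any error surfacing:

```python
def aggregate_by_key(records, zero_value, num_partitions, seq_func, comb_func):
    """Toy model of a hash-partitioned aggregateByKey (NOT Spark's API).

    Map side pre-aggregates with seq_func and routes partial results to one
    "inbox" per reduce partition; the reduce side merges inboxes with
    comb_func. With num_partitions == 0 there are no inboxes and no reduce
    tasks, so every record is silently dropped.
    """
    map_chunks = [records[::2], records[1::2]]     # pretend there are two map tasks
    inboxes = [[] for _ in range(num_partitions)]  # one inbox per reduce partition
    for chunk in map_chunks:
        partial = {}
        for key, value in chunk:                   # map-side combine (seq_func)
            partial[key] = seq_func(partial.get(key, zero_value), value)
        for key, agg in partial.items():           # "shuffle write"
            if num_partitions > 0:                 # zero partitions: nowhere to send it
                inboxes[hash(key) % num_partitions].append((key, agg))
    result = []
    for inbox in inboxes:                          # one reduce task per partition
        merged = {}
        for key, agg in inbox:                     # merge partials (comb_func)
            merged[key] = comb_func(merged[key], agg) if key in merged else agg
        result.extend(merged.items())
    return result

records = [("a", 1), ("a", 2), ("b", 3)]
add = lambda x, y: x + y
print(sorted(aggregate_by_key(records, 0, 2, add, add)))  # [('a', 3), ('b', 3)]
print(aggregate_by_key(records, 0, 0, add, add))          # [] -- silently empty
```

The zero-partition call raises nothing; the data just never reaches a reduce task, which matches the observed empty-RDD behavior.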
On Sat, Aug 21, 2021 at 07:45, Jacek Laskowski (ja...@japila.pl) wrote:

> Hi Pedro,
>
> > Anyway, maybe the behavior is weird, I could expect that repartition to
> > zero was not allowed or at least warned instead of just discarding all
> > the data.
>
> Interesting...
>
> scala> spark.version
> res3: String = 3.1.2
>
> scala> spark.range(5).repartition(0)
> java.lang.IllegalArgumentException: requirement failed: Number of
> partitions (0) must be positive.
>   at scala.Predef$.require(Predef.scala:281)
>   at org.apache.spark.sql.catalyst.plans.logical.Repartition.<init>(basicLogicalOperators.scala:1032)
>   at org.apache.spark.sql.Dataset.repartition(Dataset.scala:3016)
>   ... 47 elided
>
> How is the above different from yours?
>
> Regards,
> Jacek Laskowski
> ----
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books <https://books.japila.pl/>
> Follow me on https://twitter.com/jaceklaskowski
>
>
> On Thu, Aug 19, 2021 at 5:43 PM Pedro Tuero <tuerope...@gmail.com> wrote:
>
>> Hi, I'm sorry, the problem was really silly: in the test, the number of
>> partitions was zero (it was a division of the original number of
>> partitions of the source RDD, and in the test that number was just one),
>> and that's why the test was failing.
>> Anyway, maybe the behavior is weird; I would have expected repartitioning
>> to zero to be disallowed, or at least warned about, instead of just
>> discarding all the data.
>>
>> Thanks for your time!
>> Regards,
>> Pedro
>>
>> On Thu, Aug 19, 2021 at 07:42, Jacek Laskowski (ja...@japila.pl) wrote:
>>
>>> Hi Pedro,
>>>
>>> No idea what might be causing it. Do you perhaps have some code to
>>> reproduce it locally?
>>>
>>> Regards,
>>> Jacek Laskowski
>>> ----
>>> https://about.me/JacekLaskowski
>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>> Follow me on https://twitter.com/jaceklaskowski
>>>
>>>
>>> On Tue, Aug 17, 2021 at 4:14 PM Pedro Tuero <tuerope...@gmail.com> wrote:
>>>
>>>> Context: spark-core_2.12-3.1.1
>>>> Testing with Maven and Eclipse.
>>>>
>>>> I'm modifying a project, and a test stops working as expected.
>>>> The difference is in the parameters passed to the aggregateByKey
>>>> function of JavaPairRDD.
>>>>
>>>> The JavaSparkContext is created this way:
>>>>
>>>> new JavaSparkContext(new SparkConf()
>>>>     .setMaster("local[1]")
>>>>     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"));
>>>>
>>>> Then I construct a JavaPairRDD using sparkContext.parallelizePairs and
>>>> call a method that runs an aggregateByKey over the input JavaPairRDD,
>>>> and test that the result is the expected one.
>>>>
>>>> When I use the overload at JavaPairRDD line 369 (calling
>>>> .aggregateByKey(zeroValue, combiner, merger)):
>>>>
>>>> def aggregateByKey[U](zeroValue: U, seqFunc: JFunction2[U, V, U],
>>>>     combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
>>>>   implicit val ctag: ClassTag[U] = fakeClassTag
>>>>   fromRDD(rdd.aggregateByKey(zeroValue)(seqFunc, combFunc))
>>>> }
>>>>
>>>> the test works as expected.
>>>> But when I use the overload at JavaPairRDD line 355 (calling
>>>> .aggregateByKey(zeroValue, *partitions*, combiner, merger)):
>>>>
>>>> def aggregateByKey[U](zeroValue: U, *numPartitions: Int,* seqFunc: JFunction2[U, V, U],
>>>>     combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
>>>>   implicit val ctag: ClassTag[U] = fakeClassTag
>>>>   fromRDD(rdd.aggregateByKey(zeroValue, *numPartitions*)(seqFunc, combFunc))
>>>> }
>>>>
>>>> the result is always empty.
>>>> It looks like there is a problem with the HashPartitioner created in
>>>> PairRDDFunctions:
>>>>
>>>> def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
>>>>     combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
>>>>   aggregateByKey(zeroValue, *new HashPartitioner(numPartitions)*)(seqOp, combOp)
>>>> }
>>>>
>>>> vs:
>>>>
>>>> def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
>>>>     combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
>>>>   aggregateByKey(zeroValue, *defaultPartitioner*(self))(seqOp, combOp)
>>>> }
>>>>
>>>> I can't debug it properly with Eclipse; the error occurs while threads
>>>> are in Spark code ("system editor can only open file base resources").
>>>>
>>>> Does anyone know how to resolve this issue?
>>>>
>>>> Thanks in advance,
>>>> Pedro.
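The two code paths quoted above differ only in where numPartitions comes from, which explains why one overload worked and the other returned empty results. The explicit overload wraps whatever integer the caller passes in a HashPartitioner (which, unlike Dataset.repartition, does not appear to reject 0, only negative counts), while defaultPartitioner derives a positive count from the parent RDDs or spark.default.parallelism. A rough pure-Python model of that difference, simplified (the real defaultPartitioner may also reuse an eligible existing partitioner on a parent, which this sketch ignores):

```python
def explicit_num_partitions(num_partitions):
    # Mirrors "new HashPartitioner(numPartitions)": the caller's value is
    # used as-is, so 0 flows straight through to a zero-partition shuffle.
    return num_partitions

def default_num_partitions(parent_partition_counts, default_parallelism=None):
    # Rough sketch of the defaultPartitioner fallback: use
    # spark.default.parallelism if set, otherwise the largest parent
    # partition count -- both positive for any non-empty parent RDD.
    if default_parallelism is not None:
        return default_parallelism
    return max(parent_partition_counts)

print(explicit_num_partitions(0))          # 0 -> empty result, no warning
print(default_num_partitions([4], None))   # 4 -> aggregation works
```

Under this model, Pedro's test computed 1 / something = 0 partitions and handed that straight to the explicit overload, whereas the overload without numPartitions could never see a zero.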