Hi all, I guess you could do something like this too:
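For instance, collecting each partition as an array with glom() lets the driver print the contents in partition order (a minimal sketch assuming the myRDD defined further down this thread; the attached screenshot may show a different variant):

    // glom() turns every partition into an Array, so collect() returns one
    // Array per partition and the driver can print them without interleaving.
    myRDD.glom().collect().zipWithIndex.foreach { case (contents, index) =>
      println(s"Partition $index: ${contents.mkString(", ")}")
    }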
[image: Captura de pantalla 2021-03-16 a las 14.35.46.png]

On Tue, 16 Mar 2021 at 13:31, Renganathan Mutthiah <renganatha...@gmail.com> wrote:

> Hi Attila,
>
> Thanks for looking into this!
>
> I actually found the issue, and it turned out that the print statements
> had misled me. The records are indeed stored in different partitions.
> What happened is that since the foreachPartition method is run in parallel
> by different threads, they all printed the separator line at almost the
> same time, followed by the data, which was also printed at almost the same
> time. This gave the appearance that all the data was stored in a single
> partition. When I run the code below, I can see that the objects are of
> course stored in different partitions!
>
> myRDD.mapPartitionsWithIndex( (index,itr) => { itr.foreach( e =>
> println("Index : " +index +" " + e)); itr}, true).collect()
>
> Prints the below (in "Index : ?", the ? is actually the partition number):
>
> Index : 9 Animal(4,Tiger)
> Index : 4 Animal(2,Elephant)
> Index : 11 Animal(5,Chetah)
> Index : 2 Animal(1,Lion)
> Index : 7 Animal(3,Jaguar)
>
> Thanks!
>
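As a side note on the snippet above: because itr is consumed by the foreach, the trailing collect() returns an empty array, and the mapping is only visible through the printed side effect. A minimal sketch (assuming the same myRDD) that instead returns the (partition index, element) pairs to the driver:

    // Tag each element with its partition index and collect the pairs, so the
    // mapping is visible on the driver without relying on println ordering.
    val byPartition = myRDD
      .mapPartitionsWithIndex((index, itr) => itr.map(e => (index, e)), preservesPartitioning = true)
      .collect()
    byPartition.foreach { case (index, e) => println(s"Index : $index $e") }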
> On Tue, Mar 16, 2021 at 3:06 PM Attila Zsolt Piros <piros.attila.zs...@gmail.com> wrote:
>
>> Hi!
>>
>> This is weird. The code of foreachPartition
>> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L801-L806>
>> leads to ParallelCollectionRDD
>> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L84-L107>
>> which ends in slice
>> <https://github.com/apache/spark/blob/f643bd96593dc411cb0cca1c7a3f28f93765c9b6/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L116-L155>,
>> where the most important part is the *positions* method:
>>
>> def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
>>   (0 until numSlices).iterator.map { i =>
>>     val start = ((i * length) / numSlices).toInt
>>     val end = (((i + 1) * length) / numSlices).toInt
>>     (start, end)
>>   }
>> }
>>
>> Because of the extra '(' you used in "parallelize( (Array", I thought
>> some Scala implicit might generate a Seq with one Array in it.
>> But in that case your output would contain an Array, so this must not be
>> the case.
>>
>> 1) What Spark/Scala version are you using, and on what OS?
>>
>> 2) Can you reproduce this issue in the spark-shell?
>>
>> scala> case class Animal(id:Int, name:String)
>> defined class Animal
>>
>> scala> val myRDD = sc.parallelize( (Array( Animal(1, "Lion"),
>> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"),
>> Animal(5, "Chetah") ) ), 12)
>> myRDD: org.apache.spark.rdd.RDD[Animal] = ParallelCollectionRDD[2] at
>> parallelize at <console>:27
>>
>> scala> myRDD.foreachPartition( e => { println("----------");
>> e.foreach(println) } )
>> ----------
>> ----------
>> ----------
>> Animal(1,Lion)
>> ----------
>> ----------
>> Animal(2,Elephant)
>> ----------
>> ----------
>> ----------
>> Animal(3,Jaguar)
>> ----------
>> ----------
>> Animal(4,Tiger)
>> ----------
>> ----------
>> Animal(5,Chetah)
>>
>> scala> Console println myRDD.getNumPartitions
>> 12
>>
>> 3) Can you please check in the spark-shell what happens when you paste
>> the above method and call it like this:
>>
>> scala> def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
>>      |   (0 until numSlices).iterator.map { i =>
>>      |     val start = ((i * length) / numSlices).toInt
>>      |     val end = (((i + 1) * length) / numSlices).toInt
>>      |     (start, end)
>>      |   }
>>      | }
>> positions: (length: Long, numSlices: Int)Iterator[(Int, Int)]
>>
>> scala> positions(5, 12).foreach(println)
>> (0,0)
>> (0,0)
>> (0,1)
>> (1,1)
>> (1,2)
>> (2,2)
>> (2,2)
>> (2,3)
>> (3,3)
>> (3,4)
>> (4,4)
>> (4,5)
>>
>> As you can see, in my case the `positions` result is consistent with the
>> `foreachPartition` output, and this should be deterministic.
>>
>> Best regards,
>> Attila
>>
>>
>> On Tue, Mar 16, 2021 at 5:34 AM Renganathan Mutthiah <renganatha...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a question with respect to default partitioning in RDD.
>>>
>>> case class Animal(id:Int, name:String)
>>> val myRDD = session.sparkContext.parallelize( (Array( Animal(1, "Lion"),
>>> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"),
>>> Animal(5, "Chetah") ) ))
>>> Console println myRDD.getNumPartitions
>>>
>>> I am running the above piece of code on my laptop, which has 12 logical
>>> cores. Hence I see that there are 12 partitions created.
>>>
>>> My understanding is that hash partitioning is used to determine which
>>> object needs to go to which partition. So in this case, the formula
>>> would be: hashCode() % 12
>>> But when I examine further, I see that all the objects are put in the
>>> last partition.
>>>
>>> myRDD.foreachPartition( e => { println("----------"); e.foreach(println) } )
>>>
>>> The above code prints the below (the first eleven partitions are empty
>>> and the last one has all the objects; the dashed line separates the
>>> partition contents):
>>> ----------
>>> ----------
>>> ----------
>>> ----------
>>> ----------
>>> ----------
>>> ----------
>>> ----------
>>> ----------
>>> ----------
>>> ----------
>>> ----------
>>> Animal(2,Elephant)
>>> Animal(4,Tiger)
>>> Animal(3,Jaguar)
>>> Animal(5,Chetah)
>>> Animal(1,Lion)
>>>
>>> I don't know why this happens. Can you please help?
>>>
>>> Thanks!
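Tying the two mails together: with 5 elements and 12 slices, the positions method above assigns one element each to partitions 2, 4, 7, 9 and 11, which is exactly the mapPartitionsWithIndex output earlier in the thread. A small driver-side sketch (the plain string array is illustrative) that replays the slicing:

    // Replay ParallelCollectionRDD's slicing locally to see which of the 12
    // partitions each of the 5 elements would land in.
    def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
      (0 until numSlices).iterator.map { i =>
        val start = ((i * length) / numSlices).toInt
        val end = (((i + 1) * length) / numSlices).toInt
        (start, end)
      }

    val animals = Array("Lion", "Elephant", "Jaguar", "Tiger", "Chetah")
    positions(animals.length, 12).zipWithIndex.foreach { case ((start, end), part) =>
      println(s"Partition $part: ${animals.slice(start, end).mkString(", ")}")
    }
    // Only partitions 2, 4, 7, 9 and 11 print a non-empty slice.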