Hi all,

I guess you could do something like this too:

[image: Captura de pantalla 2021-03-16 a las 14.35.46.png]

On Tue, 16 Mar 2021 at 13:31, Renganathan Mutthiah <renganatha...@gmail.com>
wrote:

> Hi Attila,
>
> Thanks for looking into this!
>
> I actually found the issue: the print statements misled me. The records
> are indeed stored in different partitions. Since foreachPartition runs in
> parallel on different threads, all the threads printed their separator
> line at almost the same time, and then printed their data at almost the
> same time. This made it look as if all the data were stored in a single
> partition. When I run the code below, I can see that the objects are in
> fact stored in different partitions:
>
> myRDD.mapPartitionsWithIndex( (index,itr) => { itr.foreach( e =>
> println("Index : " +index +" " + e)); itr}, true).collect()
>
> This prints the following (the number after "Index :" is the partition
> number):
> Index : 9 Animal(4,Tiger)
> Index : 4 Animal(2,Elephant)
> Index : 11 Animal(5,Chetah)
> Index : 2 Animal(1,Lion)
> Index : 7 Animal(3,Jaguar)
>
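One caveat about the one-liner above: it returns the iterator after `foreach` has already consumed it, so the array that `collect()` returns is typically empty, and the lines are still printed by the task threads. A minimal sketch of an alternative, assuming it is acceptable to bring this small dataset back to the driver: `glom()` packs each partition into an array, and the driver then prints the arrays one by one in partition order. The object name `PartitionContents`, the helper `render`, and the `local[12]` master are assumptions made for this illustration, not something from the thread.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical object and helper names, introduced only for this sketch.
object PartitionContents {
  case class Animal(id: Int, name: String)

  // Format one partition as a single line; pure function, no Spark needed.
  def render(index: Int, elems: Seq[Any]): String =
    s"Index : $index ${elems.mkString(" ")}".trim

  def main(args: Array[String]): Unit = {
    // local[12] mirrors the 12 logical cores mentioned in the thread
    // (an assumption for this sketch).
    val spark = SparkSession.builder()
      .master("local[12]")
      .appName("partition-contents")
      .getOrCreate()

    val myRDD = spark.sparkContext.parallelize(Seq(
      Animal(1, "Lion"), Animal(2, "Elephant"), Animal(3, "Jaguar"),
      Animal(4, "Tiger"), Animal(5, "Chetah")), 12)

    // glom() packs each partition into one Array; collect() returns the
    // arrays to the driver in partition order, so printing is sequential.
    myRDD.glom().collect().zipWithIndex.foreach { case (elems, index) =>
      println(render(index, elems.toSeq))
    }

    spark.stop()
  }
}
```

Because all printing happens in a single driver thread, empty partitions show up as a bare "Index : n" line and nothing can interleave. This only makes sense for small RDDs, since everything is collected to the driver.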
> Thanks!
>
> On Tue, Mar 16, 2021 at 3:06 PM Attila Zsolt Piros <
> piros.attila.zs...@gmail.com> wrote:
>
>> Hi!
>>
>> This is weird. The code of parallelize
>> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L801-L806>
>> leads to ParallelCollectionRDD
>> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L84-L107>,
>> which ends in slice
>> <https://github.com/apache/spark/blob/f643bd96593dc411cb0cca1c7a3f28f93765c9b6/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L116-L155>,
>> where the most important part is the *positions* method:
>>
>>  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
>>      (0 until numSlices).iterator.map { i =>
>>         val start = ((i * length) / numSlices).toInt
>>         val end = (((i + 1) * length) / numSlices).toInt
>>         (start, end)
>>      }
>>  }
>>
>> Because of the extra '(' you used in "parallelize( (Array" I thought
>> some Scala implicit might generate a Seq with one Array in it.
>> But in that case your output would contain an Array, so this cannot be
>> the case.
>>
>> 1) Which Spark/Scala version are you using, and on which OS?
>>
>> 2) Can you reproduce this issue in the spark-shell?
>>
>> scala> case class Animal(id:Int, name:String)
>> defined class Animal
>>
>> scala> val myRDD = sc.parallelize( (Array( Animal(1, "Lion"),
>> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"),
>> Animal(5, "Chetah") ) ), 12)
>> myRDD: org.apache.spark.rdd.RDD[Animal] = ParallelCollectionRDD[2] at
>> parallelize at <console>:27
>>
>> scala> myRDD.foreachPartition( e => { println("----------");
>> e.foreach(println) } )
>> ----------
>> ----------
>> ----------
>> Animal(1,Lion)
>> ----------
>> ----------
>> Animal(2,Elephant)
>> ----------
>> ----------
>> ----------
>> Animal(3,Jaguar)
>> ----------
>> ----------
>> Animal(4,Tiger)
>> ----------
>> ----------
>> Animal(5,Chetah)
>>
>> scala> Console println myRDD.getNumPartitions
>> 12
>>
>> 3) Can you please check in spark-shell what happens when you paste the
>> above method and call it like this:
>>
>> scala> def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
>>      |   (0 until numSlices).iterator.map { i =>
>>      |     val start = ((i * length) / numSlices).toInt
>>      |     val end = (((i + 1) * length) / numSlices).toInt
>>      |     (start, end)
>>      |   }
>>      | }
>> positions: (length: Long, numSlices: Int)Iterator[(Int, Int)]
>>
>> scala> positions(5, 12).foreach(println)
>> (0,0)
>> (0,0)
>> (0,1)
>> (1,1)
>> (1,2)
>> (2,2)
>> (2,2)
>> (2,3)
>> (3,3)
>> (3,4)
>> (4,4)
>> (4,5)
>>
>> As you can see, in my case the `positions` result is consistent with the
>> `foreachPartition` output, and this should be deterministic.
>>
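To make the link between `positions` and the partition contents concrete, here is a minimal pure-Scala sketch (no Spark needed) that applies the same arithmetic to the five animals and 12 slices. The object name `SliceDemo` and the helper `slices` are hypothetical names introduced for this illustration; the real slicing logic lives in ParallelCollectionRDD and handles more cases than this.

```scala
// Pure-Scala sketch; `SliceDemo` and `slices` are hypothetical names.
object SliceDemo {
  case class Animal(id: Int, name: String)

  // Same arithmetic as the `positions` method quoted above.
  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
    (0 until numSlices).iterator.map { i =>
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      (start, end)
    }

  // Pair each slice index with the elements in the range [start, end).
  def slices[T](data: Seq[T], numSlices: Int): Seq[(Int, Seq[T])] =
    positions(data.length, numSlices).zipWithIndex.map {
      case ((start, end), index) => (index, data.slice(start, end))
    }.toList

  def main(args: Array[String]): Unit = {
    val animals = Seq(
      Animal(1, "Lion"), Animal(2, "Elephant"), Animal(3, "Jaguar"),
      Animal(4, "Tiger"), Animal(5, "Chetah"))

    // Print one line per slice; empty slices show only their index.
    slices(animals, 12).foreach { case (index, elems) =>
      println(s"Index : $index ${elems.mkString(" ")}")
    }
  }
}
```

With 5 elements and 12 slices, only slices 2, 4, 7, 9 and 11 are non-empty, each holding one element in input order. That matches the (2,Lion), (4,Elephant), (7,Jaguar), (9,Tiger), (11,Chetah) pairing reported by mapPartitionsWithIndex elsewhere in this thread.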
>> Best regards,
>> Attila
>>
>>
>> On Tue, Mar 16, 2021 at 5:34 AM Renganathan Mutthiah <
>> renganatha...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a question with respect to default partitioning in RDD.
>>>
>>> case class Animal(id:Int, name:String)
>>> val myRDD = session.sparkContext.parallelize( (Array(
>>> Animal(1, "Lion"), Animal(2,"Elephant"), Animal(3,"Jaguar"),
>>> Animal(4,"Tiger"), Animal(5, "Chetah") ) ))
>>> Console println myRDD.getNumPartitions
>>>
>>> I am running the above piece of code on my laptop, which has 12 logical
>>> cores, so I see that 12 partitions are created.
>>>
>>> My understanding is that hash partitioning is used to determine which
>>> object goes to which partition. So in this case, the formula would
>>> be: hashCode() % 12
>>> But when I examine further, I see that all the records are put in the
>>> last partition.
>>>
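On the hash partitioning assumption: an RDD created by `parallelize` carries no partitioner, and its elements are split by position in the input collection. Hash partitioning applies to key-value RDDs, for example after an explicit `partitionBy`. The sketch below contrasts the two; the object name `HashVsSlice`, the `local[12]` master, and the choice of `id` as the key are assumptions made for this illustration.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

// Hypothetical object name, introduced only for this sketch.
object HashVsSlice {
  case class Animal(id: Int, name: String)

  def main(args: Array[String]): Unit = {
    // local[12] is an assumption mirroring the 12 cores in the thread.
    val spark = SparkSession.builder()
      .master("local[12]")
      .appName("hash-vs-slice")
      .getOrCreate()

    val sliced = spark.sparkContext.parallelize(Seq(
      Animal(1, "Lion"), Animal(2, "Elephant"), Animal(3, "Jaguar"),
      Animal(4, "Tiger"), Animal(5, "Chetah")), 12)

    // An RDD built by parallelize has no partitioner.
    println(sliced.partitioner) // None

    // Hash partitioning needs a key; here the id is used as the key.
    // Int keys hash to themselves, so ids 1 to 5 should land in
    // partitions 1 to 5.
    val hashed = sliced.keyBy(_.id).partitionBy(new HashPartitioner(12))
    hashed.glom().collect().zipWithIndex.foreach { case (elems, index) =>
      println(s"Index : $index ${elems.mkString(" ")}")
    }

    spark.stop()
  }
}
```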
>>> myRDD.foreachPartition( e => { println("----------");
>>> e.foreach(println) } )
>>>
>>> The above code prints the following (the first eleven partitions are
>>> empty and the last one has all the objects; the dashed lines separate
>>> the partition contents):
>>> ----------
>>> ----------
>>> ----------
>>> ----------
>>> ----------
>>> ----------
>>> ----------
>>> ----------
>>> ----------
>>> ----------
>>> ----------
>>> ----------
>>> Animal(2,Elephant)
>>> Animal(4,Tiger)
>>> Animal(3,Jaguar)
>>> Animal(5,Chetah)
>>> Animal(1,Lion)
>>>
>>> I don't know why this happens. Can you please help?
>>>
>>> Thanks!
>>>
>>
