Hi,

I have a dataset that I want to write, sorted, into Parquet files, so that
querying those files later through Spark can benefit from predicate
pushdown.
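
For context, this is roughly the write path I have in mind (just a sketch;
"partition_col", n and the output path are placeholders, not my real names):

        // sketch only: place each row into the partition named by its column
        // value, sort within each partition, then write the result as Parquet
        import org.apache.spark.sql.functions.col

        dataset
          .repartition(n, col("partition_col"))
          .sortWithinPartitions(col("partition_col"))
          .write
          .parquet("/path/to/output")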

Currently I use repartition with a column and a fixed number of partitions to
move each row into its target partition. The column itself identifies the
target partition (its values run from 0 to a fixed n-1). The result is
unexpected: Spark creates fewer non-empty partitions than requested (some
partitions stay empty). Maybe a hash collision?


To solve this I tried to find the cause and looked for workarounds. One
workaround I found is to convert the DataFrame to an RDD and use partitionBy
with a HashPartitioner. To my surprise, this gives the expected result. But
converting the DataFrame to an RDD is not a solution for me, because it
consumes too many resources.
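
Roughly, the workaround looks like this (a sketch against the dfMyData
DataFrame from the test below; run it in spark-shell so that toDF is
available):

        // sketch of the workaround: DataFrame -> RDD of (key, value) pairs
        // -> HashPartitioner -> back to a DataFrame
        import org.apache.spark.HashPartitioner

        val viaRdd = dfMyData.rdd
          .map(row => (row.getInt(0), row.getInt(1)))   // key each row by the index column _1
          .partitionBy(new HashPartitioner(5))          // places the keys as in the RDD test below
          .toDF("_1", "_2")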

I have tested this in the following environments:
- Spark 2.0 on Cloudera CDH 5.9.3
- Spark 2.3.1 on emr-5.17.0


Here are my tests with their output. Please use spark-shell to run them:


        scala> import org.apache.spark.HashPartitioner
        import org.apache.spark.HashPartitioner

        scala> val mydataindex = Array(0,1, 2, 3,4)
        mydataindex: Array[Int] = Array(0, 1, 2, 3, 4)

        scala> val mydata = sc.parallelize(for {
             |  x <- mydataindex
             |  y <- Array(123,456,789)
             | } yield (x, y), 100)
        mydata: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:27

        scala> val rddMyData = mydata.partitionBy(new HashPartitioner(5))
        rddMyData: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:26

        scala> val rddMyDataPartitions = rddMyData.mapPartitionsWithIndex{
             |                 (index, iterator) => {
             |                    val myList = iterator.toList
             |                    myList.map(x => x + " -> " + index).iterator
             |                 }
             |              }
        rddMyDataPartitions: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at mapPartitionsWithIndex at <console>:26

        scala> // this is expected:

        scala> rddMyDataPartitions.take(100)
        res1: Array[String] = Array((0,123) -> 0, (0,456) -> 0, (0,789) -> 0, (1,123) -> 1, (1,456) -> 1, (1,789) -> 1, (2,123) -> 2, (2,456) -> 2, (2,789) -> 2, (3,456) -> 3, (3,789) -> 3, (3,123) -> 3, (4,789) -> 4, (4,123) -> 4, (4,456) -> 4)

        scala> val dfMyData = mydata.toDF()
        dfMyData: org.apache.spark.sql.DataFrame = [_1: int, _2: int]

        scala> val dfMyDataRepartitioned = dfMyData.repartition(5,col("_1"))
        dfMyDataRepartitioned: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_1: int, _2: int]

        scala> dfMyDataRepartitioned.explain(false)
        == Physical Plan ==
        Exchange hashpartitioning(_1#3, 5)
        +- *(1) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#3, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#4]
           +- Scan ExternalRDDScan[obj#2]

        scala> val dfMyDataRepartitionedPartition = dfMyDataRepartitioned.withColumn("partition_id", spark_partition_id()).groupBy("partition_id").count()
        dfMyDataRepartitionedPartition: org.apache.spark.sql.DataFrame = [partition_id: int, count: bigint]

        scala> // this is unexpected: partition 0 is empty and partition 1 received two index values

        scala> dfMyDataRepartitionedPartition.show()
        +------------+-----+
        |partition_id|count|
        +------------+-----+
        |           1|    6|
        |           3|    3|
        |           4|    3|
        |           2|    3|
        +------------+-----+
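
If I read the physical plan correctly, Exchange hashpartitioning does not use
the raw column value modulo 5, but a hash of the value modulo 5, which would
explain why two index values can end up in the same partition while another
partition stays empty. Assuming Spark SQL's hash() function uses the same
Murmur3 hash as the Exchange (my assumption, not something I have verified),
the assignment can be inspected like this:

        // assumption: functions.hash() matches the hash used by Exchange
        // hashpartitioning, so pmod(hash(_1), 5) predicts the partition id
        import org.apache.spark.sql.functions.{col, hash, lit, pmod}

        dfMyData
          .select(col("_1"), pmod(hash(col("_1")), lit(5)).as("predicted_partition"))
          .distinct()
          .orderBy("_1")
          .show()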



I also posted this question on Stack Overflow, but I wanted to reach the
experts here directly as well:
https://stackoverflow.com/questions/54215601/how-is-exchange-hashpartitioning-working-in-spark


Thanks in advance!



