`coalesce` without shuffling can only reduce the number of partitions relative to its parent RDD; it does not guarantee the exact count you ask for. As Ted said, you need to set `shuffle = true` in `coalesce`, or use `RDD#repartition` (which is shorthand for the same thing).
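A minimal sketch of both fixes, assuming Spark is on the classpath and run in local mode (`parallelize` stands in for the thread's `textFile` call, and the 130 input partitions are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ExactPartitions {
  // Returns the partition counts after the two shuffle-based fixes.
  def fixedCounts(sc: SparkContext): (Int, Int) = {
    // Stand-in for sc.textFile(...): 100000 rows in 130 input partitions.
    val rdd = sc.parallelize(1 to 100000, 130)
    val viaCoalesce    = rdd.coalesce(128, shuffle = true) // shuffle forces exactly 128
    val viaRepartition = rdd.repartition(128)              // shorthand for the line above
    (viaCoalesce.getNumPartitions, viaRepartition.getNumPartitions)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("exact-partitions"))
    println(fixedCounts(sc)) // (128,128)
    sc.stop()
  }
}
```

The shuffle costs a full redistribution of the data, but it is what guarantees both the exact partition count and evenly sized partitions.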
// maropu

On Tue, May 31, 2016 at 11:02 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Value for shuffle is false by default.
>
> Have you tried setting it to true?
>
> Which Spark release are you using?
>
> On Tue, May 31, 2016 at 6:13 AM, Maciej Sokołowski <matemac...@gmail.com> wrote:
>
>> Hello Spark users and developers.
>>
>> I read a file and want to ensure that it has an exact number of partitions,
>> for example 128.
>>
>> In the documentation I found:
>>
>> def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
>>
>> But the argument here is the minimal number of partitions, so I use coalesce
>> to ensure the desired number of partitions:
>>
>> def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
>> // Return a new RDD that is reduced into numPartitions partitions.
>>
>> So I combine them and get a number of partitions lower than expected:
>>
>> scala> sc.textFile("perf_test1.csv", minPartitions=128).coalesce(128).getNumPartitions
>> res14: Int = 126
>>
>> Is this expected behaviour?
>> The file contains 100000 lines; sizes of partitions before and after coalesce:
>>
>> scala> sc.textFile("perf_test1.csv", minPartitions=128).mapPartitions{rows => Iterator(rows.length)}.collect()
>> res16: Array[Int] = Array(782, 781, 782, 781, 781, 782, 781, 781, 781,
>>   781, 782, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 782, 781,
>>   781, 781, 782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782,
>>   781, 781, 781, 782, 781, 781, 782, 781, 781, 782, 781, 781, 781, 781, 782,
>>   781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781,
>>   782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781,
>>   781, 782, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781,
>>   781, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 782, 781, 781, 781,
>>   781, 782, 781, 781, 781, 781, 782, 781, 781, 782, 781, 781, 781, 781)
>>
>> scala> sc.textFile("perf_test1.csv", minPartitions=128).coalesce(128).mapPartitions{rows => Iterator(rows.length)}.collect()
>> res15: Array[Int] = Array(1563, 781, 781, 781, 782, 781, 781, 781, 781,
>>   782, 781, 781, 781, 781, 782, 781, 781, 781, 781, 781, 782, 781, 781, 781,
>>   782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781,
>>   781, 782, 781, 781, 781, 781, 1563, 782, 781, 781, 782, 781, 781, 781, 781,
>>   782, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781,
>>   781, 782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782, 781,
>>   781, 781, 782, 781, 781, 782, 781, 781, 782, 781, 781, 781, 781, 782, 781,
>>   781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 782,
>>   781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782)
>>
>> So two partitions are double the size. Is this expected behaviour, or is it
>> some kind of bug?
>>
>> Thanks,
>> Maciej Sokołowski

--
---
Takeshi Yamamuro
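Why two result partitions are exactly double-sized: without a shuffle, `coalesce` never splits or redistributes rows; it only glues whole parent partitions together (Spark's partition coalescer picks the groups, also weighing data locality, which is why asking for 128 here yielded 126). The toy function below is a simplification, not Spark's actual algorithm, but it reproduces the shape of the numbers above: two buckets absorb two parents each, so their sizes are sums such as 781 + 782 = 1563.

```scala
// Toy model of a no-shuffle coalesce: assign parent partition i to
// bucket floor(i * target / parents), summing sizes within a bucket.
// This is NOT Spark's DefaultPartitionCoalescer (which also weighs
// data locality); it only shows that result partitions are
// concatenations of whole parents, so their sizes are sums of parents.
object CoalesceSketch {
  def coalesceSizes(parentSizes: Seq[Int], target: Int): Seq[Int] = {
    val buckets = Array.fill(target)(0)
    for ((size, i) <- parentSizes.zipWithIndex)
      buckets((i.toLong * target / parentSizes.length).toInt) += size
    buckets.toSeq
  }

  def main(args: Array[String]): Unit = {
    // 128 parents of 781 or 782 rows (100000 rows total), roughly as
    // textFile produced above; merge into the observed 126 groups.
    val parents = Seq.tabulate(128)(i => if (i % 4 == 0) 782 else 781)
    val merged  = coalesceSizes(parents, 126)
    println(merged.length)             // 126
    println(merged.count(_ > 1000))    // 2 -- two buckets hold two parents
    println(merged.sum == parents.sum) // true -- rows are only regrouped
  }
}
```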