Hi,
"csvDF = csvDF.sort(orderByColName, ascending=False)" repartitions DF by
using RangePartitioner
(#partitions depends on "spark.sql.shuffle.partitions").
Seems, in your case, some empty partitions were removed, then you got 17
paritions.
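The file-count arithmetic in your mail checks out in plain Python (I spell
out "remainder" here). The Spark calls in the comments are only an untested
sketch for 1.6: since sort() reshuffles anyway, you could try reducing the
partition count *after* the sort instead of before it, though I have not
verified how well ordering survives the coalesce.

```python
# Same divmod arithmetic as in the quoted mail.
def num_partitions(num_rows, rows_per_file=100):
    quotient, remainder = divmod(num_rows, rows_per_file)
    return quotient + 1 if remainder > 0 else quotient

print(num_partitions(3598))  # 36, matching the repartition(36) debug line

# Untested sketch for Spark 1.6 (coalesce after the sort, not before):
# csvDF = resultDF.sort("count", ascending=False) \
#                 .coalesce(num_partitions(numRows))
```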
// maropu
On Wed, Mar 30, 2016 at 6:49 AM, Andy Davidson <
a...@santacruzintegration.com> wrote:
> I have a requirement to write my results out into a series of CSV files.
> No file may have more than 100 rows of data. In the past my data was not
> sorted, and I was able to use repartition() or coalesce() to ensure the
> file length requirement.
>
> I realize that repartition() causes the data to be shuffled, and it appears
> that the shuffle changes the data ordering, so I sort the repartitioned data
> again.
>
> What is really strange is that I no longer get the number of output files I
> am expecting, and the 100-line constraint is now violated.
>
> I am using spark-1.6.1
>
> Andy
>
> $ for i in topTags_CSV/*.csv; do wc -l $i; done
>
> 19 topTags_CSV/part-00000.csv
> 19 topTags_CSV/part-00001.csv
> 20 topTags_CSV/part-00002.csv
> 19 topTags_CSV/part-00003.csv
> 22 topTags_CSV/part-00004.csv
> 19 topTags_CSV/part-00005.csv
> 26 topTags_CSV/part-00006.csv
> 18 topTags_CSV/part-00007.csv
> 12 topTags_CSV/part-00008.csv
> 25 topTags_CSV/part-00009.csv
> 32 topTags_CSV/part-00010.csv
> 53 topTags_CSV/part-00011.csv
> 89 topTags_CSV/part-00012.csv
> 146 topTags_CSV/part-00013.csv
> 387 topTags_CSV/part-00014.csv
> 2708 topTags_CSV/part-00015.csv
> 1 topTags_CSV/part-00016.csv
>
> $
>
> numRowsPerCSVFile = 100
> numRows = resultDF.count()
> quotient, remainder = divmod(numRows, numRowsPerCSVFile)
> numPartitions = (quotient + 1) if remainder > 0 else quotient
>
> debugStr = ("numRows:{0} quotient:{1} remainder:{2} repartition({3})"
>             .format(numRows, quotient, remainder, numPartitions))
> print(debugStr)
>
> csvDF = resultDF.coalesce(numPartitions)
>
> orderByColName = "count"
> csvDF = csvDF.sort(orderByColName, ascending=False)
>
> headerArg = 'true'  # if headers else 'false'
> csvDF.write.save(outputDir, 'com.databricks.spark.csv', header=headerArg)
> renamePartFiles(outputDir)
>
> numRows:3598 quotient:35 remainder:98 repartition(36)
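One workaround that keeps both the sorted order and the 100-rows-per-file cap
(a sketch, not tested on 1.6.1): sort first, number the rows with
zipWithIndex, and route row i to partition i // 100. The routing function
itself is plain Python:

```python
ROWS_PER_FILE = 100

def partition_for(row_index, rows_per_file=ROWS_PER_FILE):
    # Route globally numbered row i to partition i // rows_per_file, so
    # every partition (hence every CSV part file) holds at most
    # rows_per_file rows, and rows stay in sorted order.
    return row_index // rows_per_file

print(partition_for(0), partition_for(99), partition_for(100))  # 0 0 1

# Hypothetical Spark 1.6 wiring (untested):
# sortedDF = resultDF.sort("count", ascending=False)
# indexed = sortedDF.rdd.zipWithIndex().map(lambda pair: (pair[1], pair[0]))
# routed = indexed.partitionBy(numPartitions, partition_for).values()
```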
--
---
Takeshi Yamamuro