I have a requirement to write my results out into a series of CSV files. No
file may have more than 100 rows of data. In the past my data was not
sorted, and I was able to use repartition() or coalesce() to meet the
file-length requirement.
I realize that repartition() causes the data to be shuffled, which appears
to change the data ordering, so I sort the repartitioned data again.
What is really strange is that I no longer get the number of output files I
am expecting, and the 100-line constraint is violated.
I am using spark-1.6.1
Andy
$ for i in topTags_CSV/*.csv; do wc -l $i; done
19 topTags_CSV/part-00000.csv
19 topTags_CSV/part-00001.csv
20 topTags_CSV/part-00002.csv
19 topTags_CSV/part-00003.csv
22 topTags_CSV/part-00004.csv
19 topTags_CSV/part-00005.csv
26 topTags_CSV/part-00006.csv
18 topTags_CSV/part-00007.csv
12 topTags_CSV/part-00008.csv
25 topTags_CSV/part-00009.csv
32 topTags_CSV/part-00010.csv
53 topTags_CSV/part-00011.csv
89 topTags_CSV/part-00012.csv
146 topTags_CSV/part-00013.csv
387 topTags_CSV/part-00014.csv
2708 topTags_CSV/part-00015.csv
1 topTags_CSV/part-00016.csv
$
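The expected file count comes from a ceiling division of the row count; as a standalone sketch of the same arithmetic used in the code below (function name is just for illustration):

```python
# Ceiling division: number of files needed so no file exceeds rows_per_file rows.
def num_output_files(num_rows, rows_per_file=100):
    quotient, remainder = divmod(num_rows, rows_per_file)
    return quotient + 1 if remainder > 0 else quotient

print(num_output_files(3598))  # 36: thirty-five full files plus one with 98 rows
```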
numRowsPerCSVFile = 100
numRows = resultDF.count()
# Ceiling division: add one partition for any leftover rows.
quotient, remainder = divmod(numRows, numRowsPerCSVFile)
numPartitions = (quotient + 1) if remainder > 0 else quotient
debugStr = ("numRows:{0} quotient:{1} remainder:{2} repartition({3})"
            .format(numRows, quotient, remainder, numPartitions))
print(debugStr)
csvDF = resultDF.coalesce(numPartitions)
orderByColName = "count"
csvDF = csvDF.sort(orderByColName, ascending=False)
headerArg = 'true'  # if headers else 'false'
csvDF.write.save(outputDir, 'com.databricks.spark.csv', header=headerArg)
renamePartFiles(outputDir)

numRows:3598 quotient:35 remainder:98 repartition(36)