So you want data from one physical partition on disk to go to only one executor?
On Fri, May 3, 2019 at 5:38 PM Tomas Bartalos <tomas.barta...@gmail.com> wrote:
> Hello,
>
> I have partitioned parquet files based on the "event_hour" column.
> After reading the parquet files into Spark:
> spark.read.format("parquet").load("...")
> files from the same parquet partition are scattered across many Spark
> partitions.
>
> Example of the mapping of Spark partition -> parquet partition:
>
> Spark partition 1 -> 2019050101, 2019050102, 2019050103
> Spark partition 2 -> 2019050101, 2019050103, 2019050104
> ...
> Spark partition 20 -> 2019050101, ...
> Spark partition 21 -> 2019050101, ...
>
> As you can see, parquet partition 2019050101 is present in Spark
> partitions 1, 2, 20, and 21.
> As a result, when I write out the DataFrame:
> df.write.partitionBy("event_hour").format("parquet").save("...")
>
> many files are created in one parquet partition (in this example it's 4
> files, but in reality it's many more).
> To speed up queries, my goal is to write 1 file per parquet partition
> (1 file per hour).
>
> So far my only solution is to use repartition:
> df.repartition(col("event_hour"))
>
> but there is a lot of overhead from the unnecessary shuffle. I'd like to
> force Spark to "pick up" the parquet partitioning.
>
> In my investigation I've found
> org.apache.spark.sql.execution.FileSourceScanExec#createNonBucketedReadRDD
> <https://github.com/apache/spark/blob/a44880ba74caab7a987128cb09c4bee41617770a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L452>
> where the initial partitioning happens based on file sizes. There is an
> explicit ordering which causes the parquet partitions to be shuffled.
>
> Thank you for your help,
> Tomas
>
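For reference, here is a minimal, self-contained sketch of the repartition-before-write workaround described in the quoted mail. Only the "event_hour" column and the read/write calls come from the original message; the input/output paths and the app name are placeholders:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .appName("one-file-per-event-hour")
  .getOrCreate()

// Spark plans input splits by file size, so rows of a single event_hour
// end up spread across several Spark partitions after this read.
val df = spark.read.format("parquet").load("/path/to/input")

// Hash-repartitioning by event_hour puts all rows of one hour into a single
// Spark partition, so the partitioned write below produces one file per
// event_hour directory. This is the shuffle Tomas would like to avoid.
df.repartition(col("event_hour"))
  .write
  .partitionBy("event_hour")
  .format("parquet")
  .save("/path/to/output")

spark.stop()

The shuffle is what guarantees the one-file-per-hour layout here; without it, each Spark partition that holds rows of a given hour writes its own file into that hour's directory.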