I believe what I am looking for is DataFrameWriter.bucketBy, which would allow bucketing into physical parquet files by the desired columns. My question then becomes: can DataFrames/Datasets take advantage of this physical bucketing on read of the parquet files, for something like a self-join on the bucketed columns?
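To make that concrete, here is a minimal sketch of what I have in mind, using the df1 from the example quoted below (untested; the table name is hypothetical, I am assuming Spark 2.0+ where spark is the SparkSession, and as far as I can tell bucketBy only works with saveAsTable rather than a path-based save):

% import org.apache.spark.sql.SaveMode
% // Write bucketed by the join columns; the bucketing spec is stored in
% // the table catalog, not in the parquet files themselves.
% df1.write.format("parquet").bucketBy(500, "a", "b", "c").sortBy("a", "b", "c").mode(SaveMode.Overwrite).saveAsTable("a_b_c_bucketed_1")
% // Read back through the catalog so the bucketing is visible to the planner.
% val bucketed_df1 = spark.table("a_b_c_bucketed_1")
% val bucketed_self_join = bucketed_df1.join(bucketed_df1.select($"a", $"b", $"c", $"d".as("right_d")), Seq("a", "b", "c"))

If the planner can make use of the bucketing, I would expect the self-join's physical plan to show no Exchange on either side of the join.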
On Tue, Oct 18, 2016 at 10:59 PM, adam kramer <ada...@gmail.com> wrote:
> Hello All,
>
> I'm trying to improve join efficiency within (self-join) and across
> data sets loaded from different parquet files, primarily due to a
> multi-stage data ingestion environment.
>
> Are there specific benefits to shuffling efficiency (e.g. no network
> transmission) if the parquet files are written from equivalently
> partitioned datasets (i.e. same partition columns and number of
> partitions)?
>
> A self-join and multi-join Scala shell example that uses the method in
> question:
> % val df1 = sqlContext.read.parquet("hdfs://someserver:9010/default-partitioned-a-z-file-1")
> % val df2 = sqlContext.read.parquet("hdfs://someserver:9010/default-partitioned-a-z-file-2")
> % val df1_part = df1.repartition(500, $"a", $"b", $"c")
> % val df2_part = df2.repartition(500, $"a", $"b", $"c")
> % df1_part.write.format("parquet").mode(SaveMode.Overwrite).save("hdfs://someserver:9010/a-b-c-partitioned-file-1")
> % df2_part.write.format("parquet").mode(SaveMode.Overwrite).save("hdfs://someserver:9010/a-b-c-partitioned-file-2")
> % val reloaded_df1_part = sqlContext.read.parquet("hdfs://someserver:9010/a-b-c-partitioned-file-1")
> % val reloaded_df2_part = sqlContext.read.parquet("hdfs://someserver:9010/a-b-c-partitioned-file-2")
> % val superfast_self_join = reloaded_df1_part.join(reloaded_df1_part.select($"a", $"b", $"c", $"d".as("right_d")), Seq("a", "b", "c"))
> % val superfast_multi_join = reloaded_df1_part.join(reloaded_df2_part.select($"a", $"b", $"c", $"not_in_df1"), Seq("a", "b", "c"))
> % superfast_self_join.count
> % superfast_multi_join.count
>
> Ignoring the time necessary to repartition, and assuming good
> partitioning cardinality (while joining groups of rows), are there
> performance benefits to this approach for the joins 'superfast_self_join'
> and 'superfast_multi_join'? Or is there no benefit, as the partitioner
> information is lost upon persistence/write to parquet?
>
> Note I am currently using Spark 1.6.3 and moving to 2.0.1 in the near future.
>
> Thank you for any insights.
>
> Adam
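Following up with a quick way to test the quoted question empirically: inspecting the physical plan of the reloaded join should show whether a shuffle still happens (explain is available in both 1.6 and 2.0):

% superfast_multi_join.explain()
% // If an Exchange (shuffle) appears below each side of the
% // SortMergeJoin, the hash partitioning from repartition() did not
% // survive the parquet write/read round trip.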