I believe what I am looking for is DataFrameWriter.bucketBy, which
should allow bucketing the data into physical parquet files by the
desired columns. My question then is: can DataFrames/Datasets take
advantage of this physical bucketing when the parquet files are read
back, for something like a self-join on the bucketed columns?
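
For concreteness, here is a rough sketch of what I have in mind
(untested; assumes a Spark 2.x shell where spark and $ are in scope;
the table name "df1_bucketed" is made up, and bucketBy is new in 2.0,
so this would only apply after our move off 1.6.3). As far as I can
tell, bucketBy has to be combined with saveAsTable rather than save,
since the bucketing metadata is kept in the metastore:

% import org.apache.spark.sql.SaveMode
% // write df1 (from the example below) hash-bucketed by (a, b, c)
% df1.write.format("parquet").mode(SaveMode.Overwrite).
    bucketBy(500, "a", "b", "c").
    sortBy("a", "b", "c").
    saveAsTable("df1_bucketed")
% // read back via the table, not the raw parquet path, so that the
% // bucketing metadata is picked up
% val t = spark.table("df1_bucketed")
% val self_joined = t.join(
    t.select($"a", $"b", $"c", $"d".as("right_d")),
    Seq("a", "b", "c"))
% // if the bucketing is exploited, the physical plan should show no
% // Exchange on either side of the SortMergeJoin
% self_joined.explain

My understanding is that two tables bucketed with the same bucket
count and columns should likewise avoid the shuffle in the multi-join
case, but I have not verified that.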

On Tue, Oct 18, 2016 at 10:59 PM, adam kramer <ada...@gmail.com> wrote:
> Hello All,
>
> I’m trying to improve join efficiency both within a data set
> (self-join) and across data sets loaded from different parquet
> files, primarily because of a multi-stage data ingestion environment.
>
> Are there specific benefits to shuffling efficiency (e.g. no network
> transmission) if the parquet files are written from equivalently
> partitioned datasets (i.e. same partition columns and number of
> partitions)?
>
> A self-join and multi-join Scala shell example that uses the method in 
> question:
> % val df1 = sqlContext.read.parquet("hdfs://someserver:9010/default-partitioned-a-z-file-1")
> % val df2 = sqlContext.read.parquet("hdfs://someserver:9010/default-partitioned-a-z-file-2")
> % val df1_part = df1.repartition(500, $"a", $"b", $"c")
> % val df2_part = df2.repartition(500, $"a", $"b", $"c")
> % df1_part.write.format("parquet").mode(SaveMode.Overwrite).save("hdfs://someserver:9010/a-b-c-partitioned-file-1")
> % df2_part.write.format("parquet").mode(SaveMode.Overwrite).save("hdfs://someserver:9010/a-b-c-partitioned-file-2")
> % val reloaded_df1_part = sqlContext.read.parquet("hdfs://someserver:9010/a-b-c-partitioned-file-1")
> % val reloaded_df2_part = sqlContext.read.parquet("hdfs://someserver:9010/a-b-c-partitioned-file-2")
> % val superfast_self_join = reloaded_df1_part.join(
>     reloaded_df1_part.select($"a", $"b", $"c", $"d".as("right_d")),
>     Seq("a", "b", "c"))
> % val superfast_multi_join = reloaded_df1_part.join(
>     reloaded_df2_part.select($"a", $"b", $"c", $"not_in_df1"),
>     Seq("a", "b", "c"))
> % superfast_self_join.count
> % superfast_multi_join.count
>
> Ignoring the time needed to repartition, and assuming good
> partitioning cardinality (so each partition joins groups of rows
> rather than single rows), are there performance benefits to this
> approach for the 'superfast_self_join' and 'superfast_multi_join'
> joins? Or is there no benefit because the partitioner information is
> lost when the data is persisted/written to parquet?
>
> Note that I am currently using Spark 1.6.3 and moving to 2.0.1 in the near future.
>
> Thank you for any insights.
>
> Adam
