Thanks for the reply.

I understand that I need to use bucketBy() when writing my master file,
but I still can't get it to work as expected.  Here's the code I'm
using to write the master file:

import spark.implicits._  // for toDF on local collections and the 'key column syntax

Range(0, 1000000)
  .map(i => (i, s"master_$i"))
  .toDF("key", "value")
  .write
  .format("json")
  .bucketBy(3, "key")
  .sortBy("key")
  .saveAsTable("master")

And here's how I read it back later and attempt to join it to a
transaction dataset:

val master = spark
  .read
  .json("spark-warehouse/master")  // pointing at the warehouse directory, not the table
  .cache

val transaction = Range(0, 1000000)
  .map(i => (i, s"transaction_$i"))
  .toDF("key", "value")
  .repartition(3, 'key)
  .sortWithinPartitions('key)
  .cache

val results = master.join(transaction, "key")
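
(In case it matters, this is how I've been checking the partition count
on each side before the join; I'm not sure it's even the relevant thing
to look at:)

// number of partitions Spark reports for each side of the join
master.rdd.getNumPartitions
transaction.rdd.getNumPartitions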

When I call results.explain(), I see that both sides are being shuffled
(Exchange hashpartitioning) and sorted before they go through the
SortMergeJoin.

== Physical Plan ==
*Project [key#0L, value#1, value#53]
+- *SortMergeJoin [key#0L], [cast(key#52 as bigint)], Inner
   :- *Sort [key#0L ASC], false, 0
   :  +- Exchange hashpartitioning(key#0L, 200)
   :     +- *Filter isnotnull(key#0L)
   :        +- InMemoryTableScan [key#0L, value#1], [isnotnull(key#0L)]
   :           :  +- InMemoryRelation [key#0L, value#1], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   :           :     :  +- *Scan json [key#0L,value#1] Format: JSON, InputPaths: file:/work/spark-warehouse/master, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:bigint,value:string>
   +- *Sort [cast(key#52 as bigint) ASC], false, 0
      +- Exchange hashpartitioning(cast(key#52 as bigint), 200)
         +- InMemoryTableScan [key#52, value#53]
            :  +- InMemoryRelation [key#52, value#53], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            :     :  +- *Sort [key#52 ASC], false, 0
            :     :     +- Exchange hashpartitioning(key#52, 3)
            :     :        +- LocalTableScan [key#52, value#53]

Here are my thoughts:
1. I suspect I'm reading the master file back into memory incorrectly.
I think I should be reading it as a Hive/catalog table (so Spark can
see the bucketing metadata) rather than as plain JSON files, but I
can't figure out how to do that.  My best guess is sketched after this
list.
2. I don't understand exactly when partition counts and bucket counts
matter.  In this example, master has 1 partition at write time and is
written into 3 buckets, so 3 files are written out.  When I later
generate the transaction dataset, I repartition it into 3 partitions.
Was that the correct thing to do (3 transaction partitions == 3 master
buckets)?  Or should I have repartitioned master into 3 partitions
before writing (which would give 9 files if I still create 3 buckets)?
Basically, I don't understand how partitions and buckets are supposed
to interact.
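
Here's my best guess at what I should be doing instead (completely
untested guesswork on my part; the new val names are just to avoid
clashing with the ones above).  Does this look right?

// Guess for thought 1: read the bucketed table through the catalog,
// so Spark can see the bucket metadata, instead of pointing
// spark.read.json at the warehouse directory.
val master2 = spark.table("master")

// Guess for thought 2: make the transaction side a bucketed table as
// well, with the same key and the same bucket count, rather than just
// repartitioning it in memory.
Range(0, 1000000)
  .map(i => (i, s"transaction_$i"))
  .toDF("key", "value")
  .write
  .format("json")
  .bucketBy(3, "key")
  .sortBy("key")
  .saveAsTable("transaction")

val transaction2 = spark.table("transaction")

// hoping to see a plan with no Exchange on either side
master2.join(transaction2, "key").explain()

Is that the intended usage, or am I off track?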

So I feel like I'm close, but I don't quite understand how these pieces
are supposed to fit together.  If this is explained somewhere with a
simple example, a pointer would be great.
