Hi, Spark Users:

I have a question about how I am using the Spark Dataset API for my use case.

Suppose the "ds_old" dataset has 100 records with 10 unique values of $"col1", 
and consider the following pseudo-code:

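import java.util
import org.apache.spark.api.java.function.MapPartitionsFunction
import org.apache.spark.sql.Encoders

val ds_new = ds_old.repartition(5, $"col1").sortWithinPartitions($"col2")
  .mapPartitions(new MergeFun, Encoders.product[OutputCaseClass])

class MergeFun extends MapPartitionsFunction[InputCaseClass, OutputCaseClass] {
  override def call(input: util.Iterator[InputCaseClass]):
      util.Iterator[OutputCaseClass] = ??? // merge logic elided
}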


I have some questions about what "partition" means in the API calls above. 
Below is my understanding:

1) repartition(5, $"col1") distributes all 100 records across 5 partitions by 
hashing col1, so records with the same col1 value land in the same partition. 
There is no guarantee of how many, or which, unique col1 values each of the 5 
partitions receives, but with a well-balanced hash each partition should get 
close to the average (10/5 = 2) when the number of unique values is large.
2) sortWithinPartitions($"col2") is one of the parts I want to clarify here. 
What exactly does "within partitions" mean? I want the data sorted by "col2" 
within each unique value of "col1", but the Spark API uses the term 
"partition" in many places. I DON'T WANT the 100 records sorted as a whole 
within each of the 5 partitions; I want them sorted within each unique value 
of "col1". I believe this is what happens, since we call "repartition" on 
"col1" first. Please confirm this.
3) mapPartitions(new MergeFun) is another part I want to clarify. I originally 
assumed that my merge function would be invoked once per unique col1 value 
(so 10 "partitions" in this case). But after testing, I found that it is 
actually called ONCE for each of the 5 partitions. So does "partition" in this 
API (mapPartitions) mean something DIFFERENT from "partition" in 
"sortWithinPartitions"? Or is my understanding of "partition" in 
sortWithinPartitions also WRONG?
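
To make the two readings of "partition" in 2) and 3) concrete, here is a small 
plain-Scala simulation. It does not use Spark at all: the Rec case class is a 
hypothetical stand-in for InputCaseClass, and the modulo-based bucket 
assignment is only my assumption standing in for Spark's hash partitioning. It 
is a sketch of the question, not a statement of what Spark does.

```scala
// Hypothetical record type standing in for InputCaseClass.
case class Rec(col1: Int, col2: Int)

object PartitionReadings {
  // 100 records with 10 unique col1 values (0..9); col2 is descending in
  // input order so that the effect of sorting is visible.
  val records: Seq[Rec] = for (i <- 0 until 100) yield Rec(i % 10, 100 - i)

  // Stand-in for repartition(5, col1): every record with the same col1
  // lands in the same one of 5 buckets (assumption: simple modulo hashing).
  val buckets: Map[Int, Seq[Rec]] = records.groupBy(r => r.col1 % 5)

  // Reading A: each of the 5 buckets is sorted by col2 only, so records of
  // different col1 values may be interleaved inside a bucket.
  val sortedPerBucket: Map[Int, Seq[Rec]] =
    buckets.map { case (b, rs) => b -> rs.sortBy(_.col2) }

  // Reading B: each bucket is sorted by (col1, col2), so every col1 group is
  // contiguous and ordered by col2 inside the bucket.
  val sortedPerKey: Map[Int, Seq[Rec]] =
    buckets.map { case (b, rs) => b -> rs.sortBy(r => (r.col1, r.col2)) }

  def main(args: Array[String]): Unit = {
    // col1 of the first four records of bucket 0 under each reading.
    println(sortedPerBucket(0).take(4).map(_.col1))
    println(sortedPerKey(0).take(4).map(_.col1))
  }
}
```

In this simulation bucket 0 holds col1 values 0 and 5. Under reading A the 
first four records alternate between the two col1 values, while under reading 
B they all belong to col1 = 0. If reading A is what Spark does, I assume I 
would need sortWithinPartitions($"col1", $"col2") to get the order I want.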

In summary, here are my constraints and questions:
1) We don't want to use the "aggregation" API because, in my case, some unique 
values of "col1" COULD contain a large number of records, and having the data 
sorted in a specified order per col1 helps the merge logic for our business 
case a lot.
2) We don't want to use a "window" function, because the merge logic is really 
an aggregation: only one record is output per group (col1). So even though a 
"window" function comes with sorting, it doesn't fit this case.
3) The number of unique values of "col1" is unpredictable for Spark; I 
understand that. But I wonder whether there is an API that is called once per 
group (per col1), instead of once per partition (5 partitions in this case).
4) If no such API exists and we have to use MapPartitionsFunction (the 
Iterator is much preferred here, as we don't need to worry about OOM due to 
data skew), my follow-up question is whether Spark guarantees that the data 
within each partition arrives in (col1, col2) order with the API usage shown 
above. Or will Spark deliver the data of each partition sorted by "col2" for 
the first unique value of col1, then sorted by "col2" for the second unique 
value of col1, and so on?
Another challenge: if our merge function can expect the data in this order but 
has to generate one output per col1 group, in Iterator form, is there an 
existing Spark example we can refer to?
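
To show what I mean by generating the output per col1 group in Iterator form, 
here is a minimal plain-Scala sketch. It assumes the records for each col1 
value arrive contiguously, which is exactly the guarantee I am asking about. 
The In and Out case classes, the GroupMergingIterator name, and the 
count/last-value "merge" are all hypothetical placeholders for 
InputCaseClass, OutputCaseClass and our real merge logic.

```scala
// Hypothetical stand-ins for InputCaseClass / OutputCaseClass.
case class In(col1: String, col2: Int)
case class Out(col1: String, count: Int, lastCol2: Int)

// Wraps an iterator whose records for the same col1 are contiguous and
// lazily emits one Out per col1 group. Only the running state of the
// current group is held in memory, never the whole group.
class GroupMergingIterator(input: Iterator[In]) extends Iterator[Out] {
  private val buffered = input.buffered

  override def hasNext: Boolean = buffered.hasNext

  override def next(): Out = {
    val first = buffered.next() // throws NoSuchElementException when exhausted
    var count = 1
    var last = first.col2
    // Keep consuming while the next record still belongs to the same col1.
    while (buffered.hasNext && buffered.head.col1 == first.col1) {
      val r = buffered.next()
      count += 1
      last = r.col2
    }
    Out(first.col1, count, last)
  }
}

object GroupMergingDemo {
  def main(args: Array[String]): Unit = {
    // Input already grouped by col1 and sorted by col2 within each group.
    val sorted = Iterator(In("a", 1), In("a", 3), In("b", 2), In("c", 5), In("c", 9))
    new GroupMergingIterator(sorted).foreach(println)
  }
}
```

Inside MapPartitionsFunction.call I would presumably have to convert between 
java.util.Iterator and the Scala Iterator (for example with asScala / asJava 
from the standard collection converters), but the grouping idea would be the 
same. If the same col1 value can show up again later in the partition, this 
sketch would wrongly emit two outputs for it, which is why the ordering 
guarantee matters to me.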

Thanks

Yong
