GitHub user tejasapatil opened a pull request:
https://github.com/apache/spark/pull/16909
[SPARK-13450] Introduce UnsafeRowExternalArray. Change SortMergeJoin and
WindowExec to use it
## What issue does this PR address?
Jira: https://issues.apache.org/jira/browse/SPARK-13450
In `SortMergeJoinExec`, rows of the right relation that have the same value
for a join key are buffered in memory. In case of skew, this causes OOMs (see
comments in SPARK-13450 for more details). A heap dump from a failed job confirms
this:
https://issues.apache.org/jira/secure/attachment/12846382/heap-dump-analysis.png
While it's possible to work around this by increasing the heap size, Spark should
be resilient to such issues, since skew can occur arbitrarily.
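To make the failure mode concrete, here is a minimal, illustrative sketch of an
inner sort-merge join over key-sorted input (not Spark's actual implementation;
all names are made up). The `matches` buffer below is the analogue of the
in-memory buffer described above, and with a heavily skewed key it grows
without bound:
```
import scala.collection.mutable.ArrayBuffer

// Illustrative only -- a naive inner sort-merge join over two key-sorted
// sequences. `matches` buffers every right row sharing the current key so
// each matching left row can be paired with all of them; key skew makes
// this buffer arbitrarily large.
def sortMergeJoin[K, L, R](left: Seq[(K, L)], right: Seq[(K, R)])
                          (implicit ord: Ordering[K]): Seq[(L, R)] = {
  val out = ArrayBuffer.empty[(L, R)]
  var i = 0
  var j = 0
  while (i < left.length && j < right.length) {
    val c = ord.compare(left(i)._1, right(j)._1)
    if (c < 0) i += 1        // advance the side with the smaller key
    else if (c > 0) j += 1
    else {
      val key = left(i)._1
      val matches = ArrayBuffer.empty[R] // the buffer that OOMs under skew
      while (j < right.length && ord.equiv(right(j)._1, key)) {
        matches += right(j)._2
        j += 1
      }
      while (i < left.length && ord.equiv(left(i)._1, key)) {
        matches.foreach(r => out += ((left(i)._2, r)))
        i += 1
      }
    }
  }
  out.toSeq
}
```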
## Changes proposed in this pull request
- Introduces `ExternalAppendOnlyUnsafeRowArray` (see the sketch after this list)
  - It holds `UnsafeRow`s in memory up to a certain threshold.
  - After the threshold is hit, it switches to `UnsafeExternalSorter`, which
enables spilling the rows to disk. It does NOT sort the data.
  - Allows iterating over the array multiple times. However, any alteration to
the array (via `add` or `clear`) invalidates the existing iterator(s).
- `WindowExec` was already using `UnsafeExternalSorter` to support
spilling. Changed it to use the new array.
- Changed `SortMergeJoinExec` to use the new array implementation.
  - NOTE: I have not changed FULL OUTER JOIN to use this new array
implementation. Changing that will need more surgery, and I would rather put up
a separate PR for it once this one gets in.
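A minimal, hypothetical sketch of the buffering semantics described above:
items stay in memory up to a threshold, then overflow to disk, and any mutation
invalidates live iterators. The PR itself builds on `UnsafeExternalSorter`; the
class name, plain-file spill, and Java serialization below are illustrative only:
```
import java.io._
import scala.collection.mutable.ArrayBuffer

// Illustrative only -- NOT the PR's implementation. Keeps items in memory up
// to `spillThreshold`, then moves everything to a temp file and appends all
// further items there. add()/clear() invalidate previously issued iterators.
final class SpillableBuffer[T <: Serializable](spillThreshold: Int) {
  private val inMemory = new ArrayBuffer[T]
  private var spillOut: Option[ObjectOutputStream] = None
  private var spillFile: Option[File] = None
  private var numSpilled = 0
  private var modCount = 0L // bumped on every mutation

  def add(item: T): Unit = {
    if (spillOut.isEmpty && inMemory.length >= spillThreshold) spill()
    spillOut match {
      case Some(out) => out.writeObject(item); numSpilled += 1
      case None      => inMemory += item
    }
    modCount += 1
  }

  private def spill(): Unit = {
    val file = File.createTempFile("spillable-buffer", ".bin")
    file.deleteOnExit()
    val out = new ObjectOutputStream(new FileOutputStream(file))
    inMemory.foreach(out.writeObject) // move buffered items to disk
    numSpilled = inMemory.length
    inMemory.clear()
    spillFile = Some(file)
    spillOut = Some(out)
  }

  def clear(): Unit = {
    inMemory.clear()
    spillOut.foreach(_.close())
    spillFile.foreach(_.delete())
    spillOut = None; spillFile = None; numSpilled = 0
    modCount += 1
  }

  // Valid only until the next add()/clear(), mirroring the PR's contract.
  def iterator: Iterator[T] = {
    val expected = modCount
    spillOut.foreach(_.flush())
    val underlying: Iterator[T] = spillFile match {
      case Some(file) =>
        val in = new ObjectInputStream(new FileInputStream(file))
        Iterator.fill(numSpilled)(in.readObject().asInstanceOf[T])
      case None => inMemory.iterator
    }
    underlying.map { item =>
      require(modCount == expected, "buffer was modified; iterator is stale")
      item
    }
  }
}
```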
Note for reviewers: The diff can be divided into 3 (or more) parts. My
motive for having all the changes in a single PR was to demonstrate that the
API is sane and supports both use cases. If reviewing the whole thing as 3
separate PRs would help, I am happy to make the split.
## How was this patch tested?
#### Unit testing
- Added unit tests for `ExternalAppendOnlyUnsafeRowArray` to validate all its
APIs and access patterns.
- Added unit tests for `SortMergeJoinExec`:
  - With and without spill, for inner join, left outer join, and right outer
join, to confirm that the spill threshold config behaves as expected and the
output is correct (a sketch of such a test follows this list).
  - This PR touches the scanning logic in `SortMergeJoinExec` for _all_ joins
(except FULL OUTER JOIN). However, I expect existing test cases to confirm that
there is no regression in correctness.
- Added unit tests for `WindowExec` to check spilling behavior and
correctness of results.
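For illustration, a hedged sketch of the shape such a spill test could take
(this is not the PR's actual test code; the config keys are the ones exercised
in the repro script below):
```
import org.apache.spark.sql.SparkSession

// Sketch only: run the same skewed join with and without spilling and
// assert the results agree. Config keys are taken from the repro below.
val spark = SparkSession.builder.master("local").getOrCreate()
import spark.implicits._

def joinCount(spillThreshold: Int): Long = {
  spark.conf.set("spark.sql.sortMergeJoinExec.buffer.spill.threshold", spillThreshold.toString)
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "1") // force sort-merge join
  val big = (0 until 100000).map(i => (1, i)).toDF("k", "v") // one skewed key
  val one = Seq((1, -1)).toDF("k", "w")
  one.join(big, "k").count()
}

assert(joinCount(Int.MaxValue) == joinCount(100)) // no-spill vs spill
```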
#### Stress testing
- Confirmed that the OOM is gone by running against a production job which used
to OOM.
- Since I cannot share details about the prod workload externally, I created
synthetic data to mimic the issue and ran it before and after the fix to
demonstrate the problem and the query's success with this PR.
Generating the synthetic data
```
./bin/spark-shell --driver-memory=6G
import org.apache.spark.sql._
val hc = SparkSession.builder.master("local").getOrCreate()
hc.sql("DROP TABLE IF EXISTS spark_13450_large_table").collect
hc.sql("DROP TABLE IF EXISTS spark_13450_one_row_table").collect
val df1 = (0 until 1).map(i => ("10", "100", i.toString, (i * 2).toString)).toDF("i", "j", "str1", "str2")
df1.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(100, "i", "j").sortBy("i", "j").saveAsTable("spark_13450_one_row_table")
val df2 = (0 until 3000000).map(i => ("10", "100", i.toString, (i * 2).toString)).toDF("i", "j", "str1", "str2")
df2.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(100, "i", "j").sortBy("i", "j").saveAsTable("spark_13450_large_table")
```
Ran this against trunk vs. a local build with this PR. The OOM reproduces on
trunk, and with the fix the query runs fine.
```
./bin/spark-shell --driver-java-options="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/spark.driver.heapdump.hprof"
import org.apache.spark.sql._
val hc = SparkSession.builder.master("local").getOrCreate()
hc.sql("SET spark.sql.autoBroadcastJoinThreshold=1")
hc.sql("SET spark.sql.sortMergeJoinExec.buffer.spill.threshold=10000")
hc.sql("DROP TABLE IF EXISTS spark_13450_result").collect
hc.sql("""
CREATE TABLE spark_13450_result
AS
SELECT
a.i AS a_i, a.j AS a_j, a.str1 AS a_str1, a.str2 AS a_str2,
b.i AS b_i, b.j AS b_j, b.str1 AS b_str1, b.str2 AS b_str2
FROM
spark_13450_one_row_table a
JOIN
spark_13450_large_table b
ON
a.i=b.i AND
a.j=b.j
""")
```
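As a quick sanity check (not part of the original script), one could verify
the row count afterwards; since every row of the large table shares the single
row's join key, the inner join should yield 3,000,000 rows:
```
hc.sql("SELECT COUNT(*) FROM spark_13450_result").show()
// expected: 3000000
```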
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tejasapatil/spark SPARK-13450_smb_buffer_oom
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/16909.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #16909
----
commit e9cdd30252bce12d34f52cc31f95adb271ef2209
Author: Tejas Patil <[email protected]>
Date: 2017-02-04T02:14:33Z
[SPARK-13450] Introduce UnsafeRowExternalArray. Make SortMergeJoin and
WindowExec use it
----