[GitHub] spark pull request #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Joi...

2018-11-28 Thread sujithjay
Github user sujithjay commented on a diff in the pull request:

https://github.com/apache/spark/pull/22168#discussion_r236992453
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -1058,31 +1064,37 @@ private class SortMergeFullOuterJoinScanner(
* @return true if a valid match is found, false otherwise.
*/
   private def scanNextInBuffered(): Boolean = {
-while (leftIndex < leftMatches.size) {
-  while (rightIndex < rightMatches.size) {
-joinedRow(leftMatches(leftIndex), rightMatches(rightIndex))
-if (boundCondition(joinedRow)) {
-  leftMatched.set(leftIndex)
-  rightMatched.set(rightIndex)
+val leftMatchesIterator = leftMatches.generateIterator(leftIndex)
+
+while (leftMatchesIterator.hasNext) {
+  val leftCurRow = leftMatchesIterator.next()
+  val rightMatchesIterator = rightMatches.generateIterator(rightIndex)
--- End diff --

Hi @viirya,
After some deliberation, I figure it is not possible to avoid reinitialising 
the right iterator. Please share your thoughts on this. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Joi...

2018-08-21 Thread sujithjay
Github user sujithjay commented on a diff in the pull request:

https://github.com/apache/spark/pull/22168#discussion_r211695230
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -1058,31 +1064,37 @@ private class SortMergeFullOuterJoinScanner(
* @return true if a valid match is found, false otherwise.
*/
   private def scanNextInBuffered(): Boolean = {
-while (leftIndex < leftMatches.size) {
-  while (rightIndex < rightMatches.size) {
-joinedRow(leftMatches(leftIndex), rightMatches(rightIndex))
-if (boundCondition(joinedRow)) {
-  leftMatched.set(leftIndex)
-  rightMatched.set(rightIndex)
+val leftMatchesIterator = leftMatches.generateIterator(leftIndex)
+
+while (leftMatchesIterator.hasNext) {
+  val leftCurRow = leftMatchesIterator.next()
+  val rightMatchesIterator = rightMatches.generateIterator(rightIndex)
--- End diff --

Hi @viirya, 
Thank you for reviewing the code. 

I agree the code would be a bit more complicated if we had to keep the 
scanning iterators. In this particular case, I was following a pattern used 
elsewhere in the class.
For example, 
https://github.com/apache/spark/blob/35f7f5ce83984d8afe0b7955942baa04f2bef74f/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L307-L329

Having said that, I do feel the penalty for following a similar approach in 
the case of full outer joins could be higher. I will try and see what I can do.


---




[GitHub] spark pull request #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Joi...

2018-08-21 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22168#discussion_r211619140
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -1058,31 +1064,37 @@ private class SortMergeFullOuterJoinScanner(
* @return true if a valid match is found, false otherwise.
*/
   private def scanNextInBuffered(): Boolean = {
-while (leftIndex < leftMatches.size) {
-  while (rightIndex < rightMatches.size) {
-joinedRow(leftMatches(leftIndex), rightMatches(rightIndex))
-if (boundCondition(joinedRow)) {
-  leftMatched.set(leftIndex)
-  rightMatched.set(rightIndex)
+val leftMatchesIterator = leftMatches.generateIterator(leftIndex)
+
+while (leftMatchesIterator.hasNext) {
+  val leftCurRow = leftMatchesIterator.next()
+  val rightMatchesIterator = rightMatches.generateIterator(rightIndex)
--- End diff --

Can we keep the left and right scanning iterators? If the rows have been 
spilled, obtaining an iterator over the spilled data requires looping over the 
spill writers and creating readers. We may be able to avoid calling 
`generateIterator` every time we need an iterator. However, it might make the 
code a bit more complicated than it is now.
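The trade-off discussed here can be sketched under simplifying assumptions. `CountingBuffer` and `ResumableScanner` are hypothetical names, not Spark classes: `CountingBuffer` models only the `generateIterator(startIndex)` shape of `ExternalAppendOnlyUnsafeRowArray` over an in-memory `IndexedSeq`, and Scala's `mutable.BitSet` stands in for the `leftMatched`/`rightMatched` bitsets. The left iterator is kept as a field across calls, as suggested above, while the right iterator is regenerated once per left row, because a single-pass iterator cannot be rewound.

```scala
import scala.collection.mutable

// Hypothetical stand-in for ExternalAppendOnlyUnsafeRowArray: only the
// generateIterator(startIndex) shape is modelled. It counts iterator creations,
// because on spilled data each creation has to open new readers.
final class CountingBuffer[T](rows: IndexedSeq[T]) {
  var iteratorsCreated: Int = 0
  def length: Int = rows.length
  def generateIterator(startIndex: Int): Iterator[T] = {
    iteratorsCreated += 1
    rows.iterator.drop(startIndex)
  }
}

// Resumable scan over the cross product of buffered left and right matches.
// Each scanNext() call returns the next pair satisfying `cond` (or None when
// done); the following call resumes exactly where the previous one stopped.
final class ResumableScanner[L, R](
    left: CountingBuffer[L],
    right: CountingBuffer[R],
    cond: (L, R) => Boolean) {
  val leftMatched: mutable.BitSet = mutable.BitSet.empty
  val rightMatched: mutable.BitSet = mutable.BitSet.empty

  // Kept across calls, so the left side is scanned with a single iterator.
  private val leftIter: Iterator[L] = left.generateIterator(0)
  private var leftRow: Option[L] = None
  private var leftIndex = -1
  // Single-pass: must be regenerated for every new left row.
  private var rightIter: Iterator[R] = Iterator.empty
  private var rightIndex = 0

  def scanNext(): Option[(L, R)] = {
    while (leftRow.isDefined || leftIter.hasNext) {
      if (leftRow.isEmpty) {
        leftRow = Some(leftIter.next())
        leftIndex += 1
        rightIter = right.generateIterator(0)
        rightIndex = 0
      }
      val l = leftRow.get
      while (rightIter.hasNext) {
        val r = rightIter.next()
        val i = rightIndex
        rightIndex += 1
        if (cond(l, r)) {
          leftMatched += leftIndex
          rightMatched += i
          return Some((l, r))
        }
      }
      leftRow = None // right side exhausted for this left row
    }
    None
  }
}
```

With left rows `1, 2, 3`, right rows `2, 3, 4, 5` and the condition `r == l + 1`, the scanner yields `(1,2)`, `(2,3)`, `(3,4)` while creating one left iterator but three right iterators, one per left row. Right index 3 (value `5`) is never set in `rightMatched`, so a full outer join would emit it padded with nulls.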


---




[GitHub] spark pull request #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Joi...

2018-08-21 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22168#discussion_r211607297
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -975,8 +979,10 @@ private class SortMergeFullOuterJoinScanner(
 
   private[this] var leftIndex: Int = 0
   private[this] var rightIndex: Int = 0
-  private[this] val leftMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
-  private[this] val rightMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
--- End diff --

We can remove the `scala.collection.mutable.ArrayBuffer` import now. It seems 
no other places use it.


---




[GitHub] spark pull request #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Joi...

2018-08-21 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22168#discussion_r211609440
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -1028,23 +1034,23 @@ private class SortMergeFullOuterJoinScanner(
 rightIndex = 0
 
 while (leftRowKey != null && keyOrdering.compare(leftRowKey, matchingKey) == 0) {
-  leftMatches += leftRow.copy()
+  leftMatches.add(leftRow.copy().asInstanceOf[UnsafeRow])
--- End diff --

`ExternalAppendOnlyUnsafeRowArray` handles the copy, so there is no need to 
make another copy here.
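Why the extra `copy()` is redundant can be sketched as follows, assuming (as the comment states) that `ExternalAppendOnlyUnsafeRowArray.add` copies the row it receives. `MutableRow`, `AliasingBuffer`, `CopyingBuffer` and `RowSource.feed` are hypothetical names; `MutableRow` stands in for an `UnsafeRow` that the upstream iterator reuses for every record.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical mutable row standing in for an UnsafeRow that an upstream
// iterator reuses: each new record overwrites the same object.
final class MutableRow(var value: Int) {
  def copy(): MutableRow = new MutableRow(value)
}

// Stores the reference it is given, so every entry aliases the reused row.
final class AliasingBuffer {
  val rows: ArrayBuffer[MutableRow] = ArrayBuffer.empty
  def add(row: MutableRow): Unit = rows += row
}

// Copies inside add(), the behaviour the review attributes to
// ExternalAppendOnlyUnsafeRowArray.add; callers need not copy first.
final class CopyingBuffer {
  val rows: ArrayBuffer[MutableRow] = ArrayBuffer.empty
  def add(row: MutableRow): Unit = rows += row.copy()
}

object RowSource {
  // Feeds `values` to `add` through one reused MutableRow, like a scanner would.
  def feed(values: Seq[Int], add: MutableRow => Unit): Unit = {
    val reused = new MutableRow(0)
    values.foreach { v =>
      reused.value = v
      add(reused)
    }
  }
}
```

Feeding `1, 2, 3` into `AliasingBuffer` leaves three references to the same object, all holding `3`, whereas `CopyingBuffer` keeps `1, 2, 3`. When the buffer already copies on `add`, calling `copy()` beforehand only allocates a second, immediately discarded copy per row.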


---




[GitHub] spark pull request #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Joi...

2018-08-21 Thread sujithjay
Github user sujithjay commented on a diff in the pull request:

https://github.com/apache/spark/pull/22168#discussion_r211579406
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -1099,7 +,7 @@ private class SortMergeFullOuterJoinScanner(
 
   def advanceNext(): Boolean = {
 // If we already buffered some matching rows, use them directly
-if (leftIndex <= leftMatches.size || rightIndex <= rightMatches.size) {
+if (leftIndex <= leftMatches.length || rightIndex <= rightMatches.length) {
--- End diff --

I changed the type of `leftMatches` & `rightMatches` from 
`ArrayBuffer[InternalRow]` to `ExternalAppendOnlyUnsafeRowArray`, which 
exposes a `length` method instead of a `size` method.


---




[GitHub] spark pull request #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Joi...

2018-08-21 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22168#discussion_r211577003
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -1099,7 +,7 @@ private class SortMergeFullOuterJoinScanner(
 
   def advanceNext(): Boolean = {
 // If we already buffered some matching rows, use them directly
-if (leftIndex <= leftMatches.size || rightIndex <= rightMatches.size) {
+if (leftIndex <= leftMatches.length || rightIndex <= rightMatches.length) {
--- End diff --

Why did you change size -> length?


---




[GitHub] spark pull request #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Joi...

2018-08-21 Thread sujithjay
GitHub user sujithjay opened a pull request:

https://github.com/apache/spark/pull/22168

[SPARK-24985][SQL][WIP] Fix OOM in Full Outer Join in case of data skew

## What issue does this pull request address?
JIRA: 
[https://issues.apache.org/jira/browse/SPARK-24985](https://issues.apache.org/jira/browse/SPARK-24985)
In the case of Full Outer Joins of large tables, OOM exceptions occur in the 
presence of data skew around the join keys in either of the joined tables. 
While it's possible to increase the heap size as a workaround, Spark should be 
resilient to such issues, as skew can happen arbitrarily.

## What changes were proposed in this pull request?

#16909 introduced `ExternalAppendOnlyUnsafeRowArray` and changed 
`SortMergeJoinExec` to use it for every join type except 'Full Outer Join'. 
This PR changes 'Full Outer Join' to use 
`ExternalAppendOnlyUnsafeRowArray` as well.
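The spill-to-disk idea behind this change can be illustrated with a deliberately simplified model. `SpillableIntBuffer` is hypothetical and is not how `ExternalAppendOnlyUnsafeRowArray` is implemented (that class stores `UnsafeRow` data and spills through Spark's external sorter). Here rows are plain `Int`s, and once more than `inMemoryThreshold` rows arrive, everything moves to a temporary file. Each `generateIterator` call on spilled data opens a new reader and skips ahead to `startIndex`, which is the cost raised in the review of `scanNextInBuffered`.

```scala
import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream, File, FileInputStream, FileOutputStream}
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified model of a spillable append-only buffer (not Spark's
// ExternalAppendOnlyUnsafeRowArray). Rows are plain Ints. Once more than
// `inMemoryThreshold` rows are added, all rows move to a temporary file and
// later rows are appended to that file, so heap use stays bounded.
final class SpillableIntBuffer(inMemoryThreshold: Int) {
  private val inMemory = ArrayBuffer.empty[Int]
  private var spillFile: Option[File] = None
  private var out: DataOutputStream = null
  private var numRows = 0

  def length: Int = numRows
  def spilled: Boolean = spillFile.isDefined

  def add(row: Int): Unit = {
    if (!spilled && numRows < inMemoryThreshold) {
      inMemory += row
    } else {
      if (!spilled) {
        // First overflow: create the spill file and move buffered rows into it.
        val f = File.createTempFile("spill", ".bin")
        f.deleteOnExit()
        spillFile = Some(f)
        out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(f)))
        inMemory.foreach(r => out.writeInt(r))
        inMemory.clear()
      }
      out.writeInt(row)
    }
    numRows += 1
  }

  // Iterates from `startIndex`. On spilled data every call opens a fresh reader
  // and reads past the skipped rows, so regenerating iterators has a real cost.
  def generateIterator(startIndex: Int = 0): Iterator[Int] = spillFile match {
    case None => inMemory.iterator.drop(startIndex)
    case Some(f) =>
      out.flush() // make buffered writes visible to the reader
      val in = new DataInputStream(new BufferedInputStream(new FileInputStream(f)))
      val total = numRows
      new Iterator[Int] {
        private var read = 0
        def hasNext: Boolean = {
          val more = read < total
          if (!more) in.close() // release the reader once exhausted
          more
        }
        def next(): Int = {
          if (read >= total) throw new NoSuchElementException("end of spill file")
          read += 1
          in.readInt()
        }
      }.drop(startIndex)
  }
}
```

With `inMemoryThreshold = 2`, adding `1` to `5` spills on the third row, and `generateIterator(3)` yields `4, 5` after reading past the first three rows on disk. Memory use is bounded by the threshold rather than by the size of the skewed key group, at the price of disk I/O whenever an iterator is created.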

## How was this patch tested?
### Unit testing
- Changed a test-case in `JoinSuite`.
- Existing tests in `OuterJoinSuite` were used to verify correctness.

### Stress testing
- This is still a work in progress. I plan to verify this patch using a 
production workload.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sujithjay/spark SPARK-24985

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22168.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22168


commit ce70a4ef4d6410d0a738a5440dd2b7d91c7e4822
Author: sujithjay 
Date:   2018-08-21T08:20:48Z

[SPARK-24985][SQL] Fix OOM in Full Outer Join in presence of data skew.

Change SortMergeJoinExec to use ExternalAppendOnlyUnsafeRowArray for Full 
Outer Join. This spills data to disk if the number of buffered rows exceeds 
a threshold, thus preventing OOM errors.

Change corresponding test case in JoinSuite.




---
