[GitHub] spark pull request #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Joi...
Github user sujithjay commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22168#discussion_r236992453

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
    @@ -1058,31 +1064,37 @@ private class SortMergeFullOuterJoinScanner(
        * @return true if a valid match is found, false otherwise.
        */
       private def scanNextInBuffered(): Boolean = {
    -    while (leftIndex < leftMatches.size) {
    -      while (rightIndex < rightMatches.size) {
    -        joinedRow(leftMatches(leftIndex), rightMatches(rightIndex))
    -        if (boundCondition(joinedRow)) {
    -          leftMatched.set(leftIndex)
    -          rightMatched.set(rightIndex)
    +    val leftMatchesIterator = leftMatches.generateIterator(leftIndex)
    +
    +    while (leftMatchesIterator.hasNext) {
    +      val leftCurRow = leftMatchesIterator.next()
    +      val rightMatchesIterator = rightMatches.generateIterator(rightIndex)
    --- End diff --

Hi @viirya, after some deliberation, I don't think it is possible to avoid reinitialising the right iterator. Please share your thoughts on this.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
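For context on the reinitialisation question: within one join-key group, the scan pairs every buffered left row with every buffered right row. It marks the rows that satisfy the bound condition and emits any row that never matched with the other side set to null. The Scala sketch below models this over plain collections, loosely following the loop structure of `scanNextInBuffered`. `FullOuterGroupSketch`, `fullOuterJoinGroup` and the use of `None` for a null side are hypothetical and are not Spark APIs. Because the right group is rescanned from its first row for every left row, a forward-only iterator over it has to be recreated once per left row.

```scala
import scala.collection.mutable

object FullOuterGroupSketch {
  // Hypothetical helper: full outer join of ONE key group, i.e. every row in
  // `left` and `right` already shares the same join key. `cond` plays the role
  // of the join's bound condition; None stands in for a null-padded side.
  def fullOuterJoinGroup[L, R](
      left: IndexedSeq[L],
      right: IndexedSeq[R],
      cond: (L, R) => Boolean): Seq[(Option[L], Option[R])] = {
    val rightMatched = mutable.BitSet.empty // indices of right rows that matched something
    val out = mutable.ArrayBuffer.empty[(Option[L], Option[R])]
    for (l <- left) {
      var leftMatched = false
      // The right group is rescanned from its first row for every left row; with a
      // forward-only iterator, that means creating a new iterator each time.
      val rightIt = right.iterator.zipWithIndex
      while (rightIt.hasNext) {
        val (r, j) = rightIt.next()
        if (cond(l, r)) {
          leftMatched = true
          rightMatched += j
          out += ((Some(l), Some(r)))
        }
      }
      // A left row that matched nothing is emitted with a null right side.
      if (!leftMatched) out += ((Some(l), None))
    }
    // Finally, right rows that matched no left row are emitted with a null left side.
    for (j <- right.indices if !rightMatched.contains(j)) out += ((None, Some(right(j))))
    out.toSeq
  }
}
```

For example, joining left rows `Vector(1, 2)` with right rows `Vector(10, 30)` under the condition `r == l * 10` yields the matched pair `(1, 10)`, then the unmatched left row `2`, then the unmatched right row `30`.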
[GitHub] spark pull request #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Joi...
Github user sujithjay commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22168#discussion_r211695230

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
    @@ -1058,31 +1064,37 @@ private class SortMergeFullOuterJoinScanner(
        * @return true if a valid match is found, false otherwise.
        */
       private def scanNextInBuffered(): Boolean = {
    -    while (leftIndex < leftMatches.size) {
    -      while (rightIndex < rightMatches.size) {
    -        joinedRow(leftMatches(leftIndex), rightMatches(rightIndex))
    -        if (boundCondition(joinedRow)) {
    -          leftMatched.set(leftIndex)
    -          rightMatched.set(rightIndex)
    +    val leftMatchesIterator = leftMatches.generateIterator(leftIndex)
    +
    +    while (leftMatchesIterator.hasNext) {
    +      val leftCurRow = leftMatchesIterator.next()
    +      val rightMatchesIterator = rightMatches.generateIterator(rightIndex)
    --- End diff --

Hi @viirya, thank you for reviewing the code. I agree the code would become a bit more complicated if we had to keep the scanning iterators around. In this particular case, I was following a pattern used throughout the rest of the class, e.g. https://github.com/apache/spark/blob/35f7f5ce83984d8afe0b7955942baa04f2bef74f/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L307-L329

That said, I do feel the penalty for following the same approach could be higher for full outer joins. I will try and see what I can do.
[GitHub] spark pull request #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Joi...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22168#discussion_r211619140

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
    @@ -1058,31 +1064,37 @@ private class SortMergeFullOuterJoinScanner(
        * @return true if a valid match is found, false otherwise.
        */
       private def scanNextInBuffered(): Boolean = {
    -    while (leftIndex < leftMatches.size) {
    -      while (rightIndex < rightMatches.size) {
    -        joinedRow(leftMatches(leftIndex), rightMatches(rightIndex))
    -        if (boundCondition(joinedRow)) {
    -          leftMatched.set(leftIndex)
    -          rightMatched.set(rightIndex)
    +    val leftMatchesIterator = leftMatches.generateIterator(leftIndex)
    +
    +    while (leftMatchesIterator.hasNext) {
    +      val leftCurRow = leftMatchesIterator.next()
    +      val rightMatchesIterator = rightMatches.generateIterator(rightIndex)
    --- End diff --

Can we keep the left and right scanning iterators? If the rows have been spilled, obtaining an iterator over the spilled data requires looping over the spill writers and creating readers. We may want to avoid calling `generateIterator` every time we need the iterators. However, this might make the code a bit more complicated than it is now.
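To illustrate the cost described above, here is a toy model rather than Spark code. `ToySpilledBuffer`, its `readersOpened` counter and `countPairs` are hypothetical, and each spill "file" is just an in-memory `Vector`. "Opening a reader" is reduced to incrementing a counter once per spill file on every `generateIterator` call. When the right iterator is recreated for every left row, as in the diff, the number of readers opened on the right side grows as (left rows) times (right spill files) in this model.

```scala
object IteratorRegenerationCost {
  // Hypothetical stand-in for a spilled buffer: rows live in `spills.size` spill
  // "files", and every generateIterator call "opens" one reader per spill file.
  final class ToySpilledBuffer[T](spills: Vector[Vector[T]]) {
    var readersOpened = 0

    def generateIterator(startIndex: Int = 0): Iterator[T] = {
      readersOpened += spills.size // models looping over spill writers to create readers
      spills.iterator.flatMap(_.iterator).drop(startIndex)
    }
  }

  // Nested scan in the style of the patch: a fresh right iterator for every left row.
  def countPairs[T](left: ToySpilledBuffer[T], right: ToySpilledBuffer[T]): Int = {
    var pairs = 0
    val leftIt = left.generateIterator()
    while (leftIt.hasNext) {
      leftIt.next()
      val rightIt = right.generateIterator() // re-created per left row
      while (rightIt.hasNext) {
        rightIt.next()
        pairs += 1
      }
    }
    pairs
  }
}
```

With 6 left rows and a right side of 10 rows split across 2 spill files, `countPairs` visits 60 pairs. The right buffer opens 12 readers, while the left buffer, iterated only once, opens 3.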
[GitHub] spark pull request #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Joi...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22168#discussion_r211607297

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
    @@ -975,8 +979,10 @@ private class SortMergeFullOuterJoinScanner(
       private[this] var leftIndex: Int = 0
       private[this] var rightIndex: Int = 0
    -  private[this] val leftMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    -  private[this] val rightMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    --- End diff --

We can remove the `scala.collection.mutable.ArrayBuffer` import now. No other place seems to use it.
[GitHub] spark pull request #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Joi...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22168#discussion_r211609440

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
    @@ -1028,23 +1034,23 @@ private class SortMergeFullOuterJoinScanner(
         rightIndex = 0
         while (leftRowKey != null && keyOrdering.compare(leftRowKey, matchingKey) == 0) {
    -      leftMatches += leftRow.copy()
    +      leftMatches.add(leftRow.copy().asInstanceOf[UnsafeRow])
    --- End diff --

`ExternalAppendOnlyUnsafeRowArray` handles the copy itself, so there is no need for another copy here.
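Rows are normally copied before being buffered because an operator may reuse a single mutable row object. Storing the reference without copying then leaves every buffered slot pointing at the last row. The sketch below uses a hypothetical `MutableRow` and `CopyingBuffer`, which are not Spark classes. It shows that when the buffer copies inside `add`, as the comment above says `ExternalAppendOnlyUnsafeRowArray` does, an extra copy by the caller only doubles the number of copies without changing the result.

```scala
import scala.collection.mutable.ArrayBuffer

object CopyOnAddSketch {
  var copiesMade = 0 // counts every row copy, to show where copies happen

  // Hypothetical mutable row; its producer reuses one instance for all rows.
  final class MutableRow(var value: Int) {
    def copy(): MutableRow = { copiesMade += 1; new MutableRow(value) }
  }

  // Yields the SAME MutableRow instance for every element, overwriting its value.
  def reusingIterator(values: Seq[Int]): Iterator[MutableRow] = {
    val shared = new MutableRow(0)
    values.iterator.map { v => shared.value = v; shared }
  }

  // Hypothetical buffer that copies inside add(), so callers can pass the reused row directly.
  final class CopyingBuffer {
    private val rows = ArrayBuffer.empty[MutableRow]
    def add(row: MutableRow): Unit = { rows += row.copy() }
    def values: Seq[Int] = rows.map(_.value).toSeq
  }

  // Buffers rows without copying at all: every slot aliases the shared instance.
  def bufferWithoutCopy(values: Seq[Int]): Seq[Int] = {
    val rows = ArrayBuffer.empty[MutableRow]
    reusingIterator(values).foreach(rows += _)
    rows.map(_.value).toSeq
  }
}
```

Without any copy, buffering `1, 2, 3` from the reusing iterator leaves three references to one row holding `3`. With `CopyingBuffer`, one copy per row is enough, and copying again before `add` only doubles the work.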
[GitHub] spark pull request #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Joi...
Github user sujithjay commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22168#discussion_r211579406

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
    @@ -1099,7 +,7 @@ private class SortMergeFullOuterJoinScanner(
       def advanceNext(): Boolean = {
         // If we already buffered some matching rows, use them directly
    -    if (leftIndex <= leftMatches.size || rightIndex <= rightMatches.size) {
    +    if (leftIndex <= leftMatches.length || rightIndex <= rightMatches.length) {
    --- End diff --

I changed the type of `leftMatches` and `rightMatches` from `ArrayBuffer[InternalRow]` to `ExternalAppendOnlyUnsafeRowArray`, which exposes a `length` method instead of a `size` method.
[GitHub] spark pull request #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Joi...
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22168#discussion_r211577003

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
    @@ -1099,7 +,7 @@ private class SortMergeFullOuterJoinScanner(
       def advanceNext(): Boolean = {
         // If we already buffered some matching rows, use them directly
    -    if (leftIndex <= leftMatches.size || rightIndex <= rightMatches.size) {
    +    if (leftIndex <= leftMatches.length || rightIndex <= rightMatches.length) {
    --- End diff --

Why did you change `size` to `length`?
[GitHub] spark pull request #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Joi...
GitHub user sujithjay opened a pull request:

    https://github.com/apache/spark/pull/22168

    [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Join in case of data skew

## What issue does this pull request address?

JIRA: [https://issues.apache.org/jira/browse/SPARK-24985](https://issues.apache.org/jira/browse/SPARK-24985)

In full outer joins of large tables, OOM exceptions occur when either of the joined tables has data skew on the join keys. While it is possible to work around this by increasing the heap size, Spark should be resilient to such issues, because skew can occur arbitrarily.

## What changes were proposed in this pull request?

#16909 introduced `ExternalAppendOnlyUnsafeRowArray` and changed `SortMergeJoinExec` to use it for every join type except full outer join. This PR changes full outer joins to use `ExternalAppendOnlyUnsafeRowArray` as well.

## How was this patch tested?

Unit testing
- Changed a test case in `JoinSuite`.
- Existing tests in `OuterJoinSuite` were used to verify correctness.

Stress testing
- This is still a work in progress. I plan to verify this patch using a production workload.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sujithjay/spark SPARK-24985

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22168.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22168

commit ce70a4ef4d6410d0a738a5440dd2b7d91c7e4822
Author: sujithjay
Date:   2018-08-21T08:20:48Z

    [SPARK-24985][SQL] Fix OOM in Full Outer Join in presence of data skew.

    Change SortMergeJoinExec to use ExternalAppendOnlyUnsafeRowArray for Full Outer Join. This spills data to disk once the number of buffered rows exceeds a threshold, thus preventing OOM errors.

    Change corresponding test case in JoinSuite.
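The commit message describes a buffer that spills to disk once the buffered rows exceed a threshold. What follows is a minimal sketch of that idea, assuming a simple row-count threshold. `ToySpillableArray` is a hypothetical class, and its "spill files" are simulated as immutable in-memory `Vector`s. The real `ExternalAppendOnlyUnsafeRowArray` manages memory and disk I/O through Spark's own machinery, none of which is modelled here. The sketch only mirrors the operations the diffs in this thread rely on: `add`, `length` and `generateIterator(startIndex)`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified model of an append-only buffer that "spills" once a
// row-count threshold is reached. Spill files are simulated as immutable Vectors;
// a real implementation would serialize the rows to disk.
final class ToySpillableArray[T](inMemoryThreshold: Int) {
  require(inMemoryThreshold > 0, "threshold must be positive")

  private val inMemory = ArrayBuffer.empty[T]
  private val spills = ArrayBuffer.empty[Vector[T]]
  private var numRows = 0

  def length: Int = numRows
  def numSpills: Int = spills.size
  def inMemoryRows: Int = inMemory.size

  def add(row: T): Unit = {
    if (inMemory.size >= inMemoryThreshold) {
      // In-memory buffer is full: move its rows to a new "spill file" and free the memory.
      spills += inMemory.toVector
      inMemory.clear()
    }
    inMemory += row
    numRows += 1
  }

  // Iterates over all rows in insertion order, starting at `startIndex`:
  // spilled chunks first (oldest first), then the rows still held in memory.
  def generateIterator(startIndex: Int = 0): Iterator[T] =
    (spills.iterator.flatMap(_.iterator) ++ inMemory.iterator).drop(startIndex)

  def clear(): Unit = {
    inMemory.clear()
    spills.clear()
    numRows = 0
  }
}
```

With a threshold of 3, adding the rows 1 to 7 produces two spill chunks (`1, 2, 3` and `4, 5, 6`) and leaves `7` in memory. Iteration still returns all seven rows in insertion order, so callers that only use `length` and `generateIterator` do not need to know whether any rows were spilled.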