[GitHub] spark pull request #18843: [SPARK-21595] Separate thresholds for buffering a...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/18843

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/18843#discussion_r132621976

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1139,9 +1154,14 @@ class SQLConf extends Serializable with Logging {

   def windowExecBufferSpillThreshold: Int = getConf(WINDOW_EXEC_BUFFER_SPILL_THRESHOLD)

+  def windowExecBufferInMemoryThreshold: Int = getConf(WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD)
+
   def sortMergeJoinExecBufferSpillThreshold: Int =
     getConf(SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD)

+  def sortMergeJoinExecBufferInMemoryThreshold: Int =
--- End diff --

Sure. Since there was no in-memory buffer for cartesian product before, I am using a conservative value of 4096 for the in-memory buffer threshold. The spill threshold, however, is set to `UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD`, as it was before.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18843#discussion_r132615896

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -844,24 +844,39 @@ object SQLConf {

       .stringConf
       .createWithDefaultFunction(() => TimeZone.getDefault.getID)

+  val WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
+    buildConf("spark.sql.windowExec.buffer.in.memory.threshold")
+      .internal()
+      .doc("Threshold for number of rows guaranteed to be held in memory by the window operator")
+      .intConf
+      .createWithDefault(4096)
+
   val WINDOW_EXEC_BUFFER_SPILL_THRESHOLD =
     buildConf("spark.sql.windowExec.buffer.spill.threshold")
       .internal()
-      .doc("Threshold for number of rows buffered in window operator")
+      .doc("Threshold for number of rows to be spilled by window operator")
       .intConf
-      .createWithDefault(4096)
+      .createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)
+
+  val SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
+    buildConf("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold")
+      .internal()
+      .doc("Threshold for number of rows guaranteed to be held in memory by the sort merge " +
+        "join operator")
+      .intConf
+      .createWithDefault(Int.MaxValue)
--- End diff --

got it
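Both thresholds introduced in this diff are internal configs, but they can still be set per session. A hypothetical tuning sketch follows: the config names come from the diff above, the values are purely illustrative (not recommendations), and a working PySpark installation is assumed.

```python
# Hypothetical tuning sketch; requires a PySpark installation.
# Config names are taken from the diff above; values are illustrative only.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("buffer-threshold-tuning").getOrCreate()

# Rows guaranteed to stay in a plain in-memory array per window partition
# before the buffer switches to UnsafeExternalSorter-backed storage.
spark.conf.set("spark.sql.windowExec.buffer.in.memory.threshold", 1_000_000)

# Once in sorter mode, force a spill to disk after this many buffered rows.
spark.conf.set("spark.sql.windowExec.buffer.spill.threshold", 4_000_000)
```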
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18843#discussion_r132615780

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -844,24 +844,39 @@ object SQLConf {
+  val SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
--- End diff --

OK, let's keep them separate for each operator.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18843#discussion_r132615759

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1139,9 +1154,14 @@ class SQLConf extends Serializable with Logging {
+  def sortMergeJoinExecBufferInMemoryThreshold: Int =
--- End diff --

Shall we introduce a similar config for cartesian product?
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/18843#discussion_r132543304

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala ---
@@ -31,16 +31,16 @@ import org.apache.spark.storage.BlockManager
 import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator}

 /**
- * An append-only array for [[UnsafeRow]]s that spills content to disk when there a predefined
- * threshold of rows is reached.
+ * An append-only array for [[UnsafeRow]]s that strictly keeps content in an in-memory array
+ * until [[numRowsInMemoryBufferThreshold]] is reached post which it will switch to a mode which
+ * would flush to disk after [[numRowsSpillThreshold]] is met (or before if there is
+ * excessive memory consumption). Setting these threshold involves following trade-offs:
  *
- * Setting spill threshold faces following trade-off:
- *
- * - If the spill threshold is too high, the in-memory array may occupy more memory than is
- *   available, resulting in OOM.
- * - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes.
- *   This may lead to a performance regression compared to the normal case of using an
- *   [[ArrayBuffer]] or [[Array]].
+ * - If [[numRowsSpillThreshold]] is too high, the in-memory array may occupy more memory than
--- End diff --

Yes, it was a typo. Corrected it.
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/18843#discussion_r132546198

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -844,24 +844,39 @@ object SQLConf {
+  val SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
--- End diff --

I am fine with that. We could even go a step further and have just two configs, an in-memory threshold and a spill threshold, defined at the `ExternalAppendOnlyUnsafeRowArray` level for all of its clients (currently SMJ, cartesian product, and Window). That way we have consistency across all clients for both knobs. One downside is backward compatibility: the spill threshold was already defined per operator, and people might be using it in prod.
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/18843#discussion_r132570353

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -844,24 +844,39 @@ object SQLConf {
+      .intConf
+      .createWithDefault(Int.MaxValue)
--- End diff --

Before `ExternalAppendOnlyUnsafeRowArray` was introduced, SMJ used to hold its in-memory data in Scala's `ArrayBuffer`. That is backed by an array, which can at most be `Int.MaxValue` in size... so this default keeps things as they were before.
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/18843#discussion_r132156490

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -844,24 +844,39 @@ object SQLConf {
+      .intConf
+      .createWithDefault(Int.MaxValue)
--- End diff --

It is the current value. I suppose you want to be able to tune it if you have to. Not all of us are running Spark at FB scale :)...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18843#discussion_r132148869

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -844,24 +844,39 @@ object SQLConf {
+      .intConf
+      .createWithDefault(Int.MaxValue)
--- End diff --

Is this a reasonable default value? Won't it lead to OOM, according to the documentation?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18843#discussion_r132148739

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -844,24 +844,39 @@ object SQLConf {
+  val SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
--- End diff --

Can we just have one config for both window and SMJ? Ideally we could say this config is for `ExternalAppendOnlyUnsafeRowArray`.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18843#discussion_r132146376

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala ---
@@ -31,16 +31,16 @@ import org.apache.spark.storage.BlockManager
+ * - If [[numRowsSpillThreshold]] is too high, the in-memory array may occupy more memory than
--- End diff --

Typo? This should be `numRowsInMemoryBufferThreshold`. We may spill before reaching `numRowsSpillThreshold` if memory is not enough.
GitHub user tejasapatil opened a pull request: https://github.com/apache/spark/pull/18843

[SPARK-21595] Separate thresholds for buffering and spilling in ExternalAppendOnlyUnsafeRowArray

## What changes were proposed in this pull request?

[SPARK-21595](https://issues.apache.org/jira/browse/SPARK-21595) reported excessive spilling to disk because the default spill threshold of `ExternalAppendOnlyUnsafeRowArray` is quite small for the WINDOW operator. The old behaviour of the WINDOW operator (pre https://github.com/apache/spark/pull/16909) was to hold data in an array for the first 4096 records, after which it would switch to `UnsafeExternalSorter` and start spilling to disk once `spark.shuffle.spill.numElementsForceSpillThreshold` was reached (or earlier if memory was scarce due to excessive consumers). Currently, both the switch from in-memory storage to `UnsafeExternalSorter` and the `UnsafeExternalSorter` spilling to disk are controlled by a single threshold in `ExternalAppendOnlyUnsafeRowArray`. This PR separates the two to allow more granular control.

## How was this patch tested?

Added unit tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tejasapatil/spark SPARK-21595

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18843.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #18843

commit 8e3bfb7715e366a64da6add80253373af7d07915
Author: Tejas Patil
Date: 2017-08-04T00:53:03Z

    Separate thresholds for buffering and spilling
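The two-threshold behaviour the description above restores can be sketched as a toy model. Python is used purely for illustration here; the real implementation is Scala's `ExternalAppendOnlyUnsafeRowArray`, backed by `UnsafeExternalSorter`, and it also spills early under memory pressure, which this sketch does not model.

```python
class TwoThresholdBuffer:
    """Toy model of an append-only row buffer with separate thresholds.

    Rows stay in a plain in-memory list until in_memory_threshold is
    reached; after that the buffer switches to a (simulated) sorter mode,
    which in turn "spills" once spill_threshold rows accumulate.
    """

    def __init__(self, in_memory_threshold, spill_threshold):
        self.in_memory_threshold = in_memory_threshold
        self.spill_threshold = spill_threshold
        self.rows = []
        self.in_sorter_mode = False
        self.spill_count = 0
        self.rows_since_spill = 0

    def append(self, row):
        if not self.in_sorter_mode and len(self.rows) >= self.in_memory_threshold:
            # Switch modes: hand all buffered rows to the (simulated) sorter.
            self.in_sorter_mode = True
            self.rows_since_spill = len(self.rows)
        self.rows.append(row)
        if self.in_sorter_mode:
            self.rows_since_spill += 1
            if self.rows_since_spill >= self.spill_threshold:
                self.spill_count += 1  # simulate a spill to disk
                self.rows_since_spill = 0


# Tiny thresholds to make the mode switch and spills visible.
buf = TwoThresholdBuffer(in_memory_threshold=4, spill_threshold=3)
for i in range(12):
    buf.append(i)
print(buf.in_sorter_mode, buf.spill_count)  # → True 3
```

With the single-threshold scheme this PR replaces, one knob would have had to serve both roles at once; the sketch shows why separating them lets a workload keep a generous in-memory buffer while still bounding rows per spill.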