Re: [PR] [DO-NOT-MERGE][SQL] Reduce the number of shuffles in SQL in local mode [spark]
zhengruifeng commented on code in PR #47349:
URL: https://github.com/apache/spark/pull/47349#discussion_r1677328844

## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ##
@@ -641,14 +641,19 @@ object SQLConf {
     .checkValue(_ > 0, "The value of spark.sql.leafNodeDefaultParallelism must be positive.")
     .createOptional
+  // Lazily get the number of cores to make sure SparkContext created first.
+  private lazy val defaultShufflePartition: Option[Int] = SparkContext.getActive.flatMap { sc =>
+    if (sc.master.startsWith("local")) Some(SparkContext.numDriverCores(sc.master)) else None

Review Comment:
Shall we use existing methods like `sc.isLocal`?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
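The difference behind this suggestion can be sketched in plain Scala without Spark. Both predicates are hypothetical stand-ins written for illustration: `startsWithLocal` copies the check in the diff, and `isLocalLike` assumes that `sc.isLocal` accepts only `local` and `local[...]` masters (an assumption, not quoted from Spark's source).

```scala
object MasterPredicates {
  // Mirrors the check in the diff: any master string that begins with "local".
  def startsWithLocal(master: String): Boolean = master.startsWith("local")

  // Hypothetical stand-in for sc.isLocal: assumes only "local" and "local[...]" count as local.
  def isLocalLike(master: String): Boolean =
    master == "local" || master.startsWith("local[")

  def main(args: Array[String]): Unit = {
    val masters = Seq("local", "local[4]", "local[*]", "local-cluster[2,1,1024]", "yarn")
    for (m <- masters)
      println(s"$m: startsWithLocal=${startsWithLocal(m)}, isLocalLike=${isLocalLike(m)}")
  }
}
```

Under that assumption, the two checks disagree on masters such as `local-cluster[2,1,1024]`: the prefix check treats them as local, while the `isLocal`-style check does not.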
LuciferYang commented on code in PR #47349:
URL: https://github.com/apache/spark/pull/47349#discussion_r1677336255

## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ##
@@ -641,14 +641,19 @@ object SQLConf {
     .checkValue(_ > 0, "The value of spark.sql.leafNodeDefaultParallelism must be positive.")
     .createOptional
+  // Lazily get the number of cores to make sure SparkContext created first.
+  private lazy val defaultShufflePartition: Option[Int] = SparkContext.getActive.flatMap { sc =>
+    if (sc.master.startsWith("local")) Some(SparkContext.numDriverCores(sc.master)) else None

Review Comment:
This looks like it will break the `local-cluster` mode. With this change, `SHUFFLE_PARTITIONS` seems to be configured to 0 in `local-cluster` mode?
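A minimal sketch of the `local-cluster` problem, assuming `SparkContext.numDriverCores` returns 0 for master strings it does not recognize, as this comment implies. `driverCores` is a hypothetical, simplified re-implementation rather than Spark's code, and the `filter(_ > 0)` guard is only one illustrative way to keep the default consistent with the `_ > 0` constraint declared on `spark.sql.shuffle.partitions`.

```scala
object ShufflePartitionsSketch {
  // Simplified patterns for local[N] / local[*] and local[N, maxFailures].
  private val LocalN = """local\[([0-9]+|\*)\]""".r
  private val LocalNFailures = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r

  private def toThreads(s: String): Int =
    if (s == "*") Runtime.getRuntime.availableProcessors() else s.toInt

  // Hypothetical stand-in for SparkContext.numDriverCores: 0 for unrecognized masters.
  def driverCores(master: String): Int = master match {
    case "local"                    => 1
    case LocalN(threads)            => toThreads(threads)
    case LocalNFailures(threads, _) => toThreads(threads)
    case _                          => 0
  }

  // Same shape as the diff: prefix check, then fall back to 200.
  def defaultAsInDiff(master: String): Int =
    (if (master.startsWith("local")) Some(driverCores(master)) else None).getOrElse(200)

  // Illustrative guard: discard non-positive core counts before falling back.
  def defaultGuarded(master: String): Int =
    (if (master.startsWith("local")) Some(driverCores(master)) else None)
      .filter(_ > 0)
      .getOrElse(200)

  def main(args: Array[String]): Unit = {
    val m = "local-cluster[2,1,1024]"
    // Prints 0 for the diff's shape and 200 for the guarded variant.
    println(s"$m -> as in diff: ${defaultAsInDiff(m)}, guarded: ${defaultGuarded(m)}")
  }
}
```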
HyukjinKwon commented on code in PR #47349:
URL: https://github.com/apache/spark/pull/47349#discussion_r1677341101

## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ##
@@ -641,14 +641,19 @@ object SQLConf {
     .checkValue(_ > 0, "The value of spark.sql.leafNodeDefaultParallelism must be positive.")
     .createOptional
+  // Lazily get the number of cores to make sure SparkContext created first.
+  private lazy val defaultShufflePartition: Option[Int] = SparkContext.getActive.flatMap { sc =>
+    if (sc.master.startsWith("local")) Some(SparkContext.numDriverCores(sc.master)) else None

Review Comment:
Oh yeah.
ueshin commented on code in PR #47349:
URL: https://github.com/apache/spark/pull/47349#discussion_r1678189190

## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ##
@@ -641,14 +641,19 @@ object SQLConf {
     .checkValue(_ > 0, "The value of spark.sql.leafNodeDefaultParallelism must be positive.")
     .createOptional
+  // Lazily get the number of cores to make sure SparkContext created first.
+  private lazy val defaultShufflePartition: Option[Int] = SparkContext.getActive.flatMap { sc =>
+    if (sc.isLocal) Some(SparkContext.numDriverCores(sc.master)) else None
+  }
+
   val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions")
     .doc("The default number of partitions to use when shuffling data for joins or aggregations. " +
       "Note: For structured streaming, this configuration cannot be changed between query " +
       "restarts from the same checkpoint location.")
     .version("1.1.0")
     .intConf
     .checkValue(_ > 0, "The value of spark.sql.shuffle.partitions must be positive")
-    .createWithDefault(200)
+    .createWithDefaultFunction(() => defaultShufflePartition.getOrElse(200))

Review Comment:
Will this change also affect Spark Connect local mode? E.g.,

```
$ ./bin/pyspark --remote local
```
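The diff pairs a `lazy val` with `createWithDefaultFunction`. The sketch below models only the Scala semantics, using hypothetical names (`activeCores`, `cachedCores`, `defaultFn`) as stand-ins for `SparkContext.getActive` and the config default, and assuming the default function is invoked on each read: the function runs every time, but the lazy value is computed once and then cached.

```scala
object LazyDefaultSketch {
  // Hypothetical stand-in for the active context's core count (None = no context yet).
  var activeCores: Option[Int] = None
  // Counts how many times the lazy value's initializer actually ran.
  var evaluations = 0

  // Like the diff's lazy val: computed on first access, then cached.
  lazy val cachedCores: Option[Int] = {
    evaluations += 1
    activeCores
  }

  // Like a default function: invoked on every read, but it reads the cached lazy val.
  val defaultFn: () => Int = () => cachedCores.getOrElse(200)

  def main(args: Array[String]): Unit = {
    activeCores = Some(8)
    println(defaultFn()) // 8: the first read forces the lazy val
    activeCores = Some(2)
    println(defaultFn()) // still 8: the cached value is reused
    println(s"initializer ran $evaluations time(s)") // 1
  }
}
```

In this model, a first read made while `activeCores` is still `None` would pin the default to 200 for the rest of the JVM's life.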
HyukjinKwon commented on code in PR #47349:
URL: https://github.com/apache/spark/pull/47349#discussion_r1678192800

## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ##
@@ -641,14 +641,19 @@ object SQLConf {
     .checkValue(_ > 0, "The value of spark.sql.leafNodeDefaultParallelism must be positive.")
     .createOptional
+  // Lazily get the number of cores to make sure SparkContext created first.
+  private lazy val defaultShufflePartition: Option[Int] = SparkContext.getActive.flatMap { sc =>
+    if (sc.isLocal) Some(SparkContext.numDriverCores(sc.master)) else None
+  }
+
   val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions")
     .doc("The default number of partitions to use when shuffling data for joins or aggregations. " +
       "Note: For structured streaming, this configuration cannot be changed between query " +
       "restarts from the same checkpoint location.")
     .version("1.1.0")
     .intConf
     .checkValue(_ > 0, "The value of spark.sql.shuffle.partitions must be positive")
-    .createWithDefault(200)
+    .createWithDefaultFunction(() => defaultShufflePartition.getOrElse(200))

Review Comment:
Yes, it does.
szehon-ho commented on PR #47349:
URL: https://github.com/apache/spark/pull/47349#issuecomment-2229749053

I have seen this before; it seems like a good change for a better experience running Spark locally.
pan3793 commented on code in PR #47349:
URL: https://github.com/apache/spark/pull/47349#discussion_r1678971316

## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ##
@@ -641,14 +641,19 @@ object SQLConf {
     .checkValue(_ > 0, "The value of spark.sql.leafNodeDefaultParallelism must be positive.")
     .createOptional
+  // Lazily get the number of cores to make sure SparkContext created first.
+  private lazy val defaultShufflePartition: Option[Int] = SparkContext.getActive.flatMap { sc =>
+    if (sc.isLocal) Some(SparkContext.numDriverCores(sc.master)) else None

Review Comment:
Maybe `numDriverCores * 2` or `* 3`? In production practice, we usually expect a larger stage parallelism (2 or 3 times the available executor cores) to tackle uneven task runtimes.
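The arithmetic behind this suggestion, as a small sketch. `suggestedPartitions` and `waves` are hypothetical helpers, not Spark APIs, and the wave count assumes one task per core at a time. On an 8-core machine, factors of 1, 2 and 3 give 8, 16 and 24 shuffle partitions (1, 2 and 3 waves of tasks), while the fixed default of 200 runs as 25 waves; having a few tasks per core lets idle cores pick up remaining work when task runtimes are uneven.

```scala
object ParallelismSketch {
  // Hypothetical helper: shuffle partitions as a multiple of the local core count.
  def suggestedPartitions(cores: Int, factor: Int): Int = {
    require(cores > 0 && factor > 0, "cores and factor must be positive")
    cores * factor
  }

  // Number of scheduling waves, assuming one task per core at a time (ceiling division).
  def waves(partitions: Int, cores: Int): Int = (partitions + cores - 1) / cores

  def main(args: Array[String]): Unit = {
    val cores = Runtime.getRuntime.availableProcessors()
    for (factor <- 1 to 3) {
      val p = suggestedPartitions(cores, factor)
      println(s"cores=$cores factor=$factor -> $p partitions, ${waves(p, cores)} wave(s)")
    }
    println(s"fixed default: 200 partitions, ${waves(200, cores)} wave(s)")
  }
}
```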
HyukjinKwon commented on code in PR #47349:
URL: https://github.com/apache/spark/pull/47349#discussion_r1679158703

## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ##
@@ -641,14 +641,19 @@ object SQLConf {
     .checkValue(_ > 0, "The value of spark.sql.leafNodeDefaultParallelism must be positive.")
     .createOptional
+  // Lazily get the number of cores to make sure SparkContext created first.
+  private lazy val defaultShufflePartition: Option[Int] = SparkContext.getActive.flatMap { sc =>
+    if (sc.isLocal) Some(SparkContext.numDriverCores(sc.master)) else None

Review Comment:
This is only for `local[...]`, so I think the more common case is users just trying out small data on their local file system. Well, maybe that isn't the case either. I will need to run some benchmarks.