This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new b92d64d6ef0c [SPARK-46301][CORE] Support `spark.worker.(initial|max)RegistrationRetries` b92d64d6ef0c is described below commit b92d64d6ef0c99b6b444f41ebdfe95f3260312aa Author: Dongjoon Hyun <dh...@apple.com> AuthorDate: Thu Dec 7 01:01:56 2023 -0800 [SPARK-46301][CORE] Support `spark.worker.(initial|max)RegistrationRetries` ### What changes were proposed in this pull request? This PR aims to support `spark.worker.(initial|max)RegistrationRetries` to parameterize the hard-coded magic numbers. ```scala - private val INITIAL_REGISTRATION_RETRIES = 6 - private val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10 + private val INITIAL_REGISTRATION_RETRIES = conf.get(WORKER_INITIAL_REGISTRATION_RETRIES) + private val TOTAL_REGISTRATION_RETRIES = conf.get(WORKER_MAX_REGISTRATION_RETRIES) ``` ### Why are the changes needed? To allow users to control these. ### Does this PR introduce _any_ user-facing change? No. The default values are consistent with the existing behavior. ### How was this patch tested? Pass the CIs. ![Screenshot 2023-12-06 at 8 58 05 PM](https://github.com/apache/spark/assets/9700541/985ff3f7-d8c9-4803-a207-a6c16388e4d0) ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44229 from dongjoon-hyun/SPARK-46301. 
Authored-by: Dongjoon Hyun <dh...@apple.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../scala/org/apache/spark/deploy/worker/Worker.scala | 14 ++++++++++---- .../org/apache/spark/internal/config/Worker.scala | 17 +++++++++++++++++ docs/spark-standalone.md | 18 ++++++++++++++++++ 3 files changed, 45 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index eae12648b95a..1422a1484f8d 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -96,12 +96,18 @@ private[deploy] class Worker( private val HEARTBEAT_MILLIS = conf.get(WORKER_TIMEOUT) * 1000 / 4 // Model retries to connect to the master, after Hadoop's model. - // The first six attempts to reconnect are in shorter intervals (between 5 and 15 seconds) - // Afterwards, the next 10 attempts are between 30 and 90 seconds. + // The total number of retries is less than or equal to WORKER_MAX_REGISTRATION_RETRIES. + // Within the upper limit, WORKER_MAX_REGISTRATION_RETRIES, + // the first WORKER_INITIAL_REGISTRATION_RETRIES attempts to reconnect are in shorter intervals + // (between 5 and 15 seconds). Afterwards, the next attempts are between 30 and 90 seconds. // A bit of randomness is introduced so that not all of the workers attempt to reconnect at // the same time. 
- private val INITIAL_REGISTRATION_RETRIES = 6 - private val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10 + private val INITIAL_REGISTRATION_RETRIES = conf.get(WORKER_INITIAL_REGISTRATION_RETRIES) + private val TOTAL_REGISTRATION_RETRIES = conf.get(WORKER_MAX_REGISTRATION_RETRIES) + if (INITIAL_REGISTRATION_RETRIES > TOTAL_REGISTRATION_RETRIES) { + logInfo(s"${WORKER_INITIAL_REGISTRATION_RETRIES.key} ($INITIAL_REGISTRATION_RETRIES) is " + + s"capped by ${WORKER_MAX_REGISTRATION_RETRIES.key} ($TOTAL_REGISTRATION_RETRIES)") + } private val FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND = 0.500 private val REGISTRATION_RETRY_FUZZ_MULTIPLIER = { val randomNumberGenerator = new Random(UUID.randomUUID.getMostSignificantBits) diff --git a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala index f160470edd8f..c53e181df002 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala @@ -37,6 +37,23 @@ private[spark] object Worker { .longConf .createWithDefault(60) + val WORKER_INITIAL_REGISTRATION_RETRIES = ConfigBuilder("spark.worker.initialRegistrationRetries") + .version("4.0.0") + .internal() + .doc("The number of retries to reconnect in short intervals (between 5 and 15 seconds).") + .intConf + .checkValue(_ > 0, "The number of initial registration retries should be positive") + .createWithDefault(6) + + val WORKER_MAX_REGISTRATION_RETRIES = ConfigBuilder("spark.worker.maxRegistrationRetries") + .version("4.0.0") + .internal() + .doc("The max number of retries to reconnect. 
After spark.worker.initialRegistrationRetries " + + "attempts, the interval is between 30 and 90 seconds.") + .intConf + .checkValue(_ > 0, "The max number of registration retries should be positive") + .createWithDefault(16) + val WORKER_DRIVER_TERMINATE_TIMEOUT = ConfigBuilder("spark.worker.driverTerminateTimeout") .version("2.1.2") .timeConf(TimeUnit.MILLISECONDS) diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index 25d2fba47ce1..56441e5b6dc2 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -388,6 +388,24 @@ SPARK_WORKER_OPTS supports the following system properties: <table> <thead><tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr></thead> +<tr> + <td><code>spark.worker.initialRegistrationRetries</code></td> + <td>6</td> + <td> + The number of retries to reconnect in short intervals (between 5 and 15 seconds). + </td> + <td>4.0.0</td> +</tr> +<tr> + <td><code>spark.worker.maxRegistrationRetries</code></td> + <td>16</td> + <td> + The max number of retries to reconnect. + After <code>spark.worker.initialRegistrationRetries</code> attempts, the interval is between + 30 and 90 seconds. + </td> + <td>4.0.0</td> +</tr> <tr> <td><code>spark.worker.cleanup.enabled</code></td> <td>false</td> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org