This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bdee74b4345 [SPARK-44726][CORE] Improve `HeartbeatReceiver` config validation error message
bdee74b4345 is described below

commit bdee74b43451cb684d72a5829e064f676f58aed1
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Tue Aug 8 22:58:22 2023 -0700

    [SPARK-44726][CORE] Improve `HeartbeatReceiver` config validation error message
    
    ### What changes were proposed in this pull request?
    
    This PR aims to improve `HeartbeatReceiver` so that its config validation error points users to the configuration they actually need to change, for Apache Spark 4.0.
    
    ### Why are the changes needed?
    
    Currently, when a user sets an improper `spark.network.timeout` value, the error message is misleading. It complains about the relationship between `spark.network.timeoutInterval` and `spark.storage.blockManagerHeartbeatTimeoutMs`, neither of which the user has set in their Spark job.
    
    ```
    $ bin/spark-shell -c spark.network.timeout=30s
    ...
    java.lang.IllegalArgumentException: requirement failed:
    spark.network.timeoutInterval should be less than or equal to
    spark.storage.blockManagerHeartbeatTimeoutMs.
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This PR only makes the error message more direct, naming the configuration based on what the user has set.
    
    ### How was this patch tested?
    
    Passed CI, including the newly added test cases.
    
    Closes #42403 from dongjoon-hyun/SPARK-44726.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../scala/org/apache/spark/HeartbeatReceiver.scala | 12 ++++++---
 .../org/apache/spark/HeartbeatReceiverSuite.scala  | 29 +++++++++++++++++++++-
 2 files changed, 37 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala 
b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 825d9ce7794..5999040894a 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -88,9 +88,15 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, 
clock: Clock)
 
   private val executorHeartbeatIntervalMs = 
sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL)
 
-  require(checkTimeoutIntervalMs <= executorTimeoutMs,
-    s"${Network.NETWORK_TIMEOUT_INTERVAL.key} should be less than or " +
-      s"equal to ${config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT.key}.")
+  if (sc.conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT).isEmpty) {
+    require(checkTimeoutIntervalMs <= executorTimeoutMs,
+      s"${Network.NETWORK_TIMEOUT_INTERVAL.key} should be less than or " +
+        s"equal to ${Network.NETWORK_TIMEOUT.key}.")
+  } else {
+    require(checkTimeoutIntervalMs <= executorTimeoutMs,
+      s"${Network.NETWORK_TIMEOUT_INTERVAL.key} should be less than or " +
+        s"equal to ${config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT.key}.")
+  }
   require(executorHeartbeatIntervalMs <= executorTimeoutMs,
     s"${config.EXECUTOR_HEARTBEAT_INTERVAL.key} should be less than or " +
       s"equal to ${config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT.key}")
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala 
b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index ee0a5773692..a8351322e01 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -29,7 +29,8 @@ import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
-import org.apache.spark.internal.config.DYN_ALLOCATION_TESTING
+import org.apache.spark.internal.config.{DYN_ALLOCATION_TESTING, 
STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT}
+import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
 import org.apache.spark.resource.{ResourceProfile, ResourceProfileManager}
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, 
RpcEnv}
 import org.apache.spark.scheduler._
@@ -237,6 +238,32 @@ class HeartbeatReceiverSuite
     }
   }
 
+  test("SPARK-44726: Show spark.network.timeout config error message") {
+    sc.stop()
+    val conf = new SparkConf()
+      .setMaster("local[2]")
+      .setAppName("test")
+      .set(NETWORK_TIMEOUT.key, "30s")
+    val m = intercept[IllegalArgumentException] {
+      new SparkContext(conf)
+    }.getMessage
+    assert(m.contains("spark.network.timeoutInterval should be less than or 
equal to " +
+      NETWORK_TIMEOUT.key))
+  }
+
+  test("SPARK-44726: Show spark.storage.blockManagerHeartbeatTimeoutMs error 
message") {
+    sc.stop()
+    val conf = new SparkConf()
+      .setMaster("local[2]")
+      .setAppName("test")
+      .set(STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT.key, "30s")
+    val m = intercept[IllegalArgumentException] {
+      new SparkContext(conf)
+    }.getMessage
+    assert(m.contains("spark.network.timeoutInterval should be less than or 
equal to " +
+      STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT.key))
+  }
+
   /** Manually send a heartbeat and return the response. */
   private def triggerHeartbeat(
       executorId: String,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to