ajithme commented on issue #24140: [SPARK-27198][core] Heartbeat interval mismatch in driver and executor URL: https://github.com/apache/spark/pull/24140#issuecomment-474493386 Here is a test snippet. Apologies: I didn't find a better way to probe the heartbeat rate between the driver and the executor, so I used reflection. Please suggest a better way if there is one. ``` package org.apache.spark import java.util.concurrent.ConcurrentMap import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef} import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester} /** * A test for the heartbeat behavior between the driver and the executors. */ class HeartbeatExpirySuite extends SparkFunSuite with BeforeAndAfterEach with PrivateMethodTester with LocalSparkContext { test("heartbeat of 5 seconds : Driver must not receive heartbeats shorter than 5 seconds") { val heartbeatInterval = 5 // assuming this is 5 seconds without units val localExecID = "driver" // local exec id // driver assumes heartbeatInterval is 5 seconds try{ new SparkConf() .set("spark.network.timeout", (heartbeatInterval - 1).toString) .set("spark.executor.heartbeatInterval", heartbeatInterval.toString).validateSettings() assert(false) } catch { case e : IllegalArgumentException => e.getMessage.contains("spark.executor.heartbeatInterval=5s") } // executor assumes heartbeatInterval is 5 milliseconds val conf = new SparkConf() .setMaster("local[1]") .setAppName("test") .set("spark.executor.heartbeatInterval", heartbeatInterval.toString) sc = new SparkContext(conf) // now lets extract the last seen executor map from driver's heartbeat receiver val execLastSeen = getExecLastSeen // nap for a duration < heartbeat interval // take 3 readings to eliminate overlapping scenario val heartbeatReportedTime1 = execLastSeen.get(localExecID) Thread.sleep((heartbeatInterval * 1000) / 2) // wait for a duration shorter than heartbeat val heartbeatReportedTime2 = execLastSeen.get(localExecID) Thread.sleep((heartbeatInterval * 1000) / 2) // wait 
for a duration shorter than heartbeat val heartbeatReportedTime3 = execLastSeen.get(localExecID) assert(heartbeatReportedTime1 == heartbeatReportedTime2 || heartbeatReportedTime2 == heartbeatReportedTime3) } /** * get HeartbeatReceiver#executorLastSeen via reflection * * @return map of executor vs last heartbeat time */ private def getExecLastSeen = { val nettyRpcClass = Class.forName("org.apache.spark.rpc.netty.NettyRpcEnv") val dispatcherField = nettyRpcClass.getDeclaredField("dispatcher") dispatcherField.setAccessible(true) val dispatcher = dispatcherField.get(sc.env.rpcEnv) val dispatcherClass = Class.forName("org.apache.spark.rpc.netty.Dispatcher") val endpointRefsField = dispatcherClass.getDeclaredField("endpointRefs") endpointRefsField.setAccessible(true) val endpointRefs = endpointRefsField.get(dispatcher).asInstanceOf[ConcurrentMap[RpcEndpoint, RpcEndpointRef]] import scala.collection.JavaConverters._ val heartbeatReciver = endpointRefs.keySet().asScala.filter(_.isInstanceOf[HeartbeatReceiver]).head val _executorLastSeen = PrivateMethod[collection.Map[String, Long]]('executorLastSeen) heartbeatReciver.invokePrivate(_executorLastSeen()) } } ``` The test fails with: ``` Some(1553017599197) did not equal Some(1553017601716), and Some(1553017601716) did not equal Some(1553017604240) ScalaTestFailureLocation: org.apache.spark.HeartbeatExpirySuite$$anonfun$1 at (HeartbeatExpirySuite.scala:68) ``` Here we can see that (1) the driver interprets and validates the heartbeat interval as 5 seconds, i.e. one heartbeat every 5 seconds, while (2) the executor sends heartbeats at a rate of one every 5 milliseconds. The executor's last-seen timestamp changes between every pair of readings taken 2.5 seconds apart, so heartbeats arrive far more often than once every 5 seconds.
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
