ajithme commented on issue #24140: [SPARK-27198][core] Heartbeat interval 
mismatch in driver and executor
URL: https://github.com/apache/spark/pull/24140#issuecomment-474493386
 
 
   Here is a test snippet
   Apologies, as I didn't find a better way to probe the heartbeat rate between
driver and executor (so I used reflection). Please suggest if there is a better way.
   
   ```
   package org.apache.spark
   
   import java.util.concurrent.ConcurrentMap
   
   import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef}
   import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
   
   /**
    * A test for the heartbeat behavior between the driver and the executors.
    *
    * It sets `spark.executor.heartbeatInterval` to a unit-less value and then samples the
    * driver-side `HeartbeatReceiver#executorLastSeen` map (obtained via reflection) to observe
    * how frequently heartbeats from the local executor are actually recorded.
    */
   class HeartbeatExpirySuite
     extends SparkFunSuite
     with BeforeAndAfterEach
     with PrivateMethodTester
     with LocalSparkContext {
   
     test("heartbeat of 5 seconds : Driver must not receive heartbeats shorter than 5 seconds") {
   
       val heartbeatInterval = 5 // assuming this is 5 seconds without units
       val localExecID = "driver" // local exec id
   
       // Driver side: validation must reject a network timeout below the heartbeat interval,
       // and the error message is expected to report the interval in seconds.
       val validationError = intercept[IllegalArgumentException] {
         new SparkConf()
           .set("spark.network.timeout", (heartbeatInterval - 1).toString)
           .set("spark.executor.heartbeatInterval", heartbeatInterval.toString)
           .validateSettings()
       }
       // The original snippet computed this `contains` inside a catch block and discarded the
       // result, so the message was never actually verified.
       assert(validationError.getMessage.contains("spark.executor.heartbeatInterval=5s"))
   
       // Executor side: the same unit-less setting is used to start a local context.
       // (The issue under discussion is that the executor reads it as milliseconds.)
       val conf = new SparkConf()
         .setMaster("local[1]")
         .setAppName("test")
         .set("spark.executor.heartbeatInterval", heartbeatInterval.toString)
       sc = new SparkContext(conf)
   
       // Extract the last-seen executor map from the driver's heartbeat receiver.
       val execLastSeen = getExecLastSeen
   
       // Each nap is half the intended heartbeat interval, i.e. shorter than one heartbeat.
       val halfIntervalMs = (heartbeatInterval * 1000) / 2
   
       // Take 3 readings so that a single heartbeat landing between two samples cannot, on
       // its own, make both adjacent pairs differ.
       val heartbeatReportedTime1 = execLastSeen.get(localExecID)
       Thread.sleep(halfIntervalMs)
       val heartbeatReportedTime2 = execLastSeen.get(localExecID)
       Thread.sleep(halfIntervalMs)
       val heartbeatReportedTime3 = execLastSeen.get(localExecID)
   
       assert(heartbeatReportedTime1 == heartbeatReportedTime2 ||
         heartbeatReportedTime2 == heartbeatReportedTime3)
     }
   
     /**
      * Get HeartbeatReceiver#executorLastSeen via reflection.
      *
      * Walks NettyRpcEnv -> Dispatcher -> endpointRefs to find the registered
      * HeartbeatReceiver endpoint, then reads its private `executorLastSeen` member.
      *
      * @return map of executor id vs last heartbeat time
      */
     private def getExecLastSeen: collection.Map[String, Long] = {
       import scala.collection.JavaConverters._
   
       val nettyRpcClass = Class.forName("org.apache.spark.rpc.netty.NettyRpcEnv")
       val dispatcherField = nettyRpcClass.getDeclaredField("dispatcher")
       dispatcherField.setAccessible(true)
       val dispatcher = dispatcherField.get(sc.env.rpcEnv)
   
       val dispatcherClass = Class.forName("org.apache.spark.rpc.netty.Dispatcher")
       val endpointRefsField = dispatcherClass.getDeclaredField("endpointRefs")
       endpointRefsField.setAccessible(true)
       val endpointRefs = endpointRefsField.get(dispatcher)
         .asInstanceOf[ConcurrentMap[RpcEndpoint, RpcEndpointRef]]
   
       // Fail with a descriptive message instead of a bare NoSuchElementException from `.head`.
       val heartbeatReceiver = endpointRefs.keySet().asScala
         .collectFirst { case receiver: HeartbeatReceiver => receiver }
         .getOrElse(fail("No HeartbeatReceiver endpoint is registered with the driver RpcEnv"))
   
       val _executorLastSeen = PrivateMethod[collection.Map[String, Long]]('executorLastSeen)
       heartbeatReceiver.invokePrivate(_executorLastSeen())
     }
   }
   ```
   
   The test fails with
   ```
   Some(1553017599197) did not equal Some(1553017601716), and 
Some(1553017601716) did not equal Some(1553017604240)
   ScalaTestFailureLocation: org.apache.spark.HeartbeatExpirySuite$$anonfun$1 
at (HeartbeatExpirySuite.scala:68)
   ```
   
   Here we can see that:
   1. the driver validates the heartbeat interval as 5 seconds
   2. the executor sends heartbeats at a rate of one per 5 milliseconds
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to