Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22473#discussion_r219580690 --- Diff: core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala --- @@ -252,18 +253,121 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug } } + test("Heartbeat should drop zero metrics") { + heartbeatZeroMetricTest(true) + } + + test("Heartbeat should not drop zero metrics when the conf is set to false") { + heartbeatZeroMetricTest(false) + } + + private def withHeartbeatExecutor(confs: (String, String)*) + (f: (Executor, ArrayBuffer[Heartbeat]) => Unit): Unit = { + val conf = new SparkConf + confs.foreach { case (k, v) => conf.set(k, v) } + val serializer = new JavaSerializer(conf) + val env = createMockEnv(conf, serializer) + val executor = + new Executor("id", "localhost", SparkEnv.get, userClassPath = Nil, isLocal = true) + val executorClass = classOf[Executor] + + // Set ExecutorMetricType.values to be a minimal set to avoid get null exceptions + val metricClass = + Utils.classForName(classOf[org.apache.spark.metrics.ExecutorMetricType].getName() + "$") + val metricTypeValues = metricClass.getDeclaredField("values") + metricTypeValues.setAccessible(true) + metricTypeValues.set( + org.apache.spark.metrics.ExecutorMetricType, + IndexedSeq(JVMHeapMemory, JVMOffHeapMemory)) + + // Save all heartbeats sent into an ArrayBuffer for verification + val heartbeats = ArrayBuffer[Heartbeat]() + val mockReceiver = mock[RpcEndpointRef] + when(mockReceiver.askSync(any[Heartbeat], any[RpcTimeout])(any)) + .thenAnswer(new Answer[HeartbeatResponse] { + override def answer(invocation: InvocationOnMock): HeartbeatResponse = { + val args = invocation.getArguments() + val mock = invocation.getMock + heartbeats += args(0).asInstanceOf[Heartbeat] + HeartbeatResponse(false) + } + }) + val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef") + receiverRef.setAccessible(true) + 
receiverRef.set(executor, mockReceiver) + + f(executor, heartbeats) + } + + private def invokeReportHeartbeat(executor: Executor): Unit = { --- End diff -- You can mix in `org.scalatest.PrivateMethodTester` to replace this method, for example: ``` val reportHeartBeat = PrivateMethod[Long]('reportHeartBeat) ... executor.invokePrivate(reportHeartBeat()) ```
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org