Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22473#discussion_r219580690
--- Diff: core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
---
@@ -252,18 +253,121 @@ class ExecutorSuite extends SparkFunSuite with
LocalSparkContext with MockitoSug
}
}
+ test("Heartbeat should drop zero metrics") {
+ heartbeatZeroMetricTest(true)
+ }
+
+ test("Heartbeat should not drop zero metrics when the conf is set to
false") {
+ heartbeatZeroMetricTest(false)
+ }
+
+ private def withHeartbeatExecutor(confs: (String, String)*)
+ (f: (Executor, ArrayBuffer[Heartbeat]) => Unit): Unit = {
+ val conf = new SparkConf
+ confs.foreach { case (k, v) => conf.set(k, v) }
+ val serializer = new JavaSerializer(conf)
+ val env = createMockEnv(conf, serializer)
+ val executor =
+ new Executor("id", "localhost", SparkEnv.get, userClassPath = Nil,
isLocal = true)
+ val executorClass = classOf[Executor]
+
+ // Set ExecutorMetricType.values to be a minimal set to avoid get null
exceptions
+ val metricClass =
+
Utils.classForName(classOf[org.apache.spark.metrics.ExecutorMetricType].getName()
+ "$")
+ val metricTypeValues = metricClass.getDeclaredField("values")
+ metricTypeValues.setAccessible(true)
+ metricTypeValues.set(
+ org.apache.spark.metrics.ExecutorMetricType,
+ IndexedSeq(JVMHeapMemory, JVMOffHeapMemory))
+
+ // Save all heartbeats sent into an ArrayBuffer for verification
+ val heartbeats = ArrayBuffer[Heartbeat]()
+ val mockReceiver = mock[RpcEndpointRef]
+ when(mockReceiver.askSync(any[Heartbeat], any[RpcTimeout])(any))
+ .thenAnswer(new Answer[HeartbeatResponse] {
+ override def answer(invocation: InvocationOnMock):
HeartbeatResponse = {
+ val args = invocation.getArguments()
+ val mock = invocation.getMock
+ heartbeats += args(0).asInstanceOf[Heartbeat]
+ HeartbeatResponse(false)
+ }
+ })
+ val receiverRef =
executorClass.getDeclaredField("heartbeatReceiverRef")
+ receiverRef.setAccessible(true)
+ receiverRef.set(executor, mockReceiver)
+
+ f(executor, heartbeats)
+ }
+
+ private def invokeReportHeartbeat(executor: Executor): Unit = {
--- End diff --
You can mix in `org.scalatest.PrivateMethodTester` to replace this method,
for example:
```
val reportHeartBeat = PrivateMethod[Long]('reportHeartBeat)
...
executor.invokePrivate(reportHeartBeat())
```
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]