gengliangwang commented on code in PR #39192:
URL: https://github.com/apache/spark/pull/39192#discussion_r1059196954
##########
core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala:
##########
@@ -773,4 +740,568 @@ class KVStoreProtobufSerializerSuite extends
SparkFunSuite {
assert(result.info.processLogs(k) == input.info.processLogs(k))
}
}
+
+ test("Stage Data") {
+ val accumulatorUpdates = Seq(
+ new AccumulableInfo(1L, "duration", Some("update"), "value1"),
+ new AccumulableInfo(2L, "duration2", None, "value2")
+ )
+ val inputMetrics = new InputMetrics(
+ bytesRead = 1L,
+ recordsRead = 2L)
+ val outputMetrics = new OutputMetrics(
+ bytesWritten = 1L,
+ recordsWritten = 2L
+ )
+ val shuffleReadMetrics = new ShuffleReadMetrics(
+ remoteBlocksFetched = 1L,
+ localBlocksFetched = 2L,
+ fetchWaitTime = 3L,
+ remoteBytesRead = 4L,
+ remoteBytesReadToDisk = 5L,
+ localBytesRead = 6L,
+ recordsRead = 7L
+ )
+ val shuffleWriteMetrics = new ShuffleWriteMetrics(
+ bytesWritten = 1L,
+ writeTime = 2L,
+ recordsWritten = 3L
+ )
+ val taskMetrics = new TaskMetrics(
+ executorDeserializeTime = 1L,
+ executorDeserializeCpuTime = 2L,
+ executorRunTime = 3L,
+ executorCpuTime = 4L,
+ resultSize = 5L,
+ jvmGcTime = 6L,
+ resultSerializationTime = 7L,
+ memoryBytesSpilled = 8L,
+ diskBytesSpilled = 9L,
+ peakExecutionMemory = 10L,
+ inputMetrics = inputMetrics,
+ outputMetrics = outputMetrics,
+ shuffleReadMetrics = shuffleReadMetrics,
+ shuffleWriteMetrics = shuffleWriteMetrics
+ )
+ val taskData1 = new TaskData(
+ taskId = 1L,
+ index = 2,
+ attempt = 3,
+ partitionId = 4,
+ launchTime = new Date(123456L),
+ resultFetchStart = Some(new Date(223456L)),
+ duration = Some(10000L),
+ executorId = "executor_id_1",
+ host = "host_name_1",
+ status = "SUCCESS",
+ taskLocality = "LOCAL",
+ speculative = true,
+ accumulatorUpdates = accumulatorUpdates,
+ errorMessage = Some("error_1"),
+ taskMetrics = Some(taskMetrics),
+ executorLogs = Map("executor_id_1" -> "executor_log_1"),
+ schedulerDelay = 5L,
+ gettingResultTime = 6L
+ )
+ val taskData2 = new TaskData(
+ taskId = 11L,
+ index = 12,
+ attempt = 13,
+ partitionId = 14,
+ launchTime = new Date(1123456L),
+ resultFetchStart = Some(new Date(1223456L)),
+ duration = Some(110000L),
+ executorId = "executor_id_2",
+ host = "host_name_2",
+ status = "SUCCESS",
+ taskLocality = "LOCAL",
+ speculative = false,
+ accumulatorUpdates = accumulatorUpdates,
+ errorMessage = Some("error_2"),
+ taskMetrics = Some(taskMetrics),
+ executorLogs = Map("executor_id_2" -> "executor_log_2"),
+ schedulerDelay = 15L,
+ gettingResultTime = 16L
+ )
+ val tasks = Some(
+ Map(1L -> taskData1, 2L -> taskData2)
+ )
+ val peakMemoryMetrics =
+ Some(new ExecutorMetrics(Array(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L,
1024L)))
+ val executorStageSummary1 = new ExecutorStageSummary(
+ taskTime = 1L,
+ failedTasks = 2,
+ succeededTasks = 3,
+ killedTasks = 4,
+ inputBytes = 5L,
+ inputRecords = 6L,
+ outputBytes = 7L,
+ outputRecords = 8L,
+ shuffleRead = 9L,
+ shuffleReadRecords = 10L,
+ shuffleWrite = 11L,
+ shuffleWriteRecords = 12L,
+ memoryBytesSpilled = 13L,
+ diskBytesSpilled = 14L,
+ isBlacklistedForStage = true,
+ peakMemoryMetrics = peakMemoryMetrics,
+ isExcludedForStage = false)
+ val executorStageSummary2 = new ExecutorStageSummary(
+ taskTime = 11L,
+ failedTasks = 12,
+ succeededTasks = 13,
+ killedTasks = 14,
+ inputBytes = 15L,
+ inputRecords = 16L,
+ outputBytes = 17L,
+ outputRecords = 18L,
+ shuffleRead = 19L,
+ shuffleReadRecords = 110L,
+ shuffleWrite = 111L,
+ shuffleWriteRecords = 112L,
+ memoryBytesSpilled = 113L,
+ diskBytesSpilled = 114L,
+ isBlacklistedForStage = false,
+ peakMemoryMetrics = peakMemoryMetrics,
+ isExcludedForStage = true)
+ val executorSummary = Some(
+ Map("executor_id_1" -> executorStageSummary1, "executor_id_2" ->
executorStageSummary2)
+ )
+ val speculationStageSummary = new SpeculationStageSummary(
+ numTasks = 3,
+ numActiveTasks = 4,
+ numCompletedTasks = 5,
+ numFailedTasks = 6,
+ numKilledTasks = 7
+ )
+ val inputMetricDistributions = new InputMetricDistributions(
+ bytesRead = IndexedSeq(1.001D, 2.001D),
+ recordsRead = IndexedSeq(3.001D, 4.001D)
+ )
+ val outputMetricDistributions = new OutputMetricDistributions(
+ bytesWritten = IndexedSeq(1.001D, 2.001D),
+ recordsWritten = IndexedSeq(3.001D, 4.001D)
+ )
+ val shuffleReadMetricDistributions = new ShuffleReadMetricDistributions(
+ readBytes = IndexedSeq(1.001D, 2.001D),
+ readRecords = IndexedSeq(3.001D, 4.001D),
+ remoteBlocksFetched = IndexedSeq(5.001D, 6.001D),
+ localBlocksFetched = IndexedSeq(7.001D, 8.001D),
+ fetchWaitTime = IndexedSeq(9.001D, 10.001D),
+ remoteBytesRead = IndexedSeq(11.001D, 12.001D),
+ remoteBytesReadToDisk = IndexedSeq(13.001D, 14.001D),
+ totalBlocksFetched = IndexedSeq(15.001D, 16.001D)
+ )
+ val shuffleWriteMetricDistributions = new ShuffleWriteMetricDistributions(
+ writeBytes = IndexedSeq(1.001D, 2.001D),
+ writeRecords = IndexedSeq(3.001D, 4.001D),
+ writeTime = IndexedSeq(5.001D, 6.001D)
+ )
+ val taskMetricDistributions = new TaskMetricDistributions(
+ quantiles = IndexedSeq(1.001D, 2.001D),
+ duration = IndexedSeq(3.001D, 4.001D),
+ executorDeserializeTime = IndexedSeq(5.001D, 6.001D),
+ executorDeserializeCpuTime = IndexedSeq(7.001D, 8.001D),
+ executorRunTime = IndexedSeq(9.001D, 10.001D),
+ executorCpuTime = IndexedSeq(11.001D, 12.001D),
+ resultSize = IndexedSeq(13.001D, 14.001D),
+ jvmGcTime = IndexedSeq(15.001D, 16.001D),
+ resultSerializationTime = IndexedSeq(17.001D, 18.001D),
+ gettingResultTime = IndexedSeq(19.001D, 20.001D),
+ schedulerDelay = IndexedSeq(21.001D, 22.001D),
+ peakExecutionMemory = IndexedSeq(23.001D, 24.001D),
+ memoryBytesSpilled = IndexedSeq(25.001D, 26.001D),
+ diskBytesSpilled = IndexedSeq(27.001D, 28.001D),
+ inputMetrics = inputMetricDistributions,
+ outputMetrics = outputMetricDistributions,
+ shuffleReadMetrics = shuffleReadMetricDistributions,
+ shuffleWriteMetrics = shuffleWriteMetricDistributions
+ )
+ val executorPeakMetricsDistributions = new
ExecutorPeakMetricsDistributions(
+ quantiles = IndexedSeq(1.001D, 2.001D),
+ executorMetrics = IndexedSeq(
+ new ExecutorMetrics(Array(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 1024L)))
+ )
+ val executorMetricsDistributions = new ExecutorMetricsDistributions(
+ quantiles = IndexedSeq(1.001D, 2.001D),
+ taskTime = IndexedSeq(3.001D, 4.001D),
+ failedTasks = IndexedSeq(5.001D, 6.001D),
+ succeededTasks = IndexedSeq(7.001D, 8.001D),
+ killedTasks = IndexedSeq(9.001D, 10.001D),
+ inputBytes = IndexedSeq(11.001D, 12.001D),
+ inputRecords = IndexedSeq(13.001D, 14.001D),
+ outputBytes = IndexedSeq(15.001D, 16.001D),
+ outputRecords = IndexedSeq(17.001D, 18.001D),
+ shuffleRead = IndexedSeq(19.001D, 20.001D),
+ shuffleReadRecords = IndexedSeq(21.001D, 22.001D),
+ shuffleWrite = IndexedSeq(23.001D, 24.001D),
+ shuffleWriteRecords = IndexedSeq(25.001D, 24.001D),
+ memoryBytesSpilled = IndexedSeq(27.001D, 28.001D),
+ diskBytesSpilled = IndexedSeq(29.001D, 30.001D),
+ peakMemoryMetrics = executorPeakMetricsDistributions
+ )
+ val info = new StageData(
+ status = StageStatus.COMPLETE,
+ stageId = 1,
+ attemptId = 2,
+ numTasks = 3,
+ numActiveTasks = 4,
+ numCompleteTasks = 5,
+ numFailedTasks = 6,
+ numKilledTasks = 7,
+ numCompletedIndices = 8,
+ submissionTime = Some(new Date(123456L)),
+ firstTaskLaunchedTime = Some(new Date(234567L)),
+ completionTime = Some(new Date(654321L)),
+ failureReason = Some("failure reason"),
+ executorDeserializeTime = 9L,
+ executorDeserializeCpuTime = 10L,
+ executorRunTime = 11L,
+ executorCpuTime = 12L,
+ resultSize = 13L,
+ jvmGcTime = 14L,
+ resultSerializationTime = 15L,
+ memoryBytesSpilled = 16L,
+ diskBytesSpilled = 17L,
+ peakExecutionMemory = 18L,
+ inputBytes = 19L,
+ inputRecords = 20L,
+ outputBytes = 21L,
+ outputRecords = 22L,
+ shuffleRemoteBlocksFetched = 23L,
+ shuffleLocalBlocksFetched = 24L,
+ shuffleFetchWaitTime = 25L,
+ shuffleRemoteBytesRead = 26L,
+ shuffleRemoteBytesReadToDisk = 27L,
+ shuffleLocalBytesRead = 28L,
+ shuffleReadBytes = 29L,
+ shuffleReadRecords = 30L,
+ shuffleWriteBytes = 31L,
+ shuffleWriteTime = 32L,
+ shuffleWriteRecords = 33L,
+ name = "name",
+ description = Some("test description"),
+ details = "test details",
+ schedulingPool = "test scheduling pool",
+ rddIds = Seq(1, 2, 3, 4, 5, 6),
+ accumulatorUpdates = accumulatorUpdates,
+ tasks = tasks,
+ executorSummary = executorSummary,
+ speculationSummary = Some(speculationStageSummary),
+ killedTasksSummary = Map("task_1" -> 1),
+ resourceProfileId = 34,
+ peakExecutorMetrics = peakMemoryMetrics,
+ taskMetricsDistributions = Some(taskMetricDistributions),
+ executorMetricsDistributions = Some(executorMetricsDistributions)
+ )
+ val input = new StageDataWrapper(
+ info = info,
+ jobIds = Set(1, 2, 3, 4),
+ locality = Map(
+ "PROCESS_LOCAL" -> 1L,
+ "NODE_LOCAL" -> 2L
+ )
+ )
+
+ val bytes = serializer.serialize(input)
+ val result = serializer.deserialize(bytes, classOf[StageDataWrapper])
+
+ assert(result.jobIds == input.jobIds)
+ assert(result.locality == input.locality)
+
+ assert(result.info.status == input.info.status)
+ assert(result.info.stageId == input.info.stageId)
+ assert(result.info.attemptId == input.info.attemptId)
+ assert(result.info.numTasks == input.info.numTasks)
+ assert(result.info.numActiveTasks == input.info.numActiveTasks)
+ assert(result.info.numCompleteTasks == input.info.numCompleteTasks)
+ assert(result.info.numFailedTasks == input.info.numFailedTasks)
+ assert(result.info.numKilledTasks == input.info.numKilledTasks)
+ assert(result.info.numCompletedIndices == input.info.numCompletedIndices)
+
+ assert(result.info.submissionTime == input.info.submissionTime)
+ assert(result.info.firstTaskLaunchedTime ==
input.info.firstTaskLaunchedTime)
+ assert(result.info.completionTime == input.info.completionTime)
+ assert(result.info.failureReason == input.info.failureReason)
+
+ assert(result.info.executorDeserializeTime ==
input.info.executorDeserializeTime)
+ assert(result.info.executorDeserializeCpuTime ==
input.info.executorDeserializeCpuTime)
+ assert(result.info.executorRunTime == input.info.executorRunTime)
+ assert(result.info.executorCpuTime == input.info.executorCpuTime)
+ assert(result.info.resultSize == input.info.resultSize)
+ assert(result.info.jvmGcTime == input.info.jvmGcTime)
+ assert(result.info.resultSerializationTime ==
input.info.resultSerializationTime)
+ assert(result.info.memoryBytesSpilled == input.info.memoryBytesSpilled)
+ assert(result.info.diskBytesSpilled == input.info.diskBytesSpilled)
+ assert(result.info.peakExecutionMemory == input.info.peakExecutionMemory)
+ assert(result.info.inputBytes == input.info.inputBytes)
+ assert(result.info.inputRecords == input.info.inputRecords)
+ assert(result.info.outputBytes == input.info.outputBytes)
+ assert(result.info.outputRecords == input.info.outputRecords)
+ assert(result.info.shuffleRemoteBlocksFetched ==
input.info.shuffleRemoteBlocksFetched)
+ assert(result.info.shuffleLocalBlocksFetched ==
input.info.shuffleLocalBlocksFetched)
+ assert(result.info.shuffleFetchWaitTime == input.info.shuffleFetchWaitTime)
+ assert(result.info.shuffleRemoteBytesRead ==
input.info.shuffleRemoteBytesRead)
+ assert(result.info.shuffleRemoteBytesReadToDisk ==
input.info.shuffleRemoteBytesReadToDisk)
+ assert(result.info.shuffleLocalBytesRead ==
input.info.shuffleLocalBytesRead)
+ assert(result.info.shuffleReadBytes == input.info.shuffleReadBytes)
+ assert(result.info.shuffleReadRecords == input.info.shuffleReadRecords)
+ assert(result.info.shuffleWriteBytes == input.info.shuffleWriteBytes)
+ assert(result.info.shuffleWriteTime == input.info.shuffleWriteTime)
+ assert(result.info.shuffleWriteRecords == input.info.shuffleWriteRecords)
+
+ assert(result.info.name == input.info.name)
+ assert(result.info.description == input.info.description)
+ assert(result.info.details == input.info.details)
+ assert(result.info.schedulingPool == input.info.schedulingPool)
+
+ assert(result.info.rddIds == input.info.rddIds)
+ assert(result.info.accumulatorUpdates, input.info.accumulatorUpdates)
+
+ assert(result.info.tasks.isDefined == input.info.tasks.isDefined)
+ if (result.info.tasks.isDefined && input.info.tasks.isDefined) {
+ assertIdTask(result.info.tasks.get, input.info.tasks.get)
+ }
+
+ assert(result.info.executorSummary.isDefined ==
input.info.executorSummary.isDefined)
+ if (result.info.executorSummary.isDefined &&
input.info.executorSummary.isDefined) {
+ assert(result.info.executorSummary.get, input.info.executorSummary.get)
+ }
+
+ assert(result.info.speculationSummary.isDefined ==
input.info.speculationSummary.isDefined)
+ if (result.info.speculationSummary.isDefined &&
input.info.speculationSummary.isDefined) {
+ assert(result.info.speculationSummary.get,
input.info.speculationSummary.get)
+ }
+ assert(result.info.killedTasksSummary == input.info.killedTasksSummary)
+ assert(result.info.resourceProfileId == input.info.resourceProfileId)
+ assert(result.info.peakExecutorMetrics.isDefined ==
input.info.peakExecutorMetrics.isDefined)
+ if (result.info.peakExecutorMetrics.isDefined &&
input.info.peakExecutorMetrics.isDefined) {
+ assert(result.info.peakExecutorMetrics.get,
input.info.peakExecutorMetrics.get)
+ }
+ assert(result.info.taskMetricsDistributions.isDefined ==
+ input.info.taskMetricsDistributions.isDefined)
+ if (result.info.taskMetricsDistributions.isDefined &&
+ input.info.taskMetricsDistributions.isDefined) {
+ assert(result.info.taskMetricsDistributions.get,
input.info.taskMetricsDistributions.get)
+ }
+ assert(result.info.executorMetricsDistributions.isDefined ==
+ input.info.executorMetricsDistributions.isDefined)
+ if (result.info.executorMetricsDistributions.isDefined &&
+ input.info.executorMetricsDistributions.isDefined) {
+ assert(result.info.executorMetricsDistributions.get,
+ input.info.executorMetricsDistributions.get)
+ }
+ }
+
+ private def assert(result: TaskMetrics, input: TaskMetrics): Unit = {
Review Comment:
nit: rename all the `assert` methods to `checkAnswer(result, expected)`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]